Created
November 15, 2023 18:30
-
-
Save cra/4ec345e4627f89352045796ead7fc20d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from datetime import timedelta | |
import psycopg2 # pip install psycopg2-binary | |
# pip install bytewax==0.17.2 | |
from bytewax.connectors.stdio import StdOutput | |
from bytewax.connectors.periodic import SimplePollingInput | |
from bytewax.dataflow import Dataflow | |
# Root logger setup: INFO level, each record prefixed with time and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class PGTableInput(SimplePollingInput):
    """Polling input that emits rows newly added to a Postgres table.

    On every poll the table is queried for rows whose ``TS_FIELD`` is
    strictly greater than the newest value seen so far (all rows on the
    first poll when no bookmark is set). The whole batch is returned as one
    item, ordered by ``TS_FIELD``, with ``TS_FIELD`` as the first column of
    every row.

    NOTE(review): because the comparison is a strict ``>``, a row that is
    committed after a poll but shares the last-seen timestamp would be
    skipped -- confirm this is acceptable for the source table.
    """

    # Table / column names are trusted, developer-supplied identifiers; they
    # are formatted into the SQL text and must never come from user input.
    TABLE = 'public.entries'
    TS_FIELD = 'timestamp'
    DATA_FIELDS = 'id,event_type'

    def __init__(self, interval, latest_ts=None):
        """Open the database connection and set the polling bookmark.

        Args:
            interval: ``timedelta`` between two consecutive polls.
            latest_ts: optional initial bookmark; only rows with a
                ``TS_FIELD`` greater than it are fetched. ``None`` (the
                default) fetches the whole table on the first poll.
        """
        # skip align_to argument
        super().__init__(interval, None)
        # Placeholder connection settings -- replace with real configuration.
        self.conn = psycopg2.connect(
            dbname='analytics',
            user='my_user',
            password='my_password',
            host='data.some.db',
            port=5433,
        )
        # The poller only reads; autocommit avoids leaving the session
        # "idle in transaction" between polls.
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
        self.latest_ts = latest_ts

    @property
    def query(self):
        """SQL text for the next poll.

        The bookmark is represented by a ``%s`` placeholder (present only
        when ``latest_ts`` is set); its value is supplied separately at
        execution time so the driver takes care of quoting.
        """
        # the timestamp column is selected first on purpose: next_item()
        # reads the bookmark from position 0 of the last row
        parts = [
            f'SELECT {self.TS_FIELD}, {self.DATA_FIELDS}',
            f'FROM {self.TABLE}',
        ]
        if self.latest_ts is not None:
            parts.append(f'WHERE {self.TS_FIELD} > %s')
        parts.append(f'ORDER BY {self.TS_FIELD}')
        return '\n'.join(parts)

    def _fetch_rows(self):
        """Run the poll query and return every matching row."""
        # No parameters are passed when the query has no placeholder.
        params = None if self.latest_ts is None else (self.latest_ts,)
        self.cur.execute(self.query, params)
        return self.cur.fetchall()

    def next_item(self):
        """Fetch the rows added since the previous poll and advance the bookmark.

        Returns:
            The fetched rows (possibly empty), ordered by ``TS_FIELD``.
        """
        rows = self._fetch_rows()
        # Logged before the bookmark moves, so this shows the value the
        # query was filtered on.
        logging.info('%s new rows, latest ts=%s', len(rows), self.latest_ts)
        # first field of last row is the latest ts
        if rows:
            self.latest_ts = rows[-1][0]
        return rows
def do_the_thing(row):
    """Stand-in processing step: pair a row with its number of fields.

    Returns a ``(field_count, row)`` tuple; ``row`` itself is passed through
    untouched.
    """
    field_count = len(row)
    return field_count, row
# Source and sink: poll the table once per second, print results to stdout.
inp = PGTableInput(interval=timedelta(seconds=1))
out = StdOutput()

# Pipeline: batch of rows -> individual rows -> spread over workers ->
# per-row processing -> output.
flow = Dataflow()
flow.input('inp', inp)
flow.flat_map(lambda batch: batch)  # (a,b,c) -> a b c
flow.redistribute()  # rebalances flow among workers
flow.map(do_the_thing)
flow.output('out', out)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment