Last active
August 9, 2019 21:25
Revisions
-
dslaw revised this gist
Aug 9, 2019 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -109,7 +109,7 @@ def main(): conn.commit() states = initial_states for _ in range(args.number): states = update_outbreak_statuses(cursor, states) conn.commit() delay_ms = random.randrange(500, 1_200) -
dslaw revised this gist
Aug 8, 2019 . No changes.There are no files selected for viewing
-
dslaw revised this gist
Aug 8, 2019 . 1 changed file with 4 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -45,7 +45,10 @@ def update_outbreak_status(cursor, state, timestamp): dead = %(dead)s, treated = %(treated)s, modified_at = %(modified_at)s """, { "modified_at": timestamp, **current }) return current -
dslaw created this gist
Aug 8, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,27 @@ Create a dev database: ```bash docker run \ -d \ -p 8011:5432 \ -e POSTGRES_USER=dev \ -e POSTGRES_DB=dev \ -e POSTGRES_PASSWORD=dev \ --name=pubsub-example \ postgres:11 psql --host=localhost --port=8011 --dbname=dev --user=dev -f schema.sql ``` Install dependencies: ```bash pip install persist-queue psycopg2-binary ``` Run the example: ```bash python subscriber.py & python writer.py --number=50 ``` This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,41 @@ create extension if not exists "uuid-ossp"; create table if not exists outbreak ( disease text not null, region text not null, infected integer not null, dead integer not null, treated integer not null, modified_at timestamp not null, check (infected >= 0), check (dead >= 0), check (treated >= 0), primary key (disease, region) ); create table if not exists table_audit ( id serial primary key, tablename text not null, status text not null, modified_at timestamp not null ); create or replace function cdc_publish() returns trigger as $$ begin perform pg_notify( 'processed', json_build_object( 'id', uuid_generate_v4()::text, 'op', tg_op, 'table', tg_table_schema || '.' || tg_table_name, 'data', row_to_json(new) )::text ); return null; end; $$ language plpgsql; drop trigger if exists outbreak_cdc_publish on outbreak; create trigger outbreak_cdc_publish after insert or update or delete on outbreak for each row execute procedure cdc_publish(); This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,50 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import persistqueue import psycopg2 import select from writer import params CHANNEL = "processed" POLL_TIMEOUT = 5 def poll(conn, callback): while True: if select.select([conn], [], [], POLL_TIMEOUT) == ([], [], []): continue conn.poll() while conn.notifies: notification = conn.notifies.pop(0) if notification.channel == CHANNEL: print("Received: ", notification.payload) callback(notification) return def push_message(queue, notification): message = notification.payload queue.put(message) return def main(): pq = persistqueue.SQLiteAckQueue("queue", auto_commit=True) def handler(notification): return push_message(pq, notification) with psycopg2.connect(**params) as conn: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cursor = conn.cursor() cursor.execute(f"listen {CHANNEL}") poll(conn, handler) return if __name__ == "__main__": main() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,119 @@ from datetime import datetime from time import sleep import argparse import psycopg2 import random params = { "host": "localhost", "port": 8011, "database": "dev", "user": "dev", "password": "dev", } def make_parser(): parser = argparse.ArgumentParser() parser.add_argument("--number", "-n", type=int, default=50) return parser def update_counts(previous): out = {} for field in ("infected", "dead", "treated"): previous_count = previous[field] change = random.randint(-10, 10) applied = previous_count + change out[field] = max(applied, 0) return {**previous, **out} def update_outbreak_status(cursor, state, timestamp): current = update_counts(state) cursor.execute(""" insert into outbreak (disease, region, infected, dead, treated, modified_at) values (%(disease)s, %(region)s, %(infected)s, %(dead)s, %(treated)s, %(modified_at)s) on conflict (disease, region) do update set infected = %(infected)s, dead = %(dead)s, treated = %(treated)s, modified_at = %(modified_at)s """, {"modified_at": timestamp, **current}) return current def update_outbreak_statuses(cursor, states): modified_at = datetime.utcnow() batch_size = random.choice([1, 2, 3]) indices = set(random.sample([0, 1, 2], k=batch_size)) updated = [] for idx, state in enumerate(states): if idx not in indices: updated.append(state) continue updated.append( update_outbreak_status(cursor, state, modified_at) ) cursor.execute(""" insert into table_audit (tablename, status, modified_at) values ('outbreak', 'Success', %(modified_at)s) """, {"modified_at": modified_at}) return updated def main(): parser = make_parser() args = parser.parse_args() initial_states = [ { "disease": "Cholera", "region": "Congo Basin", "infected": 2, "dead": 0, "treated": 0, }, { "disease": "SARS", "region": "Western China", "infected": 20, "dead": 4, "treated": 2, }, { "disease": "Avian Flu", "region": "Southern China", "infected": 4, "dead": 0, "treated": 1, } ] random.seed(13) with psycopg2.connect(**params) as conn: cursor = conn.cursor() cursor.execute("truncate table outbreak") cursor.execute("truncate table table_audit") conn.commit() states = initial_states for n in range(args.number): states = update_outbreak_statuses(cursor, states) conn.commit() delay_ms = random.randrange(500, 1_200) sleep(delay_ms / 1_000) return if __name__ == "__main__": main()