Skip to content

Instantly share code, notes, and snippets.

@dslaw
Last active August 9, 2019 21:25

Revisions

  1. dslaw revised this gist Aug 9, 2019. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion writer.py
    Original file line number Diff line number Diff line change
    @@ -109,7 +109,7 @@ def main():
    conn.commit()

    states = initial_states
    for n in range(args.number):
    for _ in range(args.number):
    states = update_outbreak_statuses(cursor, states)
    conn.commit()
    delay_ms = random.randrange(500, 1_200)
  2. dslaw revised this gist Aug 8, 2019. No changes.
  3. dslaw revised this gist Aug 8, 2019. 1 changed file with 4 additions and 1 deletion.
    5 changes: 4 additions & 1 deletion writer.py
    Original file line number Diff line number Diff line change
    @@ -45,7 +45,10 @@ def update_outbreak_status(cursor, state, timestamp):
    dead = %(dead)s,
    treated = %(treated)s,
    modified_at = %(modified_at)s
    """, {"modified_at": timestamp, **current})
    """, {
    "modified_at": timestamp,
    **current
    })
    return current


  4. dslaw created this gist Aug 8, 2019.
    27 changes: 27 additions & 0 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,27 @@
    Create a dev database:

    ```bash
    docker run \
    -d \
    -p 8011:5432 \
    -e POSTGRES_USER=dev \
    -e POSTGRES_DB=dev \
    -e POSTGRES_PASSWORD=dev \
    --name=pubsub-example \
    postgres:11

    psql --host=localhost --port=8011 --dbname=dev --user=dev -f schema.sql
    ```

    Install dependencies:

    ```bash
    pip install persist-queue psycopg2-binary
    ```

    Run the example:

    ```bash
    python subscriber.py &
    python writer.py --number=50
    ```
    41 changes: 41 additions & 0 deletions schema.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,41 @@
    create extension if not exists "uuid-ossp";

    create table if not exists outbreak (
    disease text not null,
    region text not null,
    infected integer not null,
    dead integer not null,
    treated integer not null,
    modified_at timestamp not null,
    check (infected >= 0),
    check (dead >= 0),
    check (treated >= 0),
    primary key (disease, region)
    );

    create table if not exists table_audit (
    id serial primary key,
    tablename text not null,
    status text not null,
    modified_at timestamp not null
    );

    create or replace function cdc_publish() returns trigger as $$
    begin
    perform pg_notify(
    'processed',
    json_build_object(
    'id', uuid_generate_v4()::text,
    'op', tg_op,
    'table', tg_table_schema || '.' || tg_table_name,
    'data', row_to_json(new)
    )::text
    );
    return null;
    end;
    $$ language plpgsql;

    drop trigger if exists outbreak_cdc_publish on outbreak;
    create trigger outbreak_cdc_publish
    after insert or update or delete on outbreak
    for each row execute procedure cdc_publish();
    50 changes: 50 additions & 0 deletions subscriber.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
    import persistqueue
    import psycopg2
    import select

    from writer import params


    CHANNEL = "processed"
    POLL_TIMEOUT = 5


    def poll(conn, callback):
    while True:
    if select.select([conn], [], [], POLL_TIMEOUT) == ([], [], []):
    continue

    conn.poll()
    while conn.notifies:
    notification = conn.notifies.pop(0)
    if notification.channel == CHANNEL:
    print("Received: ", notification.payload)
    callback(notification)

    return


    def push_message(queue, notification):
    message = notification.payload
    queue.put(message)
    return


    def main():
    pq = persistqueue.SQLiteAckQueue("queue", auto_commit=True)

    def handler(notification):
    return push_message(pq, notification)

    with psycopg2.connect(**params) as conn:
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    cursor.execute(f"listen {CHANNEL}")
    poll(conn, handler)

    return


    if __name__ == "__main__":
    main()
    119 changes: 119 additions & 0 deletions writer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,119 @@
    from datetime import datetime
    from time import sleep
    import argparse
    import psycopg2
    import random


    params = {
    "host": "localhost",
    "port": 8011,
    "database": "dev",
    "user": "dev",
    "password": "dev",
    }


    def make_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--number", "-n", type=int, default=50)
    return parser



    def update_counts(previous):
    out = {}
    for field in ("infected", "dead", "treated"):
    previous_count = previous[field]
    change = random.randint(-10, 10)
    applied = previous_count + change
    out[field] = max(applied, 0)

    return {**previous, **out}


    def update_outbreak_status(cursor, state, timestamp):
    current = update_counts(state)
    cursor.execute("""
    insert into outbreak
    (disease, region, infected, dead, treated, modified_at)
    values
    (%(disease)s, %(region)s, %(infected)s, %(dead)s, %(treated)s, %(modified_at)s)
    on conflict (disease, region) do update
    set
    infected = %(infected)s,
    dead = %(dead)s,
    treated = %(treated)s,
    modified_at = %(modified_at)s
    """, {"modified_at": timestamp, **current})
    return current


    def update_outbreak_statuses(cursor, states):
    modified_at = datetime.utcnow()

    batch_size = random.choice([1, 2, 3])
    indices = set(random.sample([0, 1, 2], k=batch_size))
    updated = []
    for idx, state in enumerate(states):
    if idx not in indices:
    updated.append(state)
    continue
    updated.append(
    update_outbreak_status(cursor, state, modified_at)
    )

    cursor.execute("""
    insert into table_audit
    (tablename, status, modified_at)
    values
    ('outbreak', 'Success', %(modified_at)s)
    """, {"modified_at": modified_at})
    return updated


    def main():
    parser = make_parser()
    args = parser.parse_args()

    initial_states = [
    {
    "disease": "Cholera",
    "region": "Congo Basin",
    "infected": 2,
    "dead": 0,
    "treated": 0,
    }, {
    "disease": "SARS",
    "region": "Western China",
    "infected": 20,
    "dead": 4,
    "treated": 2,
    }, {
    "disease": "Avian Flu",
    "region": "Southern China",
    "infected": 4,
    "dead": 0,
    "treated": 1,
    }
    ]

    random.seed(13)
    with psycopg2.connect(**params) as conn:
    cursor = conn.cursor()
    cursor.execute("truncate table outbreak")
    cursor.execute("truncate table table_audit")
    conn.commit()

    states = initial_states
    for n in range(args.number):
    states = update_outbreak_statuses(cursor, states)
    conn.commit()
    delay_ms = random.randrange(500, 1_200)
    sleep(delay_ms / 1_000)

    return


    if __name__ == "__main__":
    main()