Skip to content

Instantly share code, notes, and snippets.

@kissgyorgy
Created September 4, 2020 16:37
Show Gist options
  • Save kissgyorgy/beccba1291de962702ea9c237a900c79 to your computer and use it in GitHub Desktop.
Save kissgyorgy/beccba1291de962702ea9c237a900c79 to your computer and use it in GitHub Desktop.
How to use PostgreSQL's LISTEN/NOTIFY as a simple message queue with psycopg2 and asyncio
import asyncio
import psycopg2
# Listener side: open a connection and subscribe to the "match_updates"
# channel. dbname should be the same for the notifying process.
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
# Autocommit, so the LISTEN below takes effect immediately instead of
# sitting in an open transaction.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()
# Plain string: there is nothing to interpolate, so no f-string is needed.
cursor.execute("LISTEN match_updates;")
def handle_notify():
    """Print the payload of every notification pending on ``conn``.

    Intended to be registered as a reader callback on the event loop, so it
    runs whenever the connection's socket becomes readable. Uses the
    module-level ``conn``.
    """
    # poll() makes psycopg2 process the incoming data and fill conn.notifies.
    conn.poll()
    # Drain one item at a time so each notification is removed from the list
    # exactly when it is handled.
    while conn.notifies:
        notify = conn.notifies.pop(0)
        print(notify.payload)
# It works with uvloop too: import uvloop and create the loop with
# uvloop.new_event_loop() instead of asyncio.new_event_loop().
#
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and fails on recent Python versions if none has been set.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Call handle_notify whenever the connection's socket has data to read.
loop.add_reader(conn, handle_notify)
try:
    loop.run_forever()
finally:
    # Release the reader registration, the loop and the database connection
    # when the script is interrupted.
    loop.remove_reader(conn)
    loop.close()
    conn.close()
import time
import psycopg2
# Notifier side: send the current timestamp on the "match_updates" channel
# once per second. dbname should be the same for the listening process.
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
cursor = conn.cursor()
# Autocommit, so every notification is delivered immediately instead of
# waiting for a commit.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
try:
    while True:
        val = time.time()
        # pg_notify() accepts bound parameters (the NOTIFY statement does
        # not), so the payload is passed as data instead of being formatted
        # into the SQL text.
        cursor.execute("SELECT pg_notify(%s, %s);", ("match_updates", str(val)))
        time.sleep(1)
finally:
    # Release the cursor and connection when the script is interrupted.
    cursor.close()
    conn.close()
@ivarref
Copy link

ivarref commented Oct 15, 2024

Here is my take using psycopg version 3. CC @jamesbraza

Listener only:

import os
import psycopg
import asyncio

def run_main():
    """Listen on the ``match_updates`` channel and print each payload.

    Opens a psycopg (version 3) connection as the user named by the ``USER``
    environment variable, issues ``LISTEN match_updates``, registers the
    connection as a reader on a fresh asyncio event loop and runs that loop
    forever. The loop and the connection are closed when the loop stops or
    an exception propagates.

    Raises:
        KeyError: if the ``USER`` environment variable is not set.
    """
    conn = psycopg.connect(host="localhost", dbname="example", user=os.environ['USER'], password="")
    try:
        cursor = conn.cursor()
        cursor.execute("LISTEN match_updates;")
        # Commit so the LISTEN is not left inside an open transaction.
        conn.commit()

        def handle_notify():
            """Print every notification currently available on ``conn``."""
            try:
                # NOTE(review): stop_after=0 is presumably meant to consume
                # what is already available and then return rather than wait
                # for more; confirm against the installed psycopg version.
                for notify in conn.notifies(stop_after=0):
                    print(notify.payload)
            except Exception:
                # Report and re-raise real errors; KeyboardInterrupt and
                # SystemExit pass through untouched.
                print('Error occurred')
                raise

        # Create the loop explicitly: asyncio.get_event_loop() is deprecated
        # when no loop is running and fails on recent Python versions if
        # none has been set.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.add_reader(conn, handle_notify)
            loop.run_forever()
        finally:
            loop.close()
    finally:
        conn.close()

if __name__ == "__main__":
    # Script entry point: announce start-up, run the listener, and always
    # announce exit, even when the listener is interrupted or fails.
    try:
        print('Starting')
        run_main()
    finally:
        print('Exiting')

@ArthurDelannoyazerty
Copy link

A version that listens for PostgreSQL notifications in a background thread while the main thread does other, non-async work:

https://gist.github.com/ArthurDelannoyazerty/426d2aa084aad644b215b8cacbffe914

import asyncio
import logging
import os
import psycopg2
import threading
import time

from dotenv import find_dotenv, load_dotenv

def listen_to_notifications():
    """Listen on a PostgreSQL channel and print every notification payload.

    Loads connection settings from a ``.env`` file (``POSTGRES_IP``,
    ``POSTGRES_PORT``, ``POSTGRES_USER``, ``POSTGRES_PASSWORD``,
    ``POSTGRES_DB_DPMC_NAME``), issues ``LISTEN channel``, then runs a
    dedicated asyncio event loop forever with the connection registered as a
    reader. Meant to be used as a thread target; the loop and the connection
    are closed when the loop stops or an exception propagates.
    """
    try:
        load_dotenv(find_dotenv())

        conn = psycopg2.connect(
            host=os.environ.get('POSTGRES_IP'),
            port=os.environ.get('POSTGRES_PORT'),
            user=os.environ.get('POSTGRES_USER'),
            password=os.environ.get('POSTGRES_PASSWORD'),
            database=os.environ.get('POSTGRES_DB_DPMC_NAME')
        )
        try:
            cursor = conn.cursor()
            cursor.execute("LISTEN channel;")   # Modify the channel here
            # Commit so the LISTEN is not left inside an open transaction.
            conn.commit()

            def handle_notify():
                """Print every notification pending on ``conn``."""
                conn.poll()
                # Drain one item at a time so each notification is removed
                # from the list exactly when it is handled.
                while conn.notifies:
                    notify = conn.notifies.pop(0)
                    print(notify.payload)       # Your code here

            loop = asyncio.new_event_loop()     # No event loop exists in the new thread, so we create one
            asyncio.set_event_loop(loop)
            try:
                loop.add_reader(conn, handle_notify)
                loop.run_forever()
            finally:
                loop.close()
        finally:
            conn.close()

    except KeyboardInterrupt:
        # Only reachable when this function runs in the main thread: Python
        # delivers KeyboardInterrupt to the main thread only.
        print("Notification listener stopped.")


def infinite_loop():
    """Simulate a blocking, synchronous job until interrupted with Ctrl-C.

    Prints a status line and sleeps for 0.4 seconds, over and over. A
    ``KeyboardInterrupt`` ends the loop and is reported instead of
    propagating to the caller.
    """
    work_duration = 0.4  # seconds of simulated work per iteration
    try:
        while True:
            print("Executing synchronous task...")
            time.sleep(work_duration)
    except KeyboardInterrupt:
        print("Infinite loop stopped.")


if __name__ == "__main__":
    # daemon=True --> the listener thread is stopped when the main thread
    # finishes. With daemon=False the interpreter would wait for the
    # listener's never-ending event loop, and the process would not exit
    # after Ctrl-C.
    notification_thread = threading.Thread(target=listen_to_notifications, daemon=True)
    notification_thread.start()

    # Run the infinite loop in the main thread (simulate a job)
    infinite_loop()     # Your code here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment