Skip to content

Instantly share code, notes, and snippets.

@kissgyorgy
Created September 4, 2020 16:37
Show Gist options
  • Save kissgyorgy/beccba1291de962702ea9c237a900c79 to your computer and use it in GitHub Desktop.
Save kissgyorgy/beccba1291de962702ea9c237a900c79 to your computer and use it in GitHub Desktop.
How to use PostgreSQL's LISTEN/NOTIFY as a simple message queue with psycopg2 and asyncio
import asyncio
import psycopg2
# dbname should be the same for the notifying process
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()
cursor.execute(f"LISTEN match_updates;")
def handle_notify():
conn.poll()
for notify in conn.notifies:
print(notify.payload)
conn.notifies.clear()
# It works with uvloop too:
# import uvloop
# loop = uvloop.new_event_loop()
# asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
loop.add_reader(conn, handle_notify)
loop.run_forever()
import time
import psycopg2
# dbname should be the same for the listening process
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
cursor = conn.cursor()
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
while True:
val = time.time()
cursor.execute(f"NOTIFY match_updates, '{val}';")
time.sleep(1)
@kissgyorgy
Copy link
Author

Isn't there an SQL injection in the NOTIFY part?

  1. If you look at the definition of SQL injection at Wikipedia, in the second sentence it says "... when user input is either incorrectly filtered...". There is no user input in the snippet.
  2. This is an example of how to wire it up, people shouldn't copy-paste code from the internet verbatim.

@kartikdc
Copy link

@d33tah You can call the pg_notify function instead of the NOTIFY directive. You should be able to do parameter substitution with psycopg2 then.

@ubarar
Copy link

ubarar commented Oct 15, 2023

Thank you so much for the detailed reply!

@jamesbraza
Copy link

It would be quite useful @kissgyorgy if you have time to make a similar gist for psycopg version 3

@ivarref
Copy link

ivarref commented Oct 15, 2024

Here is my take using psycopg version 3. CC @jamesbraza

Listener only:

import os
import psycopg
import asyncio

def run_main():
    conn = psycopg.connect(host="localhost", dbname="example", user=os.environ['USER'], password="")

    cursor = conn.cursor()
    cursor.execute(f"LISTEN match_updates;")
    conn.commit()

    def handle_notify():
        try:
            for notify in conn.notifies(stop_after=0):
                print(notify.payload)
        except:
            print('Error occurred')
            raise

    loop = asyncio.get_event_loop()
    loop.add_reader(conn, handle_notify)
    loop.run_forever()

if __name__ == "__main__":
    try:
        print(f'Starting')
        run_main()
    finally:
        print('Exiting')

@ArthurDelannoyazerty
Copy link

A version to listen to PostgreSQL while doing other not async job :

https://gist.github.com/ArthurDelannoyazerty/426d2aa084aad644b215b8cacbffe914

import asyncio
import logging
import os
import psycopg2
import threading
import time

from dotenv import find_dotenv, load_dotenv

def listen_to_notifications():
    try:
        load_dotenv(find_dotenv())
        
        conn = psycopg2.connect(
            host=os.environ.get('POSTGRES_IP'),
            port=os.environ.get('POSTGRES_PORT'),
            user=os.environ.get('POSTGRES_USER'),
            password=os.environ.get('POSTGRES_PASSWORD'),
            database=os.environ.get('POSTGRES_DB_DPMC_NAME')
        )
        cursor = conn.cursor()
        cursor.execute("LISTEN channel;")   # Modify the channel here
        conn.commit()

        def handle_notify():
            conn.poll()
            for notify in conn.notifies:
                print(notify.payload)       # Your code here
            conn.notifies.clear()

        loop = asyncio.new_event_loop()     # No event loop exist in the new thread, so we create one
        asyncio.set_event_loop(loop)
        loop.add_reader(conn, handle_notify)
        loop.run_forever()
    
    except KeyboardInterrupt:
        print("Infinite loop stopped.")


def infinite_loop():
    try:
        while True:
            print("Executing synchronous task...")
            time.sleep(0.4)  # Simulate work
    except KeyboardInterrupt:
        print("Infinite loop stopped.")


if __name__ == "__main__":
    notification_thread = threading.Thread(target=listen_to_notifications, daemon=False)        # Daemon=False  --> The thread is closed when the main thread is finicshed
    notification_thread.start()
    
    # Run the infinite loop in the main thread (simulate a job)
    infinite_loop()     # Your code here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment