Created
September 4, 2020 16:37
-
-
Save kissgyorgy/beccba1291de962702ea9c237a900c79 to your computer and use it in GitHub Desktop.
How to use PostgreSQL's LISTEN/NOTIFY as a simple message queue with psycopg2 and asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio

import psycopg2

# Listener side of the LISTEN/NOTIFY example: open a connection and subscribe
# to the "match_updates" channel.
# dbname should be the same for the notifying process
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
# Autocommit makes the LISTEN below take effect immediately instead of waiting
# for an explicit commit. The connection attribute is used so the script does
# not rely on psycopg2.extensions having been imported implicitly.
conn.autocommit = True
cursor = conn.cursor()
# Plain string literal: the statement has no placeholders, so no f-string.
cursor.execute("LISTEN match_updates;")
def handle_notify():
    """Print the payload of every notification pending on ``conn``.

    Polls the module-level connection first, then drains ``conn.notifies``
    in arrival order so the list is empty when the function returns.
    """
    conn.poll()
    pending = conn.notifies
    while pending:
        message = pending.pop(0)
        print(message.payload)
# It works with uvloop too:
# import uvloop
# loop = uvloop.new_event_loop()
# asyncio.set_event_loop(loop)

# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and raises RuntimeError on Python 3.14+.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Run handle_notify whenever the connection's socket becomes readable.
loop.add_reader(conn, handle_notify)
try:
    loop.run_forever()
finally:
    # Release the loop's resources when the script is interrupted (e.g. Ctrl-C).
    loop.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time

import psycopg2

# Notifier side of the LISTEN/NOTIFY example: publish the current timestamp
# on the "match_updates" channel once per second.
# dbname should be the same for the listening process
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
cursor = conn.cursor()
# Autocommit so every NOTIFY is sent right away rather than being held back
# until an explicit commit.
conn.autocommit = True
while True:
    val = time.time()
    # Pass the payload as a bound parameter so psycopg2 quotes it; building
    # the literal by hand breaks as soon as a payload contains a quote.
    cursor.execute("NOTIFY match_updates, %s;", (str(val),))
    time.sleep(1)
A version that listens to PostgreSQL notifications while doing other, non-async work:
https://gist.github.com/ArthurDelannoyazerty/426d2aa084aad644b215b8cacbffe914
import asyncio
import logging
import os
import psycopg2
import threading
import time
from dotenv import find_dotenv, load_dotenv
def listen_to_notifications():
    """Listen on a PostgreSQL channel and print each notification payload.

    Connection settings are read from the environment (after loading a .env
    file): POSTGRES_IP, POSTGRES_PORT, POSTGRES_USER, POSTGRES_PASSWORD and
    POSTGRES_DB_DPMC_NAME. The function creates its own asyncio event loop and
    blocks in ``run_forever()``, so it is meant to run in a dedicated thread.
    The connection and the loop are released when the function exits.
    """
    conn = None
    loop = None
    try:
        load_dotenv(find_dotenv())
        conn = psycopg2.connect(
            host=os.environ.get('POSTGRES_IP'),
            port=os.environ.get('POSTGRES_PORT'),
            user=os.environ.get('POSTGRES_USER'),
            password=os.environ.get('POSTGRES_PASSWORD'),
            database=os.environ.get('POSTGRES_DB_DPMC_NAME')
        )
        cursor = conn.cursor()
        cursor.execute("LISTEN channel;")  # Modify the channel here
        # The connection is not in autocommit mode, so commit to make the
        # LISTEN registration take effect.
        conn.commit()

        def handle_notify():
            """Poll the connection and print every pending notification."""
            conn.poll()
            for notify in conn.notifies:
                print(notify.payload)  # Your code here
            conn.notifies.clear()

        # No event loop exists in a new thread, so we create one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Run handle_notify whenever the connection's socket becomes readable.
        loop.add_reader(conn, handle_notify)
        loop.run_forever()
    except KeyboardInterrupt:
        # Only reachable when this function runs in the main thread; Python
        # delivers KeyboardInterrupt to the main thread only.
        print("Infinite loop stopped.")
    finally:
        # Release the event loop and the database connection on any exit path.
        if loop is not None:
            loop.close()
        if conn is not None:
            conn.close()
def infinite_loop():
    """Simulate a blocking synchronous job until interrupted with Ctrl-C.

    Prints a status line and sleeps 0.4 seconds on every iteration; a
    KeyboardInterrupt ends the loop and prints a closing message.
    """
    def run_one_step():
        # One unit of simulated work.
        print("Executing synchronous task...")
        time.sleep(0.4)

    try:
        while True:
            run_one_step()
    except KeyboardInterrupt:
        print("Infinite loop stopped.")
if __name__ == "__main__":
    # daemon=True --> the listener thread is stopped automatically when the
    # main thread finishes. A non-daemon thread would keep the process alive
    # forever after Ctrl-C, because the listener never returns on its own.
    notification_thread = threading.Thread(target=listen_to_notifications, daemon=True)
    notification_thread.start()
    # Run the infinite loop in the main thread (simulate a job)
    infinite_loop()  # Your code here
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here is my take using psycopg version 3. CC @jamesbraza
Listener only: