-
-
Save kissgyorgy/beccba1291de962702ea9c237a900c79 to your computer and use it in GitHub Desktop.
import asyncio | |
import psycopg2 | |
# dbname should be the same for the notifying process: PostgreSQL delivers a
# NOTIFY only to sessions listening in the notifier's own database.
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
# Autocommit, so the LISTEN below takes effect immediately rather than waiting
# for an explicit commit.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = conn.cursor()
# Plain literal: there is nothing to interpolate, so no f-string prefix.
cursor.execute("LISTEN match_updates;")
def handle_notify():
    """Print the payload of every pending notification on ``conn``.

    Polls the module-level connection so that newly arrived notifications are
    moved into ``conn.notifies``, then prints them in arrival order, leaving
    the queue empty.
    """
    conn.poll()
    # Take entries from the front one at a time until the queue is drained.
    while conn.notifies:
        print(conn.notifies.pop(0).payload)
# It works with uvloop too:
# import uvloop
# loop = uvloop.new_event_loop()
# asyncio.set_event_loop(loop)

# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running and recent Python versions no longer create one implicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Watch the connection's socket; handle_notify runs whenever it is readable.
loop.add_reader(conn, handle_notify)
try:
    loop.run_forever()
finally:
    # Unregister the socket and release the loop on the way out (e.g. Ctrl-C).
    loop.remove_reader(conn)
    loop.close()
import time | |
import psycopg2 | |
# dbname should be the same for the listening process
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
cursor = conn.cursor()
# Autocommit, so each notification is sent as soon as the statement runs.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
while True:
    val = time.time()
    # pg_notify() accepts channel and payload as ordinary bound parameters, so
    # the payload is never spliced into the SQL text (unlike a NOTIFY statement
    # built by string formatting). The payload must be text, hence str().
    cursor.execute("SELECT pg_notify(%s, %s);", ("match_updates", str(val)))
    time.sleep(1)
@kissgyorgy this is absolutely fantastic. I'm trying to ATTACH PARTITIONS to a table on inserts by using the listener. So, if I run the statements in listen-new.py and then perform my required insert statements right after it, how do I exit/terminate the listener (since the script uses loop.run_forever())? I intend to run the listener only while I am running the insert statements through my Python program.
@kissgyorgy this is absolutely fantastic. I'm trying to ATTACH PARTITIONS to a table on inserts by using the listener. So, if I run the statements in listen-new.py and then perform my required insert statements right after it, how do I exit/terminate the listener (since the script uses loop.run_forever())? I intend to run the listener only while I am running the insert statements through my Python program.
I don't understand exactly what you are trying to do, but maybe a TRIGGER would be better for your use-case? Triggers can be used for partitioned tables too:
Creating a row-level trigger on a partitioned table will cause an identical “clone” trigger to be created on each of its existing partitions; and any partitions created or attached later will have an identical trigger, too.
https://www.postgresql.org/docs/current/sql-createtrigger.html
Isn't there an SQL injection in the NOTIFY part?
Isn't there an SQL injection in the NOTIFY part?
- If you look at the definition of SQL injection at Wikipedia, in the second sentence it says "... when user input is either incorrectly filtered...". There is no user input in the snippet.
- This is an example of how to wire it up, people shouldn't copy-paste code from the internet verbatim.
@d33tah You can call the pg_notify function instead of the NOTIFY directive. You should be able to do parameter substitution with psycopg2 then.
Thank you so much for the detailed reply!
It would be quite useful, @kissgyorgy, if you had time to make a similar gist for psycopg version 3.
Here is my take using psycopg version 3. CC @jamesbraza
Listener only:
import os
import psycopg
import asyncio
def run_main():
    """Listen on the ``match_updates`` channel and print each payload.

    Opens a psycopg (version 3) connection, issues LISTEN, and lets an asyncio
    event loop watch the connection's socket. Blocks in ``run_forever()``
    until the loop is stopped or an exception such as KeyboardInterrupt
    propagates; the loop and the connection are closed on the way out.
    """
    conn = psycopg.connect(host="localhost", dbname="example", user=os.environ['USER'], password="")
    try:
        cursor = conn.cursor()
        cursor.execute("LISTEN match_updates;")
        # Commit so the LISTEN registration takes effect.
        conn.commit()

        def handle_notify():
            """Print the notifications that made the socket readable."""
            try:
                # NOTE(review): stop_after=0 is presumably meant to end the
                # generator after the batch that is already available instead
                # of blocking for more; confirm against the psycopg docs.
                for notify in conn.notifies(stop_after=0):
                    print(notify.payload)
            except Exception:
                print('Error occurred')
                raise

        # Create the loop explicitly: asyncio.get_event_loop() is deprecated
        # when no loop is running.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.add_reader(conn, handle_notify)
        try:
            loop.run_forever()
        finally:
            loop.remove_reader(conn)
            loop.close()
    finally:
        conn.close()
if __name__ == "__main__":
    # The finally clause makes 'Exiting' print even when run_main() is
    # interrupted or raises.
    try:
        print('Starting')
        run_main()
    finally:
        print('Exiting')
A version that listens to PostgreSQL while doing other, non-async work:
https://gist.github.com/ArthurDelannoyazerty/426d2aa084aad644b215b8cacbffe914
import asyncio
import logging
import os
import psycopg2
import threading
import time
from dotenv import find_dotenv, load_dotenv
def listen_to_notifications():
    """Listen for PostgreSQL notifications on ``channel`` and print payloads.

    Connection settings are read from environment variables after loading a
    ``.env`` file. The function creates its own asyncio event loop, so it can
    be used as the target of a worker thread, and blocks in ``run_forever()``.
    The loop and the connection are released when the function exits.
    """
    conn = None
    loop = None
    try:
        load_dotenv(find_dotenv())
        conn = psycopg2.connect(
            host=os.environ.get('POSTGRES_IP'),
            port=os.environ.get('POSTGRES_PORT'),
            user=os.environ.get('POSTGRES_USER'),
            password=os.environ.get('POSTGRES_PASSWORD'),
            database=os.environ.get('POSTGRES_DB_DPMC_NAME')
        )

        cursor = conn.cursor()
        cursor.execute("LISTEN channel;")  # Modify the channel here
        conn.commit()

        def handle_notify():
            """Print and discard every notification pending on ``conn``."""
            conn.poll()
            for notify in conn.notifies:
                print(notify.payload)  # Your code here
            conn.notifies.clear()

        # No event loop exists in a new thread, so we create one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.add_reader(conn, handle_notify)
        loop.run_forever()
    except KeyboardInterrupt:
        # Python raises KeyboardInterrupt in the main thread only, so this is
        # reached only when the function is called from the main thread.
        print("Notification listener stopped.")
    finally:
        if loop is not None:
            loop.remove_reader(conn)
            loop.close()
        if conn is not None:
            conn.close()
def infinite_loop():
    """Simulate a blocking synchronous job until interrupted with Ctrl-C."""
    pause_seconds = 0.4  # Simulate work
    try:
        while True:
            print("Executing synchronous task...")
            time.sleep(pause_seconds)
    except KeyboardInterrupt:
        print("Infinite loop stopped.")
if __name__ == "__main__":
    # daemon=True --> the listener thread is abandoned when the main thread
    # finishes. It runs its event loop forever, so a non-daemon thread would
    # keep the process alive after the main job has stopped.
    notification_thread = threading.Thread(target=listen_to_notifications, daemon=True)
    notification_thread.start()

    # Run the infinite loop in the main thread (simulate a job)
    infinite_loop()  # Your code here
@kissgyorgy Thanks very much for the detailed reply. You went above and beyond. Highly appreciated.