This Python script showcases a multi-threaded producer-consumer system using Redis Streams and consumer groups.
- Redis Streams: A log-like data structure in Redis, ideal for real-time data ingestion and processing.
- Producer (
writer): Continuously pushes randomized market data to a Redis stream. - Consumer (
reader): Reads new entries from the stream via a consumer group, acknowledges processed items, and deletes them to avoid reprocessing. - Threading: Two threads are used to simulate concurrent real-world producer-consumer scenarios.
+------------+ XADD +------------+ XREADGROUP +------------+
| writer() | ------------> | Redis | <----------------- | reader() |
| (Thread) | | Stream S,U| block=true | (Thread) |
+------------+ +------------+ +------------+
| |
XACK processes
| |
XDEL (optional clean-up)
- Generates fake market data (symbols like
CADJPY,GBPCAD, etc.). - Pushes data into a Redis Stream using
XADD. - Runs
iterationnumber of times and sleeps randomly to simulate load variability.
-
Creates a Redis consumer group (
g1) if it doesn't exist. -
Continuously reads new messages from the stream (
>). -
Processes incoming messages, logs them, and:
- Acknowledges with
XACKso Redis knows they're handled. - Deletes them with
XDELto clean up the stream.
- Acknowledges with
pip install redis python-dotenv loguruCreate a .env file:
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASS=python streams.pyYou'll see writer and reader threads working in parallel, with data flowing through Redis.
- Both threads (
t1,t2) are started and joined in__main__. - If you want graceful shutdown or cancellation, consider using
threading.Eventorsignalhandlers (not included here).