This Python script showcases a multi-threaded producer-consumer system using Redis Streams and consumer groups.
- Redis Streams: A log-like data structure in Redis, ideal for real-time data ingestion and processing.
- Producer (
writer
): Continuously pushes randomized market data to a Redis stream. - Consumer (
reader
): Reads new entries from the stream via a consumer group, acknowledges processed items, and deletes them to avoid reprocessing. - Threading: Two threads are used to simulate concurrent real-world producer-consumer scenarios.
+------------+ XADD +------------+ XREADGROUP +------------+
| writer() | ------------> | Redis | <----------------- | reader() |
| (Thread) | | Stream S,U| block=true | (Thread) |
+------------+ +------------+ +------------+
| |
XACK processes
| |
XDEL (optional clean-up)
- Generates fake market data (symbols like
CADJPY
,GBPCAD
, etc.). - Pushes data into a Redis Stream using
XADD
. - Runs
iteration
number of times and sleeps randomly to simulate load variability.
-
Creates a Redis consumer group (
g1
) if it doesn't exist. -
Continuously reads new messages from the stream (
>
). -
Processes incoming messages, logs them, and:
- Acknowledges with
XACK
so Redis knows they're handled. - Deletes them with
XDEL
to clean up the stream.
- Acknowledges with
pip install redis python-dotenv loguru
Create a .env
file:
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASS=
python streams.py
You'll see writer
and reader
threads working in parallel, with data flowing through Redis.
- Both threads (
t1
,t2
) are started and joined in__main__
. - If you want graceful shutdown or cancellation, consider using
threading.Event
orsignal
handlers (not included here).