@zamoosh
Last active May 19, 2025 18:41
Redis stream sample code

📌 What This Code Does

This Python script showcases a multi-threaded producer-consumer system using Redis Streams and consumer groups.

💡 Key Components:

  • Redis Streams: A log-like data structure in Redis, ideal for real-time data ingestion and processing.
  • Producer (writer): Continuously pushes randomized market data to a Redis stream.
  • Consumer (reader): Reads new entries from the stream via a consumer group, acknowledges processed items so they leave the group's pending list, and deletes them so the stream does not keep growing.
  • Threading: Two threads are used to simulate concurrent real-world producer-consumer scenarios.

🧵 Architecture Overview

+------------+     XADD      +------------+     XREADGROUP     +------------+
|  writer()  | ------------> |  Redis     | <----------------- |  reader()  |
|  (Thread)  |               |  Stream S,U|      block=5000    |  (Thread)  |
+------------+               +------------+                    +------------+
                                    |                               |
                                  XACK                           processes
                                    |                               |
                                  XDEL                         (optional clean-up)
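
To make the four commands in the diagram concrete, here is a toy in-memory model of the flow. It is only a sketch: it does not talk to a server, it is not how Redis is implemented, and it models a single consumer group. The class name ToyStream and the integer IDs are hypothetical.

```python
import itertools


class ToyStream:
    """Hypothetical in-memory stand-in for one stream with one consumer group."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.entries = {}        # entry id -> fields, in insertion order
        self.last_delivered = 0  # the group's last-delivered id
        self.pending = set()     # delivered but not yet acknowledged

    def xadd(self, fields):
        entry_id = next(self._ids)
        self.entries[entry_id] = fields
        return entry_id

    def xreadgroup(self, count=1):
        # Like reading with ">": only entries never delivered to the group.
        fresh = [i for i in self.entries if i > self.last_delivered][:count]
        for entry_id in fresh:
            self.pending.add(entry_id)
            self.last_delivered = entry_id
        return [(i, self.entries[i]) for i in fresh]

    def xack(self, *ids):
        # Acknowledging removes ids from the pending set.
        acked = self.pending.intersection(ids)
        self.pending -= acked
        return len(acked)

    def xdel(self, *ids):
        # Deleting removes the entries themselves from the stream.
        return sum(1 for i in ids if self.entries.pop(i, None) is not None)


stream = ToyStream()
stream.xadd({"data": "tick-1"})
stream.xadd({"data": "tick-2"})
batch = stream.xreadgroup(count=1)
delivered_ids = [entry_id for entry_id, _ in batch]
stream.xack(*delivered_ids)
stream.xdel(*delivered_ids)
```

After this run the first entry has been delivered, acknowledged, and deleted, while the second is still in the stream waiting for the next read.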

🛠️ How It Works

🔄 Producer (Writer Thread)

  • Generates fake market data (symbols like CADJPY, GBPCAD, etc.).
  • Pushes data into a Redis Stream using XADD.
  • Runs for iteration rounds (100,000 in the sample) and occasionally sleeps for 0.1 to 0.5 seconds to simulate load variability.
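
The producer steps above can be sketched as two small functions. This is a minimal sketch, not the script itself: build_payload and publish are hypothetical helper names, client is assumed to be any object with an xadd(name, fields) method (such as a redis.Redis instance), and FakeClient exists only so the sketch runs without a server.

```python
import json
import random
import time

SYMBOLS = ["CADJPY", "GBPCAD", "GBPJPY"]  # subset of the sample's symbols


def build_payload(source="PipWise_Ecn_MT5", now_ms=None, rng=random):
    """Build one stream entry: a single "data" field holding a JSON string."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    quotes = [
        {"i": symbol, "b": rng.randint(100, 2000) / 10, "a": rng.randint(100, 2000) / 10}
        for symbol in SYMBOLS
    ]
    # Stream fields are flat key/value pairs, so nested data is JSON-encoded.
    return {"data": json.dumps({source: {str(now_ms): quotes}})}


def publish(client, stream, fields):
    """Append one entry with XADD and return the ID the client reports."""
    return client.xadd(stream, fields)


class FakeClient:
    """Stand-in that records calls instead of talking to Redis."""

    def __init__(self):
        self.calls = []

    def xadd(self, name, fields):
        self.calls.append((name, fields))
        return f"{len(self.calls)}-0"


client = FakeClient()
entry_id = publish(client, "S,U", build_payload(now_ms=1_700_000_000_000))
decoded = json.loads(client.calls[0][1]["data"])
```

Passing the client in as a parameter, rather than using a module-level connection, keeps the payload logic testable without Redis.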

👂 Consumer (Reader Thread)

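  • Creates the consumer group g1 if it doesn't exist (and the stream itself, via mkstream); if the group already exists, the resulting error is logged and the reader carries on.

  • Continuously reads new messages from the stream using the special ID >, which requests entries never delivered to this group. Each call blocks for up to 5 seconds and fetches at most one message.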

  • Processes incoming messages, logs them, and:

    • Acknowledges with XACK so Redis knows they're handled.
    • Deletes them with XDEL to clean up the stream.
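
The acknowledge-then-delete step can be sketched as a function that handles one read result. This is a sketch under assumptions: handle_batch is a hypothetical name, client is assumed to expose xack and xdel like a redis.Redis instance, and the response shape shown is the list form the script itself iterates over with decode_responses=True. FakeClient only records calls so the example runs without a server.

```python
import json


def handle_batch(client, stream, group, response):
    """Decode every message in one XREADGROUP response, then ack and delete."""
    decoded = []
    for _stream_name, messages in response:
        ids = []
        for message_id, fields in messages:
            decoded.append((message_id, json.loads(fields["data"])))
            ids.append(message_id)
        if ids:
            client.xack(stream, group, *ids)  # remove from the group's pending list
            client.xdel(stream, *ids)         # remove the entries from the stream
    return decoded


class FakeClient:
    """Stand-in that records acknowledged and deleted IDs."""

    def __init__(self):
        self.acked, self.deleted = [], []

    def xack(self, name, group, *ids):
        self.acked.extend(ids)
        return len(ids)

    def xdel(self, name, *ids):
        self.deleted.extend(ids)
        return len(ids)


# Assumed response shape:
# [[stream_name, [(message_id, {field: value}), ...]], ...]
response = [["S,U", [("1-0", {"data": '{"k": [1, 2]}'})]]]
client = FakeClient()
result = handle_batch(client, "S,U", "g1", response)
```

Acknowledging only after decoding succeeds means a message that fails to parse stays in the pending list, where it can be inspected later.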

▶️ How to Use

1. Install Dependencies

pip install redis python-dotenv loguru

2. Set Up Environment Variables

Create a .env file:

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASS=
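
As a sketch of how these variables could be turned into connection settings, the function below reads them from a mapping and falls back to local defaults. The name redis_conf_from_env and the fallback values (localhost, 6379, 0, no password) are assumptions made for illustration.

```python
import os


def redis_conf_from_env(env=os.environ):
    """Build connection settings, falling back to local defaults."""
    return {
        "host": env.get("REDIS_HOST") or "localhost",
        "port": int(env.get("REDIS_PORT") or 6379),
        "db": int(env.get("REDIS_DB") or 0),
        # An empty REDIS_PASS is treated as "no password".
        "password": env.get("REDIS_PASS") or None,
        "decode_responses": True,
    }


conf = redis_conf_from_env(
    {"REDIS_HOST": "localhost", "REDIS_PORT": "6379", "REDIS_DB": "0", "REDIS_PASS": ""}
)
```

The resulting dictionary could be passed to redis.ConnectionPool(**conf), as the script does with its own REDIS_CONF.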

3. Run the Script

python streams.py

You'll see the writer and reader threads running concurrently, with data flowing through Redis.


🧼 Clean Exit

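  • Both threads (t1, t2) are started and joined in __main__. Because reader() loops forever, t2.join() never returns and the script keeps running until the process is stopped.
  • For graceful shutdown or cancellation, consider threading.Event or signal handlers (not included here).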
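
One way the threading.Event suggestion could look is sketched below. It is not part of the script: reader_loop, fake_poll, and the stop flag are hypothetical, and fake_poll stands in for a blocking read with a timeout so the example runs without Redis.

```python
import threading
import time


def reader_loop(stop, poll, handle):
    """Call poll() until stop is set; pass non-empty results to handle()."""
    while not stop.is_set():
        batch = poll()  # in the real script this would be the blocking stream read
        if batch:
            handle(batch)


stop = threading.Event()
seen = []


def fake_poll():
    time.sleep(0.01)  # stands in for a blocking read with a timeout
    return ["tick"]


worker = threading.Thread(target=reader_loop, args=(stop, fake_poll, seen.append))
worker.start()
time.sleep(0.1)
stop.set()              # request shutdown
worker.join(timeout=2)  # the loop exits after its current poll returns
```

Because the read in the script uses a finite block timeout, a loop like this would notice the flag within one timeout period instead of waiting forever.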

import json
import os
import random
import threading
import time

import redis
from dotenv import load_dotenv
from loguru import logger

# Load the REDIS_* settings from .env before they are read below.
load_dotenv()

REDIS_CONF = {
    "host": os.getenv("REDIS_HOST", "localhost"),
    "port": int(os.getenv("REDIS_PORT", "6379")),
    "db": int(os.getenv("REDIS_DB", "0")),
    # An empty REDIS_PASS is treated as "no password".
    "password": os.getenv("REDIS_PASS") or None,
    "max_connections": 500,
    "decode_responses": True,
}
REDIS_POOL = redis.ConnectionPool(**REDIS_CONF)
r = redis.Redis(connection_pool=REDIS_POOL)

SYMBOLS = ("CADJPY", "GBPCAD", "GBPJPY", "ETCUSD", ".HK50Cash", ".USTECHCash")


def writer(stream: str, iteration: int):
    """Push `iteration` entries of random market data to `stream`."""
    logger.info("in writer")
    while iteration > 0:
        # Occasionally pause for 0.1 to 0.5 s to vary the load.
        if iteration % random.randint(30, 50) == 0:
            time.sleep(random.randint(1, 5) / 10)

        timestamp_ms = time.time() * 1000
        pure_data = {
            "PipWise_Ecn_MT5": {
                timestamp_ms: [
                    {"i": symbol, "b": random.randint(100, 2000) / 10, "a": random.randint(100, 2000) / 10}
                    for symbol in SYMBOLS
                ]
            }
        }
        # Stream fields are flat, so the nested payload is stored as one JSON string.
        data = {"data": json.dumps(pure_data)}
        r.xadd(stream, data)
        iteration -= 1


def reader(stream: str):
    """Read new entries through group g1, then acknowledge and delete them."""
    logger.info("in reader")
    try:
        # "$" means the group only receives entries added after it is created.
        r.xgroup_create(stream, "g1", "$", mkstream=True)
    except redis.exceptions.ResponseError as e:
        # Raised, for example, when the group already exists.
        logger.error(e)

    while True:
        res = r.xreadgroup(
            groupname="g1",
            consumername="c1",
            streams={stream: ">"},  # ">" = entries never delivered to this group
            # streams={stream: 0},  # alternative: re-read this consumer's pending entries
            block=5000,  # wait up to 5 s for new entries
            count=1,
        )
        if not res:
            continue

        for _, messages in res:
            ack_ids = []
            for message_id, data in messages:
                logger.info(f"got {message_id}: {data}")
                ack_ids.append(message_id)
            r.xack(stream, "g1", *ack_ids)
            r.xdel(stream, *ack_ids)


if __name__ == "__main__":
    t1 = threading.Thread(target=writer, args=("S,U", 100_000))
    t2 = threading.Thread(target=reader, args=("S,U",))

    t1.start()
    t2.start()

    t1.join()
    # reader() loops forever, so this join blocks until the process is stopped.
    t2.join()
    logger.info("end of the program")