@draincoder
Last active December 11, 2024 14:20
FastStream Redis batch
import asyncio

from faststream import FastStream
from faststream.redis import RedisBroker, RedisMessage, StreamSub

broker = RedisBroker()
app = FastStream(broker)


# Consume "test-stream" in batch mode.
@broker.subscriber(stream=StreamSub("test-stream", batch=True, polling_interval=1000))
async def handle(msg: RedisMessage) -> None:
    print(msg.raw_message)
    print(msg.body)
    await msg.ack()


# Publish ten test messages once the app has started.
@app.after_startup
async def send() -> None:
    for _ in range(10):
        await broker.publish("hello", stream="test-stream")


if __name__ == "__main__":
    asyncio.run(app.run())
annotated-types==0.7.0
anyio==4.7.0
fast-depends==2.4.12
faststream==0.5.33
idna==3.10
pydantic==2.10.3
pydantic_core==2.27.1
redis==5.2.1
sniffio==1.3.1
typing_extensions==4.12.2