Created
December 8, 2023 11:30
-
-
Save Tobi-De/92491f68a2ee4825cf377eaf7bba8195 to your computer and use it in GitHub Desktop.
sse litestar
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from litestar import Litestar, get | |
from litestar.channels.backends.redis import RedisChannelsPubSubBackend | |
from litestar.channels.plugin import ChannelsPlugin | |
from litestar.response.sse import ServerSentEvent | |
import redis.asyncio as Redis | |
from typing import Generator, AsyncGenerator | |
import json | |
from contextlib import asynccontextmanager | |
# Async Redis client shared by the channels backend and the subscriber counters.
redis = Redis.from_url("redis://localhost:6379")

# Redis pub/sub backend for the Litestar channels plugin.
redis_backend = RedisChannelsPubSubBackend(
    redis=redis,
    key_prefix="RELAY_CHANNELS",
)

# arbitrary_channels_allowed=True: presumably lets clients subscribe to channel
# names that were not declared up front -- confirm against the Litestar docs.
channels_plugin = ChannelsPlugin(
    backend=redis_backend,
    arbitrary_channels_allowed=True,
)

# Name of the Redis hash whose fields are adjusted with HINCRBY per channel and
# read back with HGETALL by the /count endpoint.
COUNT_KEY = "REDIS_COUNT_KEY"
@asynccontextmanager
async def increment(channel: str) -> AsyncGenerator[None, None]:
    """Hold a +1 on the ``channel`` counter for the lifetime of the context.

    On entry, the ``channel`` field of the ``COUNT_KEY`` Redis hash is
    incremented by one. On exit it is decremented by one, whether the body
    finished normally or raised.

    Args:
        channel: Field of the ``COUNT_KEY`` hash to adjust.

    Yields:
        None. The context manager is used only for its side effect.
    """
    await redis.hincrby(COUNT_KEY, channel, 1)
    try:
        yield
    finally:
        # Always undo the increment so the counter does not drift upward
        # when the body raises.
        await redis.hincrby(COUNT_KEY, channel, -1)
async def generate_event(
    channel_name: str, channels: ChannelsPlugin
) -> AsyncGenerator[bytes, None]:
    """Yield every event received on ``channel_name``.

    The generator runs inside ``increment(channel=channel_name)`` and inside a
    subscription started with ``channels.start_subscription``; both contexts
    are exited when the generator finishes or is closed.

    Args:
        channel_name: Channel to subscribe to.
        channels: Channels plugin used to open the subscription.

    Yields:
        Each event produced by ``subscriber.iter_events()``, unchanged.
    """
    async with increment(channel=channel_name):
        async with channels.start_subscription([channel_name]) as subscriber:
            async for event in subscriber.iter_events():
                # Echo every event to stdout before forwarding it.
                print(event)
                yield event
@get("/sse/{channel:str}")
async def sse(channel: str, channels: ChannelsPlugin) -> ServerSentEvent:
    """Return a server-sent-events response streaming events from ``channel``.

    Args:
        channel: Channel name taken from the URL path.
        channels: Channels plugin passed through to ``generate_event``.

    Returns:
        A ``ServerSentEvent`` wrapping the ``generate_event`` async generator.
    """
    event_stream = generate_event(channel_name=channel, channels=channels)
    return ServerSentEvent(event_stream)
@get("/count")
async def count() -> dict[str, int]:
    """Return the contents of the ``COUNT_KEY`` hash as ``{field: int}``.

    Returns:
        A dict mapping each decoded hash field to its value converted with
        ``int``.
    """
    raw_counts = await redis.hgetall(COUNT_KEY)
    totals = {}
    for field, value in raw_counts.items():
        # Fields are decoded with .decode(), i.e. they are expected to be bytes.
        totals[field.decode()] = int(value)
    return totals
# Application object: registers the two route handlers and the channels plugin.
app = Litestar(
    route_handlers=[sse, count],
    plugins=[channels_plugin],
)
if __name__ == "__main__":
    # Test publisher: when the file is run as a script, publish an increasing
    # counter as JSON to "test_channel" every 4 seconds, forever.
    import redis as sync_redis

    # Build the synchronous client once, outside the loop, so each iteration
    # reuses it instead of constructing a new client per message.
    publisher = sync_redis.from_url("redis://localhost:6379")

    # NOTE(review): the channels backend is configured with
    # key_prefix="RELAY_CHANNELS"; confirm that a raw publish to
    # "test_channel" actually reaches subscribers of that backend.
    counter = 0
    while True:
        print("Sending message")
        publisher.publish(
            channel="test_channel", message=json.dumps({"data": counter})
        )
        counter += 1
        time.sleep(4)
# Stream events with curl (-N disables output buffering):
#   curl -N http://localhost:8001/sse/test_channel
# Publish test events by running this file as a script:
#   python app.py
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment