Skip to content

Instantly share code, notes, and snippets.

@cbonesana
Created September 9, 2025 13:15
Show Gist options
  • Save cbonesana/26fedaedb7188d8d58e4fdf1679aecd7 to your computer and use it in GitHub Desktop.
FastAPI streaming proxy example
"""Launch this script directly or launche the three components separately using uvicorn."""
from typing import AsyncGenerator, Any
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.requests import Request
from httpx import AsyncClient
from pydantic import BaseModel
import asyncio
import json
import uvicorn
class InData(BaseModel):
    """Request payload accepted by the generator's /stream endpoint."""

    start: int  # inclusive start of the range
    end: int  # exclusive end of the range
    step: int = 1  # range step
    sleep: float = 1.0  # seconds to pause between emitted chunks
class RangeChunk(BaseModel):
    """A single streamed item: the range value and its position in the stream."""

    i: int  # value taken from the requested range
    n: int  # 0-based sequence number of this chunk
PORT_GENERATOR: int = 8000  # port for the data-generating app
PORT_PROXY: int = 7000  # port for the forwarding proxy app
app_gen = FastAPI()


@app_gen.post("/stream")
async def gen_stream(req: Request) -> StreamingResponse:
    """Read an InData payload from the request body and stream RangeChunk JSON back.

    The request body is consumed chunk-by-chunk (it may arrive with chunked
    transfer encoding), parsed as JSON into InData, and answered with one
    JSON-encoded RangeChunk per range value, pausing `sleep` seconds between
    chunks.
    """
    print("generator", req.headers)

    # Accumulate the (possibly chunked) request body before parsing it.
    body_parts: list[bytes] = []
    async for part in req.stream():
        print("generator", part)
        body_parts.append(part)
    raw_body = b"".join(body_parts)
    print("generator request", raw_body)

    in_data = InData(**json.loads(raw_body))
    print("generator request", in_data)

    async def range_stream(data: InData) -> AsyncGenerator[bytes, Any]:
        # Emit one JSON-encoded RangeChunk per value; `index` is the 0-based
        # position of the chunk within the stream.
        for index, value in enumerate(range(data.start, data.end, data.step)):
            chunk_obj = RangeChunk(i=value, n=index)
            print("generator", chunk_obj)
            yield chunk_obj.model_dump_json().encode()
            await asyncio.sleep(data.sleep)

    return StreamingResponse(range_stream(in_data))
app_proxy = FastAPI()


@app_proxy.post("/stream")
async def proxy_stream(req: Request) -> StreamingResponse:
    """Forward the request body to the generator service and stream its reply back.

    Both directions are streamed: the incoming request body is re-emitted
    chunk-by-chunk to the upstream generator, and the upstream response is
    relayed chunk-by-chunk to the original caller, stopping early if the
    caller disconnects.
    """
    print("proxy", req.headers)

    # Mutable cell so response_stream() can record the upstream status/headers.
    # NOTE(review): StreamingResponse is constructed *before* response_stream()
    # starts executing, so the values recorded inside the generator are not
    # actually applied to the outgoing response (it always uses the defaults
    # below) — confirm whether the upstream status must be propagated.
    r_values = {"status_code": 200, "headers": {}}

    async def request_stream() -> AsyncGenerator[bytes, None]:
        # Relay the incoming request body chunk-by-chunk to the upstream call.
        print("entering request_stream()")
        async for chunk in req.stream():
            print("proxy request stream", chunk)
            yield chunk

    async def response_stream() -> AsyncGenerator[bytes, None]:
        print("entering response_stream()")
        # BUG FIX: the original used `k.lower != "host"`, comparing the bound
        # method object to a string (always True), so Host was never stripped.
        # Content-Length is stripped as well: the body is re-sent chunked, and
        # a stale Content-Length conflicts with Transfer-Encoding.
        excluded = {"host", "content-length"}
        headers = {k: v for k, v in req.headers.items() if k.lower() not in excluded}
        headers["Transfer-Encoding"] = "chunked"
        async with AsyncClient() as client:
            async with client.stream(
                "POST",
                f"http://localhost:{PORT_GENERATOR}/stream",
                content=request_stream(),
                headers=headers,
                timeout=None,
            ) as resp:
                r_values["status_code"] = resp.status_code
                r_values["headers"] = resp.headers
                async for chunk in resp.aiter_bytes():
                    # Stop relaying if the original caller went away.
                    if await req.is_disconnected():
                        print("proxy response stream", "closed")
                        break
                    print("proxy response stream", chunk)
                    yield chunk

    return StreamingResponse(
        response_stream(),
        r_values["status_code"],
        r_values["headers"],
    )
async def run(app: FastAPI, port: int) -> None:
    """Serve *app* on localhost:*port* with uvicorn until the server stops."""
    server = uvicorn.Server(
        uvicorn.Config(app, host="localhost", port=port, log_level="error")
    )
    await server.serve()
async def probe() -> None:
    """Exercise the proxy end-to-end with a small streamed request.

    Sends an InData payload as a streamed request body through the proxy and
    prints every response chunk as it arrives.
    """
    payload = InData(start=1, end=10, step=2, sleep=0.0)

    async def client_streamer() -> AsyncGenerator[bytes, None]:
        # Send the JSON-encoded payload as a single streamed chunk.
        encoded = payload.model_dump_json().encode()
        print("client sending", encoded)
        yield encoded

    async with AsyncClient() as client:
        stream_ctx = client.stream(
            "POST",
            f"http://localhost:{PORT_PROXY}/stream",
            content=client_streamer(),
            headers={},
            timeout=None,
        )
        async with stream_ctx as response:
            response.raise_for_status()
            async for chunk in response.aiter_bytes():
                print("client", chunk)
async def main() -> None:
    """Start both servers, run the probe against them, then shut everything down.

    The original awaited the server tasks directly; those tasks never complete,
    so the script hung forever after the probe finished. The servers are now
    cancelled once the probe is done, letting the script exit cleanly.
    """
    task_gen = asyncio.create_task(run(app_gen, PORT_GENERATOR))
    task_proxy = asyncio.create_task(run(app_proxy, PORT_PROXY))

    # Give both servers a moment to bind their ports before probing.
    await asyncio.sleep(2.0)
    await probe()

    # Cancellation is the expected way to stop the serve() loops; gather with
    # return_exceptions so the CancelledErrors do not propagate.
    for task in (task_gen, task_proxy):
        task.cancel()
    await asyncio.gather(task_gen, task_proxy, return_exceptions=True)
if __name__ == "__main__":
try:
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
except Exception as _:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment