Created
September 9, 2025 13:15
-
-
Save cbonesana/26fedaedb7188d8d58e4fdf1679aecd7 to your computer and use it in GitHub Desktop.
FastAPI streaming proxy example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Launch this script directly or launche the three components separately using uvicorn.""" | |
| from typing import AsyncGenerator, Any | |
| from fastapi import FastAPI | |
| from fastapi.responses import StreamingResponse | |
| from fastapi.requests import Request | |
| from httpx import AsyncClient | |
| from pydantic import BaseModel | |
| import asyncio | |
| import json | |
| import uvicorn | |
class InData(BaseModel):
    """Request body accepted by the generator: parameters of a ``range`` stream.

    ``start``, ``end`` and ``step`` are handed to :func:`range` unchanged, and
    ``sleep`` is the delay (seconds) awaited after each emitted chunk.
    """

    start: int  # first value produced (inclusive, as in range())
    end: int  # stop value (exclusive, as in range())
    step: int = 1  # increment between consecutive values
    sleep: float = 1.0  # pause after every chunk, in seconds
class RangeChunk(BaseModel):
    """One streamed item: the range value ``i`` and its zero-based position ``n``."""

    i: int  # current value taken from range(start, end, step)
    n: int  # 0-based index of this chunk within the stream
# Port the upstream generator app (app_gen) is served on.
PORT_GENERATOR: int = 8_000
# Port the proxy app (app_proxy) is served on; the probe client talks to it.
PORT_PROXY: int = 7_000
app_gen = FastAPI()


@app_gen.post("/stream")
async def gen_stream(req: Request) -> StreamingResponse:
    """Read an ``InData`` JSON body and stream back one ``RangeChunk`` per value.

    Each chunk is a JSON object followed by a newline (newline-delimited JSON).
    The delimiter lets receivers split the stream even when transport chunks
    are merged or split in transit, for example by the proxy.

    Raises:
        HTTPException: 422 if the body is not a valid ``InData`` or ``step``
            is 0. Both are detected before the response starts streaming.
    """
    # Local imports keep this block self-contained; both packages are already
    # dependencies of this file.
    from fastapi import HTTPException
    from pydantic import ValidationError

    print("generator", req.headers)
    # Read the body incrementally so each received chunk can be logged.
    chunks = []
    async for chunk in req.stream():
        print("generator", chunk)
        chunks.append(chunk)
    in_str = b"".join(chunks)
    print("generator request", in_str)

    try:
        # Covers malformed JSON, non-object JSON and missing/invalid fields.
        in_data = InData.model_validate_json(in_str)
    except ValidationError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    print("generator request", in_data)
    if in_data.step == 0:
        # range() would raise only after the 200 response had begun streaming.
        raise HTTPException(status_code=422, detail="step must not be 0")

    async def range_stream(data: InData) -> AsyncGenerator[bytes, None]:
        """Yield one newline-terminated RangeChunk per range value."""
        for n, i in enumerate(range(data.start, data.end, data.step)):
            obj = RangeChunk(i=i, n=n)
            print("generator", obj)
            yield obj.model_dump_json().encode() + b"\n"
            await asyncio.sleep(data.sleep)

    return StreamingResponse(
        range_stream(in_data),
        media_type="application/x-ndjson",
    )
app_proxy = FastAPI()

# Connection-scoped and body-framing headers. httpx (upstream request) and the
# ASGI server (downstream response) compute these per hop, so forwarding them
# verbatim can produce duplicated or conflicting framing, such as two
# Transfer-Encoding headers or Content-Length together with chunked encoding.
_HOP_HEADERS = frozenset(
    {
        "connection",
        "content-length",
        "keep-alive",
        "proxy-connection",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    }
)


@app_proxy.post("/stream")
async def proxy_stream(req: Request) -> StreamingResponse:
    """Forward the request body to the generator and stream its reply back.

    The upstream request is sent, and its status line and headers are
    received, *before* the ``StreamingResponse`` is built. This lets the
    generator's status code and headers be propagated to the client.

    The request body is also consumed while the handler still owns the
    request. NOTE(review): reading it later, from inside the response
    generator, may race with disconnect listeners in some Starlette versions.

    The upstream response and the per-request ``AsyncClient`` are closed when
    the body stream finishes, fails, or stops because the client disconnected.
    """
    print("proxy", req.headers)

    async def request_stream() -> AsyncGenerator[bytes, None]:
        """Relay the incoming request body chunk by chunk."""
        print("entering request_stream()")
        async for chunk in req.stream():
            print("proxy request stream", chunk)
            yield chunk

    # The header name must be lower-cased by *calling* .lower(). Host is
    # dropped because httpx derives it from the upstream URL.
    headers = {
        k: v
        for k, v in req.headers.items()
        if k.lower() != "host" and k.lower() not in _HOP_HEADERS
    }
    headers["Transfer-Encoding"] = "chunked"

    client = AsyncClient()
    try:
        upstream = await client.send(
            client.build_request(
                "POST",
                f"http://localhost:{PORT_GENERATOR}/stream",
                content=request_stream(),
                headers=headers,
                timeout=None,
            ),
            stream=True,
        )
    except BaseException:
        # No response object exists yet, so the client is the only resource.
        await client.aclose()
        raise

    # aiter_bytes() below decodes any Content-Encoding. The header must
    # therefore not be passed on alongside the already-decoded body.
    response_headers = {
        k: v
        for k, v in upstream.headers.items()
        if k.lower() not in _HOP_HEADERS and k.lower() != "content-encoding"
    }

    async def response_stream() -> AsyncGenerator[bytes, None]:
        """Relay upstream body chunks until done or the client disconnects."""
        print("entering response_stream()")
        try:
            async for chunk in upstream.aiter_bytes():
                if await req.is_disconnected():
                    print("proxy response stream", "closed")
                    break
                print("proxy response stream", chunk)
                yield chunk
        finally:
            try:
                await upstream.aclose()
            finally:
                await client.aclose()

    return StreamingResponse(
        response_stream(),
        status_code=upstream.status_code,
        headers=response_headers,
    )
async def run(app: FastAPI, port: int) -> None:
    """Serve *app* with uvicorn on ``localhost:port`` until the server exits.

    Uvicorn logging is limited to error-level messages.
    """
    server = uvicorn.Server(
        uvicorn.Config(app, host="localhost", port=port, log_level="error")
    )
    await server.serve()
async def probe() -> None:
    """Send one ``InData`` request through the proxy and print each reply chunk.

    The JSON payload is produced by an async generator as a single chunk.
    ``raise_for_status()`` is applied before any of the body is read.
    """
    payload = InData(start=1, end=10, step=2, sleep=0.0)

    async def client_streamer() -> AsyncGenerator[bytes, None]:
        """Yield the whole encoded payload once, logging it first."""
        body = payload.model_dump_json().encode()
        print("client sending", body)
        yield body

    async with AsyncClient() as client, client.stream(
        "POST",
        f"http://localhost:{PORT_PROXY}/stream",
        content=client_streamer(),
        headers={},
        timeout=None,
    ) as response:
        response.raise_for_status()
        async for piece in response.aiter_bytes():
            print("client", piece)
async def main() -> None:
    """Start both servers, run one probe through the proxy, then keep serving.

    The probe waits until each server port accepts TCP connections. This
    replaces a fixed two-second sleep that could race a slow start-up. After
    the probe, the servers keep running (stop them with Ctrl+C).
    ``asyncio.gather`` surfaces an exception from either server task instead
    of blocking on the first task indefinitely.
    """

    async def wait_until_listening(port: int, timeout: float = 10.0) -> None:
        """Poll localhost:*port* until it accepts a connection or *timeout* ends."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            try:
                _, writer = await asyncio.open_connection("localhost", port)
            except OSError:
                if loop.time() >= deadline:
                    raise TimeoutError(
                        f"nothing is listening on localhost:{port} after {timeout}s"
                    ) from None
                await asyncio.sleep(0.05)
                continue
            writer.close()
            await writer.wait_closed()
            return

    task_gen = asyncio.create_task(run(app_gen, PORT_GENERATOR))
    task_proxy = asyncio.create_task(run(app_proxy, PORT_PROXY))
    await wait_until_listening(PORT_GENERATOR)
    await wait_until_listening(PORT_PROXY)
    await probe()
    await asyncio.gather(task_gen, task_proxy)
if __name__ == "__main__":
    # asyncio.run creates, runs and always closes the event loop. Errors such
    # as the probe failing to connect or receiving a non-2xx reply are no
    # longer swallowed, so a broken run ends with a traceback and a non-zero
    # exit status.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the long-running servers.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment