Skip to content

Instantly share code, notes, and snippets.

@draincoder
Created November 13, 2024 16:59
Show Gist options
  • Save draincoder/35f6a0298fa2c4d6241b678bc2d6b7b5 to your computer and use it in GitHub Desktop.
Save draincoder/35f6a0298fa2c4d6241b678bc2d6b7b5 to your computer and use it in GitHub Desktop.
FastAPI + FastStream tracing
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from faststream.redis import RedisMessage
from faststream.redis.fastapi import RedisRouter
from faststream.redis.opentelemetry import RedisTelemetryMiddleware
router = RedisRouter()
@router.get("/export/{market}")
async def market_handler(market: str) -> str:
response: RedisMessage = await router.broker.request(
{"market": market},
channel="export",
timeout=40.0,
)
return response.body.decode()
def create_app() -> FastAPI:
resource = Resource.create(attributes={"service.name": "fastapi"})
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://127.0.0.1:4317")))
app = FastAPI(debug=True)
app.include_router(router)
FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)
router.broker.add_middleware(RedisTelemetryMiddleware(tracer_provider=tracer_provider))
return app
if __name__ == "__main__":
uvicorn.run(create_app(), host="127.0.0.1", port=8000)
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from faststream import FastStream
from faststream.redis import RedisBroker, RedisMessage
from faststream.redis.opentelemetry import RedisTelemetryMiddleware
def get_tracer_provider() -> TracerProvider:
resource = Resource.create(attributes={"service.name": "faststream"})
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://127.0.0.1:4317")))
return tracer_provider
broker = RedisBroker(middlewares=(RedisTelemetryMiddleware(tracer_provider=get_tracer_provider()),))
app = FastStream(broker)
@broker.subscriber(channel="export")
async def market_handler(msg: RedisMessage) -> str:
return f"exported {json.loads(msg.body)['market']}"
if __name__ == "__main__":
asyncio.run(app.run())
@draincoder
Copy link
Author

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment