Created
December 14, 2023 12:56
-
-
Save WilliamStam/963dce6f4416d43c0f2efd5372838fc2 to your computer and use it in GitHub Desktop.
Fastapi event system
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dataclasses | |
import logging | |
from typing import Annotated, Callable, Dict, List | |
from fastapi import Depends, FastAPI | |
from fastapi.dependencies.models import Dependant | |
from fastapi.dependencies.utils import get_dependant, solve_dependencies | |
from starlette.requests import Request | |
from utilities.events import EventsCollection | |
logger = logging.getLogger(__name__) | |
class EventsCollection(): | |
def __init__(self): | |
self.events: Dict[str, List[Callable]] = {} | |
def register(self, event, listener): | |
event_name = f"{event.__module__}.{event.__name__}" | |
logger.warning(f"registering event [{event_name}] with listener [{str(listener)}]") | |
self.events.setdefault(event_name, []) | |
self.events[event_name].append(listener) | |
async def get_events(self, event): | |
event_name = f"{event.__module__}.{event.__class__.__name__}" | |
logger.warning(f"getting listeners for event [{event_name}]") | |
listeners = self.events.get(event_name, []) | |
return listeners | |
async def dispatch(self, event, request, **kwargs) -> None: | |
event_name = f"{event.__module__}.{event.__class__.__name__}" | |
logger.warning(f"dispatching event [{event_name}]") | |
listeners = self.events.get(event_name, None) | |
if listeners: | |
for listener in listeners: | |
logger.warning(f"found listener [{listener}]") | |
solved_result = await solve_dependencies( | |
request=request, | |
dependant=get_dependant(call=listener, path=request.url.path) | |
) | |
( | |
sub_values, | |
sub_errors, | |
background_tasks, | |
_, # the subdependency returns the same response we have | |
sub_dependency_cache, | |
) = solved_result | |
for key in sub_values.keys(): | |
if key in kwargs: | |
sub_values[key] = kwargs[key] | |
logger.warning(kwargs) | |
logger.warning(sub_values) | |
await listener(event, **sub_values) | |
events = EventsCollection() | |
# | |
# Events = Annotated[EventsCollection, Depends(events)] | |
@dataclasses.dataclass | |
class BasicEvent: | |
bird: str | |
owl: str | |
# want this to be looked up in the listener for the event | |
@dataclasses.dataclass | |
class ListenerDependency: | |
dog: str | |
cat: str | |
# defining the listeners dependency | |
async def basic_dependency(): | |
return ListenerDependency( | |
dog="woof", | |
cat="meow" | |
) | |
BasicDependency = Annotated[ListenerDependency, Depends(basic_dependency)] | |
# the listener | |
async def listener1(event): | |
print("LISTENER1 CALLED") | |
print(event) | |
async def listener2(event, dep: BasicDependency): | |
print("LISTENER2 CALLED") | |
print(event) | |
print(dep) | |
# can take any fastapi dependency | |
from dependencies.user import CurrentUser, get_token | |
async def listener3(event, user: CurrentUser, token = Depends(get_token)): | |
print("LISTENER3 CALLED") | |
print(event) | |
print(user) | |
print(token) | |
events.register(BasicEvent, listener1) | |
events.register(BasicEvent, listener2) | |
events.register(BasicEvent, listener3) | |
app = FastAPI() | |
class EventDependency: | |
def __init__(self, request: Request): | |
self.request = request | |
async def __call__(self, *args, **kwargs): | |
logger.debug("EventDependency __call__") | |
return events | |
async def dispatch(self, ev, **kwargs): | |
logger.debug("EventDependency dispatch") | |
await events.dispatch(ev, request=self.request, **kwargs) | |
Events = Annotated[EventDependency, Depends(EventDependency)] | |
@app.get("/") | |
async def testing(events: Events) -> str: | |
event = BasicEvent( | |
bird="squeak", | |
owl="hiss" | |
) | |
await events.dispatch(event) | |
# await event_dispatcher(event) | |
return "woof" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment