Created
February 21, 2026 10:58
-
-
Save Tishka17/043400e7c95e28ea7ef8395fbec8b449 to your computer and use it in GitHub Desktop.
diwire generated code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Generated DI resolver module. | |
| Generated by: diwire._internal.resolvers.assembly.renderer.ResolversAssemblyRenderer.get_providers_code | |
| diwire version used for generation: 1.0.1 | |
| Generation configuration: | |
| - root scope level: 1 | |
| - managed scopes: app:1, session:2, request:3, action:4, step:5 | |
| - graph has async specs: False | |
| - cleanup support enabled in graph: True | |
| - provider count: 1 | |
| - cached provider count: 1 | |
| - thread lock count: 1 | |
| - async lock count: 0 | |
| - provider slots: 1 | |
| Examples: | |
| >>> root = build_root_resolver(registrations) | |
| >>> service = root.resolve(SomeService) | |
| >>> async_service = await root.aresolve(SomeAsyncService) | |
| >>> request_scope = root.enter_scope() | |
| >>> scoped_service = request_scope.resolve(RequestScopedService) | |
| """ | |
| from __future__ import annotations | |
| import threading | |
| from contextlib import asynccontextmanager, contextmanager | |
| from types import TracebackType | |
| from typing import Any, NoReturn | |
| from diwire.exceptions import ( | |
| DIWireAsyncDependencyInSyncContextError, | |
| DIWireDependencyNotRegisteredError, | |
| DIWireScopeMismatchError, | |
| ) | |
| from diwire._internal.markers import ( | |
| is_async_provider_annotation, | |
| is_all_annotation, | |
| component_base_key, | |
| is_from_context_annotation, | |
| is_maybe_annotation, | |
| is_provider_annotation, | |
| strip_all_annotation, | |
| strip_from_context_annotation, | |
| strip_maybe_annotation, | |
| strip_provider_annotation, | |
| ) | |
| from diwire._internal.providers import ProvidersRegistrations | |
# --- Scope level objects -------------------------------------------------
# One object per managed scope (app:1, session:2, request:3, action:4, step:5).
# `enter_scope` compares the requested scope against these by identity first
# and by equality with the plain integer level second.
_scope_obj_1: Any = 1
_scope_obj_2: Any = 2
_scope_obj_3: Any = 3
_scope_obj_4: Any = 4
_scope_obj_5: Any = 5

# --- Sentinels -----------------------------------------------------------
# Distinct `object()` instances so "nothing here yet" can be told apart from
# any legitimate value (including None) with an `is` check.
_MISSING_RESOLVER: Any = object()
_MISSING_CACHE: Any = object()
_MISSING_PROVIDER: Any = object()

# --- Provider slot 1 -----------------------------------------------------
# Both start as the "missing provider" sentinel.
# NOTE(review): presumably rebound by `build_root_resolver` (not visible in
# this portion of the file) - confirm.
_dep_1_type: Any = _MISSING_PROVIDER  # dependency token matched by identity in resolve()/aresolve()
_provider_1: Any = _MISSING_PROVIDER
# NOTE(review): not used in the visible portion of the file; presumably guards
# slot 1 creation on the sync path - confirm against the request-scope resolver.
_dep_1_thread_lock = threading.Lock()

# --- Lookup registries ---------------------------------------------------
# Slot numbers to collect for an "all" annotation, keyed by the stripped inner key.
_all_slots_by_key: dict[Any, tuple[int, ...]] = {}
# Keys consulted by `_is_registered_dependency` when resolving "maybe" annotations.
_dep_registered_keys: set[Any] = set()
class RootResolver:
    """
    Generated resolver for scope 'app' (level 1).

    This class is generated and optimized for direct slot-based dependency resolution.
    All visible provider slots: 1.
    Providers declared in this exact scope: none.
    """

    # "__dict__" is listed explicitly, so instances of the root resolver also
    # accept attributes beyond the named slots.
    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "_cleanup_enabled",
        "__dict__",
        "_cleanup_callbacks",
        "_owned_scope_resolvers",
    )

    def __init__(
        self,
        cleanup_enabled: bool = True,
        context: Any | None = None,
        parent_context_resolver: Any = None,
    ) -> None:
        """
        Initialize resolver state for the root scope.

        Args:
            cleanup_enabled: Flag stored on the resolver and forwarded to every
                scope resolver opened from it.
            context: Optional mapping consulted by `_resolve_from_context`.
            parent_context_resolver: Optional resolver whose context chain is
                searched after this resolver's own context.

        Root scope class: RootResolver.
        Stateless scope reuse enabled: False.
        """
        self._cleanup_enabled = cleanup_enabled
        self._root_resolver = self  # the root is its own root
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        # (kind, callback) pairs; kind 0 is a sync callback, anything else is awaited.
        self._cleanup_callbacks: list[tuple[int, Any]] = []
        # Intermediate resolvers created implicitly by `enter_scope` and closed with this one.
        self._owned_scope_resolvers: tuple[Any, ...] = ()

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> RootResolver | _SessionResolver | _RequestResolver | _ActionResolver | _StepResolver:
        """
        Open a deeper scope resolver from this resolver.

        Current scope: app:1.
        Allowed explicit transitions: session:2, request:3, action:4, step:5.
        Passing None follows the default transition, which opens the request scope.
        Requesting level 1 returns this resolver unchanged.

        When the target is deeper than request, the intermediate resolvers are
        created without a context, and the returned resolver records them in
        `_owned_scope_resolvers` so they are closed together with it.

        Args:
            scope: Target scope object or level, or None for the default transition.
            context: Optional context mapping attached to the opened target scope.

        Raises:
            DIWireScopeMismatchError: If the target level is below 1 or is not
                one of the known levels.
        """
        # Default transition and explicit request scope open the same resolver.
        if scope is None or scope is _scope_obj_3 or scope == 3:
            return _RequestResolver(
                self,
                self._cleanup_enabled,
                context,
                self,
            )
        target_scope_level = scope
        if target_scope_level is _scope_obj_1 or target_scope_level == 1:
            return self
        if target_scope_level <= 1:
            msg = f"Cannot enter scope level {target_scope_level} from level 1."
            raise DIWireScopeMismatchError(msg)
        if target_scope_level is _scope_obj_2 or target_scope_level == 2:
            return _SessionResolver(
                self,
                self._cleanup_enabled,
                context,
                self,
            )
        if target_scope_level is _scope_obj_4 or target_scope_level == 4:
            # Skipping straight to action: build the request scope implicitly.
            request_resolver = _RequestResolver(
                self,
                self._cleanup_enabled,
                None,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                request_resolver._cleanup_enabled,
                context,
                request_resolver,
                request_resolver,
            )
            action_resolver._owned_scope_resolvers = (request_resolver,)
            return action_resolver
        if target_scope_level is _scope_obj_5 or target_scope_level == 5:
            # Skipping straight to step: build request and action scopes implicitly.
            request_resolver = _RequestResolver(
                self,
                self._cleanup_enabled,
                None,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                request_resolver._cleanup_enabled,
                None,
                request_resolver,
                request_resolver,
            )
            step_resolver = _StepResolver(
                action_resolver._root_resolver,
                action_resolver._cleanup_enabled,
                context,
                action_resolver,
                request_resolver,
            )
            step_resolver._owned_scope_resolvers = (request_resolver, action_resolver)
            return step_resolver
        msg = f"Scope level {target_scope_level} is not a valid next transition from level 1."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated synchronous provider resolver method.

        Known provider slots: 1.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals,
        then falls back to the maybe / provider / from-context / all annotation wrappers.

        Raises:
            DIWireDependencyNotRegisteredError: If the token matches no slot and
                no supported annotation wrapper.
        """
        # Fast path identity checks to avoid reflective dispatch.
        if dependency is _dep_1_type:
            return RootResolver.resolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Providers are handed out lazily as zero-argument callables.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # "Maybe" turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return self.resolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(self.resolve_1())
                    continue
                # A slot number this generated module has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated asynchronous provider resolver method.

        Known provider slots: 1.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals,
        then falls back to the maybe / provider / from-context / all annotation wrappers.

        Raises:
            DIWireDependencyNotRegisteredError: If the token matches no slot and
                no supported annotation wrapper.
        """
        # Fast path identity checks for asynchronous resolution.
        if dependency is _dep_1_type:
            return await RootResolver.aresolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Providers are handed out lazily as zero-argument callables.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # "Maybe" turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return await self.aresolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(await self.aresolve_1())
                    continue
                # A slot number this generated module has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        """
        Look up `key` in this resolver's context, then in each parent context.

        Args:
            key: Context key to look up.

        Returns:
            The value from the nearest context mapping that contains `key`.

        Raises:
            DIWireDependencyNotRegisteredError: If no context in the chain
                contains `key`.
        """
        resolver: Any = self
        while resolver is not None:
            candidate_context = resolver._context
            if candidate_context is not None and key in candidate_context:
                return candidate_context[key]
            resolver = resolver._parent_context_resolver
        # Only the first piece is an f-string: inside an f-string `{...}` is a
        # replacement field that renders as "Ellipsis", which garbled the hint.
        msg = (
            f"Context value for {key!r} is not provided. "
            "Pass it via `enter_scope(..., context={...})` "
            "(or `diwire_context` for injected callables)."
        )
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        """Return whether `dependency` is present in the module-level registered keys."""
        return dependency in _dep_registered_keys

    def __enter__(self) -> RootResolver:
        """Return this resolver; entering performs no additional setup."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Run registered cleanup callbacks synchronously, then close owned scopes.

        Callbacks are popped from the end of the list (last registered runs first).
        A callback of any kind other than 0 cannot run here and is recorded as a
        DIWireAsyncDependencyInSyncContextError. Cleanup continues after a
        failure; the first recorded error is raised at the end, but only when no
        exception was already in flight.
        """
        cleanup_error: BaseException | None = None
        while self._cleanup_callbacks:
            cleanup_kind, cleanup = self._cleanup_callbacks.pop()
            try:
                if cleanup_kind == 0:
                    cleanup(exc_type, exc_value, traceback)
                else:
                    msg = "Cannot execute async cleanup in sync context. Use 'async with'."
                    raise DIWireAsyncDependencyInSyncContextError(msg)
            except BaseException as error:
                # Keep going so one failing callback cannot block the rest.
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        # Iterating an empty tuple is a no-op, so no emptiness guard is needed.
        for owned_scope_resolver in reversed(self._owned_scope_resolvers):
            try:
                owned_scope_resolver.__exit__(exc_type, exc_value, traceback)
            except BaseException as error:
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        if exc_type is None and cleanup_error is not None:
            raise cleanup_error

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Run synchronous cleanup; equivalent to calling `__exit__` with the same arguments."""
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> RootResolver:
        """Return this resolver; entering performs no additional setup."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Run registered cleanup callbacks, awaiting the async ones, then close owned scopes.

        Callbacks are popped from the end of the list (last registered runs first).
        Kind 0 callbacks are called directly; every other kind is awaited.
        Cleanup continues after a failure; the first recorded error is raised at
        the end, but only when no exception was already in flight.
        """
        cleanup_error: BaseException | None = None
        while self._cleanup_callbacks:
            cleanup_kind, cleanup = self._cleanup_callbacks.pop()
            try:
                if cleanup_kind == 0:
                    cleanup(exc_type, exc_value, traceback)
                else:
                    await cleanup(exc_type, exc_value, traceback)
            except BaseException as error:
                # Keep going so one failing callback cannot block the rest.
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        # Iterating an empty tuple is a no-op, so no emptiness guard is needed.
        for owned_scope_resolver in reversed(self._owned_scope_resolvers):
            try:
                await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback)
            except BaseException as error:
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        if exc_type is None and cleanup_error is not None:
            raise cleanup_error

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Run asynchronous cleanup; equivalent to awaiting `__aexit__` with the same arguments."""
        return await self.__aexit__(exc_type, exc_value, traceback)

    def resolve_1(self) -> Any:
        """
        Provider slot 1 resolver (sync method).

        Returns: __main__.Session
        Provider spec kind: generator
        Provider target: __main__.session_factory
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: True
        Resolver class handling this call: RootResolver
        Dependency wiring: none
        Behavior: This resolver cannot access the provider yet because the required
        scope is deeper than the current resolver scope.

        Raises:
            DIWireScopeMismatchError: Always, from this scope.
        """
        msg = "Provider slot 1 requires opened scope level 3."
        raise DIWireScopeMismatchError(msg)

    async def aresolve_1(self) -> Any:
        """
        Provider slot 1 resolver (async method).

        Returns: __main__.Session
        Provider spec kind: generator
        Provider target: __main__.session_factory
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: True
        Resolver class handling this call: RootResolver
        Dependency wiring: none
        Behavior: Async variant delegates to sync resolution because this provider
        graph does not require awaitable operations.
        """
        return self.resolve_1()
class _SessionResolver:
    """
    Generated resolver for scope 'session' (level 2).

    This class is generated and optimized for direct slot-based dependency resolution.
    All visible provider slots: 1.
    Providers declared in this exact scope: none.
    """

    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "_cleanup_enabled",
        "_cleanup_callbacks",
        "_owned_scope_resolvers",
    )

    def __init__(
        self,
        root_resolver: RootResolver,
        cleanup_enabled: bool = True,
        context: Any | None = None,
        parent_context_resolver: Any = None,
    ) -> None:
        """
        Initialize resolver state for the session scope.

        Args:
            root_resolver: The root resolver this scope was opened from.
            cleanup_enabled: Flag stored on the resolver and forwarded to every
                scope resolver opened from it.
            context: Optional mapping consulted by `_resolve_from_context`.
            parent_context_resolver: Optional resolver whose context chain is
                searched after this resolver's own context.

        Root scope class: RootResolver.
        Stateless scope reuse enabled: False.
        """
        self._cleanup_enabled = cleanup_enabled
        self._root_resolver = root_resolver
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        # (kind, callback) pairs; kind 0 is a sync callback, anything else is awaited.
        self._cleanup_callbacks: list[tuple[int, Any]] = []
        # Intermediate resolvers created implicitly by `enter_scope` and closed with this one.
        self._owned_scope_resolvers: tuple[Any, ...] = ()

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> _SessionResolver | _RequestResolver | _ActionResolver | _StepResolver:
        """
        Open a deeper scope resolver from this resolver.

        Current scope: session:2.
        Allowed explicit transitions: request:3, action:4, step:5.
        Passing None follows the default transition, which opens the request scope.
        Requesting level 2 returns this resolver unchanged.

        When the target is deeper than request, the intermediate resolvers are
        created without a context, and the returned resolver records them in
        `_owned_scope_resolvers` so they are closed together with it.

        Args:
            scope: Target scope object or level, or None for the default transition.
            context: Optional context mapping attached to the opened target scope.

        Raises:
            DIWireScopeMismatchError: If the target level is below 2 or is not
                one of the known deeper levels.
        """
        # Default transition and explicit request scope open the same resolver.
        if scope is None or scope is _scope_obj_3 or scope == 3:
            return _RequestResolver(
                self._root_resolver,
                self._cleanup_enabled,
                context,
                self,
            )
        target_scope_level = scope
        if target_scope_level is _scope_obj_2 or target_scope_level == 2:
            return self
        if target_scope_level <= 2:
            msg = f"Cannot enter scope level {target_scope_level} from level 2."
            raise DIWireScopeMismatchError(msg)
        if target_scope_level is _scope_obj_4 or target_scope_level == 4:
            # Skipping straight to action: build the request scope implicitly.
            request_resolver = _RequestResolver(
                self._root_resolver,
                self._cleanup_enabled,
                None,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                request_resolver._cleanup_enabled,
                context,
                request_resolver,
                request_resolver,
            )
            action_resolver._owned_scope_resolvers = (request_resolver,)
            return action_resolver
        if target_scope_level is _scope_obj_5 or target_scope_level == 5:
            # Skipping straight to step: build request and action scopes implicitly.
            request_resolver = _RequestResolver(
                self._root_resolver,
                self._cleanup_enabled,
                None,
                self,
            )
            action_resolver = _ActionResolver(
                request_resolver._root_resolver,
                request_resolver._cleanup_enabled,
                None,
                request_resolver,
                request_resolver,
            )
            step_resolver = _StepResolver(
                action_resolver._root_resolver,
                action_resolver._cleanup_enabled,
                context,
                action_resolver,
                request_resolver,
            )
            step_resolver._owned_scope_resolvers = (request_resolver, action_resolver)
            return step_resolver
        msg = f"Scope level {target_scope_level} is not a valid next transition from level 2."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated synchronous provider resolver method.

        Known provider slots: 1.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals,
        then falls back to the maybe / provider / from-context / all annotation wrappers.

        Raises:
            DIWireDependencyNotRegisteredError: If the token matches no slot and
                no supported annotation wrapper.
        """
        # Fast path identity checks to avoid reflective dispatch.
        if dependency is _dep_1_type:
            return _SessionResolver.resolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Providers are handed out lazily as zero-argument callables.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # "Maybe" turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return self.resolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(self.resolve_1())
                    continue
                # A slot number this generated module has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:
        """
        Route a dependency token to a generated asynchronous provider resolver method.

        Known provider slots: 1.
        Dispatch uses identity checks against module-level `_dep_<slot>_type` globals,
        then falls back to the maybe / provider / from-context / all annotation wrappers.

        Raises:
            DIWireDependencyNotRegisteredError: If the token matches no slot and
                no supported annotation wrapper.
        """
        # Fast path identity checks for asynchronous resolution.
        if dependency is _dep_1_type:
            return await _SessionResolver.aresolve_1(self)
        if is_maybe_annotation(dependency):
            inner = strip_maybe_annotation(dependency)
            if is_provider_annotation(inner):
                # Providers are handed out lazily as zero-argument callables.
                provider_inner = strip_provider_annotation(inner)
                if is_async_provider_annotation(inner):
                    return lambda: self.aresolve(provider_inner)
                return lambda: self.resolve(provider_inner)
            if is_from_context_annotation(inner):
                key = strip_from_context_annotation(inner)
                try:
                    return self._resolve_from_context(key)
                except DIWireDependencyNotRegisteredError:
                    # "Maybe" turns a missing context value into None.
                    return None
            if not self._is_registered_dependency(inner):
                return None
            return await self.aresolve(inner)
        if is_provider_annotation(dependency):
            inner = strip_provider_annotation(dependency)
            if is_async_provider_annotation(dependency):
                return lambda: self.aresolve(inner)
            return lambda: self.resolve(inner)
        if is_from_context_annotation(dependency):
            key = strip_from_context_annotation(dependency)
            return self._resolve_from_context(key)
        if is_all_annotation(dependency):
            inner = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(inner, ())
            if not slots:
                return ()
            results: list[Any] = []
            for slot in slots:
                if slot == 1:
                    results.append(await self.aresolve_1())
                    continue
                # A slot number this generated module has no resolver method for.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            return tuple(results)
        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        """
        Look up `key` in this resolver's context, then in each parent context.

        Args:
            key: Context key to look up.

        Returns:
            The value from the nearest context mapping that contains `key`.

        Raises:
            DIWireDependencyNotRegisteredError: If no context in the chain
                contains `key`.
        """
        resolver: Any = self
        while resolver is not None:
            candidate_context = resolver._context
            if candidate_context is not None and key in candidate_context:
                return candidate_context[key]
            resolver = resolver._parent_context_resolver
        # Only the first piece is an f-string: inside an f-string `{...}` is a
        # replacement field that renders as "Ellipsis", which garbled the hint.
        msg = (
            f"Context value for {key!r} is not provided. "
            "Pass it via `enter_scope(..., context={...})` "
            "(or `diwire_context` for injected callables)."
        )
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        """Return whether `dependency` is present in the module-level registered keys."""
        return dependency in _dep_registered_keys

    def __enter__(self) -> _SessionResolver:
        """Return this resolver; entering performs no additional setup."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Run registered cleanup callbacks synchronously, then close owned scopes.

        Callbacks are popped from the end of the list (last registered runs first).
        A callback of any kind other than 0 cannot run here and is recorded as a
        DIWireAsyncDependencyInSyncContextError. Cleanup continues after a
        failure; the first recorded error is raised at the end, but only when no
        exception was already in flight.
        """
        cleanup_error: BaseException | None = None
        while self._cleanup_callbacks:
            cleanup_kind, cleanup = self._cleanup_callbacks.pop()
            try:
                if cleanup_kind == 0:
                    cleanup(exc_type, exc_value, traceback)
                else:
                    msg = "Cannot execute async cleanup in sync context. Use 'async with'."
                    raise DIWireAsyncDependencyInSyncContextError(msg)
            except BaseException as error:
                # Keep going so one failing callback cannot block the rest.
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        # Iterating an empty tuple is a no-op, so no emptiness guard is needed.
        for owned_scope_resolver in reversed(self._owned_scope_resolvers):
            try:
                owned_scope_resolver.__exit__(exc_type, exc_value, traceback)
            except BaseException as error:
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        if exc_type is None and cleanup_error is not None:
            raise cleanup_error

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Run synchronous cleanup; equivalent to calling `__exit__` with the same arguments."""
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> _SessionResolver:
        """Return this resolver; entering performs no additional setup."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Run registered cleanup callbacks, awaiting the async ones, then close owned scopes.

        Callbacks are popped from the end of the list (last registered runs first).
        Kind 0 callbacks are called directly; every other kind is awaited.
        Cleanup continues after a failure; the first recorded error is raised at
        the end, but only when no exception was already in flight.
        """
        cleanup_error: BaseException | None = None
        while self._cleanup_callbacks:
            cleanup_kind, cleanup = self._cleanup_callbacks.pop()
            try:
                if cleanup_kind == 0:
                    cleanup(exc_type, exc_value, traceback)
                else:
                    await cleanup(exc_type, exc_value, traceback)
            except BaseException as error:
                # Keep going so one failing callback cannot block the rest.
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        # Iterating an empty tuple is a no-op, so no emptiness guard is needed.
        for owned_scope_resolver in reversed(self._owned_scope_resolvers):
            try:
                await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback)
            except BaseException as error:
                if exc_type is None and cleanup_error is None:
                    cleanup_error = error
        if exc_type is None and cleanup_error is not None:
            raise cleanup_error

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Run asynchronous cleanup; equivalent to awaiting `__aexit__` with the same arguments."""
        return await self.__aexit__(exc_type, exc_value, traceback)

    def resolve_1(self) -> Any:
        """
        Provider slot 1 resolver (sync method).

        Returns: __main__.Session
        Provider spec kind: generator
        Provider target: __main__.session_factory
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: True
        Resolver class handling this call: _SessionResolver
        Dependency wiring: none
        Behavior: This resolver cannot access the provider yet because the required
        scope is deeper than the current resolver scope.

        Raises:
            DIWireScopeMismatchError: Always, from this scope.
        """
        msg = "Provider slot 1 requires opened scope level 3."
        raise DIWireScopeMismatchError(msg)

    async def aresolve_1(self) -> Any:
        """
        Provider slot 1 resolver (async method).

        Returns: __main__.Session
        Provider spec kind: generator
        Provider target: __main__.session_factory
        Declared scope: request (level 3)
        Declared lifetime: scoped
        Cache policy: cached
        Cache owner scope: 3
        Declared lock mode: auto
        Effective lock mode: thread
        Sync path thread lock: True
        Async path async lock: False
        Provider declared async: False
        Graph requires async: False
        Cleanup callbacks required: True
        Resolver class handling this call: _SessionResolver
        Dependency wiring: none
        Behavior: Async variant delegates to sync resolution because this provider
        graph does not require awaitable operations.
        """
        return self.resolve_1()
| class _RequestResolver: | |
| """ | |
| Generated resolver for scope 'request' (level 3). | |
| This class is generated and optimized for direct slot-based dependency resolution. | |
| All visible provider slots: 1. | |
| Providers declared in this exact scope: 1. | |
| """ | |
| __slots__ = ( | |
| "_root_resolver", | |
| "_context", | |
| "_parent_context_resolver", | |
| "_cleanup_enabled", | |
| "_cleanup_callbacks", | |
| "_owned_scope_resolvers", | |
| "_cache_1", | |
| ) | |
| def __init__( | |
| self, | |
| root_resolver: RootResolver, | |
| cleanup_enabled: bool = True, | |
| context: Any | None = None, | |
| parent_context_resolver: Any = None, | |
| ) -> None: | |
| """ | |
| Initialize resolver state for the current scope. | |
| The constructor wires scope ancestry references, cache slots, and optional cleanup state. | |
| Root scope class: RootResolver. | |
| Stateless scope reuse enabled: False. | |
| """ | |
| self._cleanup_enabled = cleanup_enabled | |
| self._root_resolver = root_resolver | |
| self._context = context | |
| self._parent_context_resolver = parent_context_resolver | |
| self._cleanup_callbacks: list[tuple[int, Any]] = [] | |
| self._owned_scope_resolvers: tuple[Any, ...] = () | |
| self._cache_1 = _MISSING_CACHE | |
| def enter_scope( | |
| self, | |
| scope: Any | None = None, | |
| *, | |
| context: Any | None = None, | |
| ) -> _RequestResolver | _ActionResolver | _StepResolver: | |
| """ | |
| Open a deeper scope resolver from this resolver. | |
| Current scope: request:3. | |
| Allowed explicit transitions: action:4, step:5. | |
| Passing None follows the default transition for the scope graph. | |
| Optional context mapping is attached to the opened target scope. | |
| """ | |
| if scope is _scope_obj_4 or scope == 4: | |
| return _ActionResolver( | |
| self._root_resolver, | |
| self._cleanup_enabled, | |
| context, | |
| self, | |
| self, | |
| ) | |
| if scope is None: | |
| return _ActionResolver( | |
| self._root_resolver, | |
| self._cleanup_enabled, | |
| context, | |
| self, | |
| self, | |
| ) | |
| target_scope_level = scope | |
| if target_scope_level is _scope_obj_3 or target_scope_level == 3: | |
| return self | |
| if target_scope_level <= 3: | |
| msg = f"Cannot enter scope level {target_scope_level} from level 3." | |
| raise DIWireScopeMismatchError(msg) | |
| if target_scope_level is _scope_obj_5 or target_scope_level == 5: | |
| action_resolver = _ActionResolver( | |
| self._root_resolver, | |
| self._cleanup_enabled, | |
| None, | |
| self, | |
| self, | |
| ) | |
| step_resolver = _StepResolver( | |
| action_resolver._root_resolver, | |
| action_resolver._cleanup_enabled, | |
| context, | |
| action_resolver, | |
| action_resolver._request_resolver, | |
| ) | |
| step_resolver._owned_scope_resolvers = (action_resolver,) | |
| return step_resolver | |
| msg = f"Scope level {target_scope_level} is not a valid next transition from level 3." | |
| raise DIWireScopeMismatchError(msg) | |
def resolve(self, dependency: Any) -> Any:
    """
    Resolve ``dependency`` synchronously from the request-scope resolver.

    The slot-1 dependency type is matched by identity first. Otherwise the
    token is inspected as an annotation wrapper, in this order: maybe,
    provider, from-context, all. A maybe wrapper yields ``None`` when its
    inner dependency is unavailable; a provider wrapper yields a zero-argument
    callable; an all wrapper yields a tuple of every matching slot value.

    Raises ``DIWireDependencyNotRegisteredError`` for unknown dependencies.
    """
    # Identity comparison keeps the common case free of annotation inspection.
    if dependency is _dep_1_type:
        return _RequestResolver.resolve_1(self)

    if is_maybe_annotation(dependency):
        optional_target = strip_maybe_annotation(dependency)
        if is_provider_annotation(optional_target):
            provider_inner = strip_provider_annotation(optional_target)
            return (
                (lambda: self.aresolve(provider_inner))
                if is_async_provider_annotation(optional_target)
                else (lambda: self.resolve(provider_inner))
            )
        if is_from_context_annotation(optional_target):
            context_key = strip_from_context_annotation(optional_target)
            try:
                return self._resolve_from_context(context_key)
            except DIWireDependencyNotRegisteredError:
                # A missing context value is acceptable for a maybe-dependency.
                return None
        if self._is_registered_dependency(optional_target):
            return self.resolve(optional_target)
        return None

    if is_provider_annotation(dependency):
        provided = strip_provider_annotation(dependency)
        return (
            (lambda: self.aresolve(provided))
            if is_async_provider_annotation(dependency)
            else (lambda: self.resolve(provided))
        )

    if is_from_context_annotation(dependency):
        return self._resolve_from_context(strip_from_context_annotation(dependency))

    if is_all_annotation(dependency):
        base_key = strip_all_annotation(dependency)
        slots = _all_slots_by_key.get(base_key, ())
        if not slots:
            return ()
        collected: list[Any] = []
        for slot in slots:
            if slot != 1:
                # Only slot 1 exists in this generated module.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            collected.append(self.resolve_1())
        return tuple(collected)

    # Any dependency not pre-bound in build_root_resolver is unknown here.
    msg = f"Dependency {dependency!r} is not registered."
    raise DIWireDependencyNotRegisteredError(msg)
async def aresolve(self, dependency: Any) -> Any:
    """
    Resolve ``dependency`` asynchronously from the request-scope resolver.

    Mirrors ``resolve``: the slot-1 dependency type is matched by identity
    first, then the token is inspected as a maybe, provider, from-context or
    all annotation, in that order. Slot values are awaited through
    ``aresolve_1``; context lookups and provider callables are returned
    without awaiting.

    Raises ``DIWireDependencyNotRegisteredError`` for unknown dependencies.
    """
    # Identity comparison keeps the common case free of annotation inspection.
    if dependency is _dep_1_type:
        return await _RequestResolver.aresolve_1(self)

    if is_maybe_annotation(dependency):
        optional_target = strip_maybe_annotation(dependency)
        if is_provider_annotation(optional_target):
            provider_inner = strip_provider_annotation(optional_target)
            return (
                (lambda: self.aresolve(provider_inner))
                if is_async_provider_annotation(optional_target)
                else (lambda: self.resolve(provider_inner))
            )
        if is_from_context_annotation(optional_target):
            context_key = strip_from_context_annotation(optional_target)
            try:
                return self._resolve_from_context(context_key)
            except DIWireDependencyNotRegisteredError:
                # A missing context value is acceptable for a maybe-dependency.
                return None
        if self._is_registered_dependency(optional_target):
            return await self.aresolve(optional_target)
        return None

    if is_provider_annotation(dependency):
        provided = strip_provider_annotation(dependency)
        return (
            (lambda: self.aresolve(provided))
            if is_async_provider_annotation(dependency)
            else (lambda: self.resolve(provided))
        )

    if is_from_context_annotation(dependency):
        return self._resolve_from_context(strip_from_context_annotation(dependency))

    if is_all_annotation(dependency):
        base_key = strip_all_annotation(dependency)
        slots = _all_slots_by_key.get(base_key, ())
        if not slots:
            return ()
        collected: list[Any] = []
        for slot in slots:
            if slot != 1:
                # Only slot 1 exists in this generated module.
                msg = f"Dependency {dependency!r} is not registered."
                raise DIWireDependencyNotRegisteredError(msg)
            collected.append(await self.aresolve_1())
        return tuple(collected)

    # Any dependency not pre-bound in build_root_resolver is unknown here.
    msg = f"Dependency {dependency!r} is not registered."
    raise DIWireDependencyNotRegisteredError(msg)
def _resolve_from_context(self, key: Any) -> Any:
    """
    Return the context value stored under ``key``.

    The lookup starts with this resolver's own context and then follows the
    ``_parent_context_resolver`` chain; the first context containing ``key``
    wins. Resolvers whose context is ``None`` are skipped.

    Raises ``DIWireDependencyNotRegisteredError`` when no context in the chain
    provides ``key``.
    """
    holder = self
    while holder is not None:
        candidate_context = holder._context
        if candidate_context is not None and key in candidate_context:
            return candidate_context[key]
        holder = holder._parent_context_resolver
    # The braces are doubled so the hint renders literally as `context={...}`;
    # an unescaped `{...}` inside an f-string formats the Ellipsis object and
    # would print `context=Ellipsis`.
    msg = (
        f"Context value for {key!r} is not provided. "
        "Pass it via `enter_scope(..., context={...})` "
        "(or `diwire_context` for injected callables)."
    )
    raise DIWireDependencyNotRegisteredError(msg)
def _is_registered_dependency(self, dependency: Any) -> bool:
    """Report whether ``dependency`` is a member of the module-level registered keys."""
    registered_keys = _dep_registered_keys
    return dependency in registered_keys
def __enter__(self) -> _RequestResolver:
    """Enter the synchronous ``with`` block; the resolver itself is the managed value."""
    resolver = self
    return resolver
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """
    Run pending cleanup callbacks synchronously, then exit owned resolvers.

    Callbacks are popped from the end of ``_cleanup_callbacks`` (last
    registered runs first) and called with the exception triple. A callback
    whose kind is not ``0`` is async and cannot run here; it is discarded and
    reported as ``DIWireAsyncDependencyInSyncContextError``.

    Every callback and owned resolver is attempted even if an earlier one
    fails. When no exception was passed in, the first failure is re-raised
    once everything has run; when an exception was passed in, cleanup
    failures are dropped.
    """
    already_failing = exc_type is not None
    first_failure: BaseException | None = None

    while self._cleanup_callbacks:
        kind, callback = self._cleanup_callbacks.pop()
        try:
            if kind != 0:
                msg = "Cannot execute async cleanup in sync context. Use 'async with'."
                raise DIWireAsyncDependencyInSyncContextError(msg)
            callback(exc_type, exc_value, traceback)
        except BaseException as failure:
            if not already_failing and first_failure is None:
                first_failure = failure

    for owned in reversed(self._owned_scope_resolvers):
        try:
            owned.__exit__(exc_type, exc_value, traceback)
        except BaseException as failure:
            if not already_failing and first_failure is None:
                first_failure = failure

    if not already_failing and first_failure is not None:
        raise first_failure
    return None
def close(
    self,
    exc_type: type[BaseException] | None = None,
    exc_value: BaseException | None = None,
    traceback: TracebackType | None = None,
) -> None:
    """
    Close the resolver without a ``with`` block.

    Forwards the (optional) exception triple to ``__exit__`` and returns its
    result.
    """
    outcome = self.__exit__(exc_type, exc_value, traceback)
    return outcome
async def __aenter__(self) -> _RequestResolver:
    """Enter the ``async with`` block; the resolver itself is the managed value."""
    resolver = self
    return resolver
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """
    Run pending cleanup callbacks, then asynchronously exit owned resolvers.

    Callbacks are popped from the end of ``_cleanup_callbacks`` (last
    registered runs first). Kind ``0`` callbacks are called directly; any
    other kind is awaited.

    Every callback and owned resolver is attempted even if an earlier one
    fails. When no exception was passed in, the first failure is re-raised
    once everything has run; when an exception was passed in, cleanup
    failures are dropped.
    """
    already_failing = exc_type is not None
    first_failure: BaseException | None = None

    while self._cleanup_callbacks:
        kind, callback = self._cleanup_callbacks.pop()
        try:
            if kind != 0:
                await callback(exc_type, exc_value, traceback)
            else:
                callback(exc_type, exc_value, traceback)
        except BaseException as failure:
            if not already_failing and first_failure is None:
                first_failure = failure

    for owned in reversed(self._owned_scope_resolvers):
        try:
            await owned.__aexit__(exc_type, exc_value, traceback)
        except BaseException as failure:
            if not already_failing and first_failure is None:
                first_failure = failure

    if not already_failing and first_failure is not None:
        raise first_failure
    return None
async def aclose(
    self,
    exc_type: type[BaseException] | None = None,
    exc_value: BaseException | None = None,
    traceback: TracebackType | None = None,
) -> None:
    """
    Close the resolver without an ``async with`` block.

    Forwards the (optional) exception triple to ``__aexit__`` and returns its
    awaited result.
    """
    outcome = await self.__aexit__(exc_type, exc_value, traceback)
    return outcome
def resolve_1(self) -> Any:
    """
    Provider slot 1 resolver (sync method).

    Builds the value of the generator provider ``__main__.session_factory``
    (declared scope: request, level 3; lifetime: scoped) and caches it on this
    resolver in ``_cache_1``.

    The cache is read once without the lock and once more after acquiring
    ``_dep_1_thread_lock``, so the provider runs at most once per resolver
    even when several threads resolve concurrently.

    With cleanup enabled the provider is wrapped by ``contextmanager`` and its
    ``__exit__`` is queued as a sync cleanup callback (kind ``0``). With
    cleanup disabled the generator is only advanced to its first ``yield``.
    """
    value = self._cache_1
    if value is not _MISSING_CACHE:
        return value

    with _dep_1_thread_lock:
        # Another thread may have filled the cache while this one waited.
        value = self._cache_1
        if value is not _MISSING_CACHE:
            return value

        if not self._cleanup_enabled:
            # NOTE(review): nothing retains this generator after the call
            # returns -- confirm that its finalization on garbage collection
            # is acceptable for providers that have teardown code.
            _provider_gen = _provider_1()
            value = next(_provider_gen)
        else:
            _provider_cm = contextmanager(_provider_1)()
            value = _provider_cm.__enter__()
            self._cleanup_callbacks.append((0, _provider_cm.__exit__))

        self._cache_1 = value
        return value
async def aresolve_1(self) -> Any:
    """
    Provider slot 1 resolver (async method).

    The provider (``__main__.session_factory``) is synchronous and the graph
    needs no awaitable operations, so this simply returns the result of the
    sync ``resolve_1`` path, including its caching and locking.
    """
    resolved = self.resolve_1()
    return resolved
class _ActionResolver:
    """
    Generated resolver for scope 'action' (level 4).

    No provider is declared in this exact scope. Provider slot 1 is visible
    here and is served by the request-scope resolver that owns its cache.
    """

    __slots__ = (
        "_root_resolver",
        "_context",
        "_parent_context_resolver",
        "_cleanup_enabled",
        "_cleanup_callbacks",
        "_owned_scope_resolvers",
        "_request_resolver",
    )

    def __init__(
        self,
        root_resolver: RootResolver,
        cleanup_enabled: bool = True,
        context: Any | None = None,
        parent_context_resolver: Any = None,
        request_resolver: Any = _MISSING_RESOLVER,
    ) -> None:
        """
        Store scope ancestry references and initialise empty cleanup state.

        ``request_resolver`` is the resolver that owns the slot-1 cache; it
        defaults to ``_MISSING_RESOLVER`` when no request scope is open.
        """
        self._cleanup_enabled = cleanup_enabled
        self._root_resolver = root_resolver
        self._context = context
        self._parent_context_resolver = parent_context_resolver
        self._cleanup_callbacks: list[tuple[int, Any]] = []
        self._owned_scope_resolvers: tuple[Any, ...] = ()
        self._request_resolver = request_resolver

    def enter_scope(
        self,
        scope: Any | None = None,
        *,
        context: Any | None = None,
    ) -> _ActionResolver | _StepResolver:
        """
        Open a nested resolver below the action scope (level 4).

        ``None`` or step:5 opens a new ``_StepResolver`` parented to this
        resolver; action:4 returns this resolver unchanged; anything else
        raises ``DIWireScopeMismatchError``. ``context`` is attached to the
        opened step resolver.
        """
        # The default transition and an explicit step:5 request build the same resolver.
        if scope is _scope_obj_5 or scope == 5 or scope is None:
            return _StepResolver(
                self._root_resolver,
                self._cleanup_enabled,
                context,
                self,
                self._request_resolver,
            )

        target_scope_level = scope
        if target_scope_level is _scope_obj_4 or target_scope_level == 4:
            return self
        if target_scope_level <= 4:
            msg = f"Cannot enter scope level {target_scope_level} from level 4."
        else:
            msg = f"Scope level {target_scope_level} is not a valid next transition from level 4."
        raise DIWireScopeMismatchError(msg)

    def resolve(self, dependency: Any) -> Any:
        """
        Resolve ``dependency`` synchronously from the action-scope resolver.

        The slot-1 dependency type is matched by identity first. Otherwise the
        token is inspected as a maybe, provider, from-context or all
        annotation, in that order. A maybe wrapper yields ``None`` when its
        inner dependency is unavailable; a provider wrapper yields a
        zero-argument callable; an all wrapper yields a tuple.

        Raises ``DIWireDependencyNotRegisteredError`` for unknown dependencies.
        """
        # Identity comparison keeps the common case free of annotation inspection.
        if dependency is _dep_1_type:
            return _ActionResolver.resolve_1(self)

        if is_maybe_annotation(dependency):
            optional_target = strip_maybe_annotation(dependency)
            if is_provider_annotation(optional_target):
                provider_inner = strip_provider_annotation(optional_target)
                return (
                    (lambda: self.aresolve(provider_inner))
                    if is_async_provider_annotation(optional_target)
                    else (lambda: self.resolve(provider_inner))
                )
            if is_from_context_annotation(optional_target):
                context_key = strip_from_context_annotation(optional_target)
                try:
                    return self._resolve_from_context(context_key)
                except DIWireDependencyNotRegisteredError:
                    # A missing context value is acceptable for a maybe-dependency.
                    return None
            if self._is_registered_dependency(optional_target):
                return self.resolve(optional_target)
            return None

        if is_provider_annotation(dependency):
            provided = strip_provider_annotation(dependency)
            return (
                (lambda: self.aresolve(provided))
                if is_async_provider_annotation(dependency)
                else (lambda: self.resolve(provided))
            )

        if is_from_context_annotation(dependency):
            return self._resolve_from_context(strip_from_context_annotation(dependency))

        if is_all_annotation(dependency):
            base_key = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(base_key, ())
            if not slots:
                return ()
            collected: list[Any] = []
            for slot in slots:
                if slot != 1:
                    # Only slot 1 exists in this generated module.
                    msg = f"Dependency {dependency!r} is not registered."
                    raise DIWireDependencyNotRegisteredError(msg)
                collected.append(self.resolve_1())
            return tuple(collected)

        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    async def aresolve(self, dependency: Any) -> Any:
        """
        Resolve ``dependency`` asynchronously from the action-scope resolver.

        Mirrors ``resolve``; slot values are awaited through ``aresolve_1``,
        while context lookups and provider callables are returned without
        awaiting.

        Raises ``DIWireDependencyNotRegisteredError`` for unknown dependencies.
        """
        # Identity comparison keeps the common case free of annotation inspection.
        if dependency is _dep_1_type:
            return await _ActionResolver.aresolve_1(self)

        if is_maybe_annotation(dependency):
            optional_target = strip_maybe_annotation(dependency)
            if is_provider_annotation(optional_target):
                provider_inner = strip_provider_annotation(optional_target)
                return (
                    (lambda: self.aresolve(provider_inner))
                    if is_async_provider_annotation(optional_target)
                    else (lambda: self.resolve(provider_inner))
                )
            if is_from_context_annotation(optional_target):
                context_key = strip_from_context_annotation(optional_target)
                try:
                    return self._resolve_from_context(context_key)
                except DIWireDependencyNotRegisteredError:
                    # A missing context value is acceptable for a maybe-dependency.
                    return None
            if self._is_registered_dependency(optional_target):
                return await self.aresolve(optional_target)
            return None

        if is_provider_annotation(dependency):
            provided = strip_provider_annotation(dependency)
            return (
                (lambda: self.aresolve(provided))
                if is_async_provider_annotation(dependency)
                else (lambda: self.resolve(provided))
            )

        if is_from_context_annotation(dependency):
            return self._resolve_from_context(strip_from_context_annotation(dependency))

        if is_all_annotation(dependency):
            base_key = strip_all_annotation(dependency)
            slots = _all_slots_by_key.get(base_key, ())
            if not slots:
                return ()
            collected: list[Any] = []
            for slot in slots:
                if slot != 1:
                    # Only slot 1 exists in this generated module.
                    msg = f"Dependency {dependency!r} is not registered."
                    raise DIWireDependencyNotRegisteredError(msg)
                collected.append(await self.aresolve_1())
            return tuple(collected)

        # Any dependency not pre-bound in build_root_resolver is unknown here.
        msg = f"Dependency {dependency!r} is not registered."
        raise DIWireDependencyNotRegisteredError(msg)

    def _resolve_from_context(self, key: Any) -> Any:
        """
        Return the context value stored under ``key``.

        The lookup starts with this resolver's own context and then follows
        the ``_parent_context_resolver`` chain; the first context containing
        ``key`` wins. Raises ``DIWireDependencyNotRegisteredError`` when no
        context in the chain provides ``key``.
        """
        holder = self
        while holder is not None:
            candidate_context = holder._context
            if candidate_context is not None and key in candidate_context:
                return candidate_context[key]
            holder = holder._parent_context_resolver
        # The braces are doubled so the hint renders literally as
        # `context={...}`; an unescaped `{...}` inside an f-string formats the
        # Ellipsis object and would print `context=Ellipsis`.
        msg = (
            f"Context value for {key!r} is not provided. "
            "Pass it via `enter_scope(..., context={...})` "
            "(or `diwire_context` for injected callables)."
        )
        raise DIWireDependencyNotRegisteredError(msg)

    def _is_registered_dependency(self, dependency: Any) -> bool:
        """Report whether ``dependency`` is a member of the module-level registered keys."""
        return dependency in _dep_registered_keys

    def __enter__(self) -> _ActionResolver:
        """Enter the synchronous ``with`` block; the resolver itself is the managed value."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Run pending cleanup callbacks synchronously, then exit owned resolvers.

        Callbacks run last-registered-first. A callback whose kind is not
        ``0`` is async and cannot run here; it is discarded and reported as
        ``DIWireAsyncDependencyInSyncContextError``. Everything is attempted
        even if an earlier step fails. The first failure is re-raised at the
        end only when no exception was passed in.
        """
        already_failing = exc_type is not None
        first_failure: BaseException | None = None

        while self._cleanup_callbacks:
            kind, callback = self._cleanup_callbacks.pop()
            try:
                if kind != 0:
                    msg = "Cannot execute async cleanup in sync context. Use 'async with'."
                    raise DIWireAsyncDependencyInSyncContextError(msg)
                callback(exc_type, exc_value, traceback)
            except BaseException as failure:
                if not already_failing and first_failure is None:
                    first_failure = failure

        for owned in reversed(self._owned_scope_resolvers):
            try:
                owned.__exit__(exc_type, exc_value, traceback)
            except BaseException as failure:
                if not already_failing and first_failure is None:
                    first_failure = failure

        if not already_failing and first_failure is not None:
            raise first_failure
        return None

    def close(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Close the resolver without a ``with`` block by delegating to ``__exit__``."""
        return self.__exit__(exc_type, exc_value, traceback)

    async def __aenter__(self) -> _ActionResolver:
        """Enter the ``async with`` block; the resolver itself is the managed value."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Run pending cleanup callbacks, then asynchronously exit owned resolvers.

        Callbacks run last-registered-first; kind ``0`` callbacks are called
        directly and any other kind is awaited. Everything is attempted even
        if an earlier step fails. The first failure is re-raised at the end
        only when no exception was passed in.
        """
        already_failing = exc_type is not None
        first_failure: BaseException | None = None

        while self._cleanup_callbacks:
            kind, callback = self._cleanup_callbacks.pop()
            try:
                if kind != 0:
                    await callback(exc_type, exc_value, traceback)
                else:
                    callback(exc_type, exc_value, traceback)
            except BaseException as failure:
                if not already_failing and first_failure is None:
                    first_failure = failure

        for owned in reversed(self._owned_scope_resolvers):
            try:
                await owned.__aexit__(exc_type, exc_value, traceback)
            except BaseException as failure:
                if not already_failing and first_failure is None:
                    first_failure = failure

        if not already_failing and first_failure is not None:
            raise first_failure
        return None

    async def aclose(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Close the resolver without ``async with`` by delegating to ``__aexit__``."""
        return await self.__aexit__(exc_type, exc_value, traceback)

    def resolve_1(self) -> Any:
        """
        Provider slot 1 resolver (sync method).

        Slot 1 (``__main__.session_factory``, declared scope: request, level
        3) is cached by the request resolver, so this method delegates to it
        to keep scoped caching consistent across nested resolvers.

        Raises ``DIWireScopeMismatchError`` when no request resolver is attached.
        """
        owner_resolver = self._request_resolver
        if owner_resolver is _MISSING_RESOLVER:
            msg = "Provider slot 1 requires opened scope level 3."
            raise DIWireScopeMismatchError(msg)
        return owner_resolver.resolve_1()

    async def aresolve_1(self) -> Any:
        """
        Provider slot 1 resolver (async method).

        The provider graph needs no awaitable operations, so this returns the
        result of the sync ``resolve_1`` path.
        """
        return self.resolve_1()
| class _StepResolver: | |
| """ | |
| Generated resolver for scope 'step' (level 5). | |
| This class is generated and optimized for direct slot-based dependency resolution. | |
| All visible provider slots: 1. | |
| Providers declared in this exact scope: none. | |
| """ | |
| __slots__ = ( | |
| "_root_resolver", | |
| "_context", | |
| "_parent_context_resolver", | |
| "_cleanup_enabled", | |
| "_cleanup_callbacks", | |
| "_owned_scope_resolvers", | |
| "_request_resolver", | |
| ) | |
| def __init__( | |
| self, | |
| root_resolver: RootResolver, | |
| cleanup_enabled: bool = True, | |
| context: Any | None = None, | |
| parent_context_resolver: Any = None, | |
| request_resolver: Any = _MISSING_RESOLVER, | |
| ) -> None: | |
| """ | |
| Initialize resolver state for the current scope. | |
| The constructor wires scope ancestry references, cache slots, and optional cleanup state. | |
| Root scope class: RootResolver. | |
| Stateless scope reuse enabled: False. | |
| """ | |
| self._cleanup_enabled = cleanup_enabled | |
| self._root_resolver = root_resolver | |
| self._context = context | |
| self._parent_context_resolver = parent_context_resolver | |
| self._cleanup_callbacks: list[tuple[int, Any]] = [] | |
| self._owned_scope_resolvers: tuple[Any, ...] = () | |
| self._request_resolver = request_resolver | |
| def enter_scope( | |
| self, | |
| scope: Any | None = None, | |
| *, | |
| context: Any | None = None, | |
| ) -> NoReturn: | |
| """ | |
| Open a deeper scope resolver from this resolver. | |
| Current scope: step:5. | |
| Allowed explicit transitions: none. | |
| Passing None follows the default transition for the scope graph. | |
| Optional context mapping is attached to the opened target scope. | |
| """ | |
| msg = "Cannot enter deeper scope from level 5." | |
| raise DIWireScopeMismatchError(msg) | |
| def resolve(self, dependency: Any) -> Any: | |
| """ | |
| Route a dependency token to a generated synchronous provider resolver method. | |
| Known provider slots: 1. | |
| Dispatch uses identity checks against module-level `_dep_<slot>_type` globals. | |
| """ | |
| # Fast path identity checks to avoid reflective dispatch. | |
| if dependency is _dep_1_type: | |
| return _StepResolver.resolve_1(self) | |
| if is_maybe_annotation(dependency): | |
| inner = strip_maybe_annotation(dependency) | |
| if is_provider_annotation(inner): | |
| provider_inner = strip_provider_annotation(inner) | |
| if is_async_provider_annotation(inner): | |
| return lambda: self.aresolve(provider_inner) | |
| return lambda: self.resolve(provider_inner) | |
| if is_from_context_annotation(inner): | |
| key = strip_from_context_annotation(inner) | |
| try: | |
| return self._resolve_from_context(key) | |
| except DIWireDependencyNotRegisteredError: | |
| return None | |
| if not self._is_registered_dependency(inner): | |
| return None | |
| return self.resolve(inner) | |
| if is_provider_annotation(dependency): | |
| inner = strip_provider_annotation(dependency) | |
| if is_async_provider_annotation(dependency): | |
| return lambda: self.aresolve(inner) | |
| return lambda: self.resolve(inner) | |
| if is_from_context_annotation(dependency): | |
| key = strip_from_context_annotation(dependency) | |
| return self._resolve_from_context(key) | |
| if is_all_annotation(dependency): | |
| inner = strip_all_annotation(dependency) | |
| slots = _all_slots_by_key.get(inner, ()) | |
| if not slots: | |
| return () | |
| results: list[Any] = [] | |
| for slot in slots: | |
| if slot == 1: | |
| results.append(self.resolve_1()) | |
| continue | |
| msg = f"Dependency {dependency!r} is not registered." | |
| raise DIWireDependencyNotRegisteredError(msg) | |
| return tuple(results) | |
| # Any dependency not pre-bound in build_root_resolver is unknown here. | |
| msg = f"Dependency {dependency!r} is not registered." | |
| raise DIWireDependencyNotRegisteredError(msg) | |
| async def aresolve(self, dependency: Any) -> Any: | |
| """ | |
| Route a dependency token to a generated asynchronous provider resolver method. | |
| Known provider slots: 1. | |
| Dispatch uses identity checks against module-level `_dep_<slot>_type` globals. | |
| """ | |
| # Fast path identity checks for asynchronous resolution. | |
| if dependency is _dep_1_type: | |
| return await _StepResolver.aresolve_1(self) | |
| if is_maybe_annotation(dependency): | |
| inner = strip_maybe_annotation(dependency) | |
| if is_provider_annotation(inner): | |
| provider_inner = strip_provider_annotation(inner) | |
| if is_async_provider_annotation(inner): | |
| return lambda: self.aresolve(provider_inner) | |
| return lambda: self.resolve(provider_inner) | |
| if is_from_context_annotation(inner): | |
| key = strip_from_context_annotation(inner) | |
| try: | |
| return self._resolve_from_context(key) | |
| except DIWireDependencyNotRegisteredError: | |
| return None | |
| if not self._is_registered_dependency(inner): | |
| return None | |
| return await self.aresolve(inner) | |
| if is_provider_annotation(dependency): | |
| inner = strip_provider_annotation(dependency) | |
| if is_async_provider_annotation(dependency): | |
| return lambda: self.aresolve(inner) | |
| return lambda: self.resolve(inner) | |
| if is_from_context_annotation(dependency): | |
| key = strip_from_context_annotation(dependency) | |
| return self._resolve_from_context(key) | |
| if is_all_annotation(dependency): | |
| inner = strip_all_annotation(dependency) | |
| slots = _all_slots_by_key.get(inner, ()) | |
| if not slots: | |
| return () | |
| results: list[Any] = [] | |
| for slot in slots: | |
| if slot == 1: | |
| results.append(await self.aresolve_1()) | |
| continue | |
| msg = f"Dependency {dependency!r} is not registered." | |
| raise DIWireDependencyNotRegisteredError(msg) | |
| return tuple(results) | |
| # Any dependency not pre-bound in build_root_resolver is unknown here. | |
| msg = f"Dependency {dependency!r} is not registered." | |
| raise DIWireDependencyNotRegisteredError(msg) | |
| def _resolve_from_context(self, key: Any) -> Any: | |
| context = self._context | |
| if context is not None and key in context: | |
| return context[key] | |
| parent_context_resolver = self._parent_context_resolver | |
| while parent_context_resolver is not None: | |
| parent_context = parent_context_resolver._context | |
| if parent_context is not None and key in parent_context: | |
| return parent_context[key] | |
| parent_context_resolver = parent_context_resolver._parent_context_resolver | |
| msg = (f"Context value for {key!r} is not provided. Pass it via `enter_scope(..., context={...})` (or `diwire_context` for injected callables).") | |
| raise DIWireDependencyNotRegisteredError(msg) | |
| def _is_registered_dependency(self, dependency: Any) -> bool: | |
| return dependency in _dep_registered_keys | |
| def __enter__(self) -> _StepResolver: | |
| return self | |
| def __exit__( | |
| self, | |
| exc_type: type[BaseException] | None, | |
| exc_value: BaseException | None, | |
| traceback: TracebackType | None, | |
| ) -> None: | |
| cleanup_error: BaseException | None = None | |
| while self._cleanup_callbacks: | |
| cleanup_kind, cleanup = self._cleanup_callbacks.pop() | |
| try: | |
| if cleanup_kind == 0: | |
| cleanup(exc_type, exc_value, traceback) | |
| else: | |
| msg = "Cannot execute async cleanup in sync context. Use 'async with'." | |
| raise DIWireAsyncDependencyInSyncContextError(msg) | |
| except BaseException as error: | |
| if exc_type is None and cleanup_error is None: | |
| cleanup_error = error | |
| if self._owned_scope_resolvers: | |
| for owned_scope_resolver in reversed(self._owned_scope_resolvers): | |
| try: | |
| owned_scope_resolver.__exit__(exc_type, exc_value, traceback) | |
| except BaseException as error: | |
| if exc_type is None and cleanup_error is None: | |
| cleanup_error = error | |
| if exc_type is None and cleanup_error is not None: | |
| raise cleanup_error | |
| return None | |
| def close( | |
| self, | |
| exc_type: type[BaseException] | None = None, | |
| exc_value: BaseException | None = None, | |
| traceback: TracebackType | None = None, | |
| ) -> None: | |
| return self.__exit__(exc_type, exc_value, traceback) | |
| async def __aenter__(self) -> _StepResolver: | |
| return self | |
| async def __aexit__( | |
| self, | |
| exc_type: type[BaseException] | None, | |
| exc_value: BaseException | None, | |
| traceback: TracebackType | None, | |
| ) -> None: | |
| cleanup_error: BaseException | None = None | |
| while self._cleanup_callbacks: | |
| cleanup_kind, cleanup = self._cleanup_callbacks.pop() | |
| try: | |
| if cleanup_kind == 0: | |
| cleanup(exc_type, exc_value, traceback) | |
| else: | |
| await cleanup(exc_type, exc_value, traceback) | |
| except BaseException as error: | |
| if exc_type is None and cleanup_error is None: | |
| cleanup_error = error | |
| if self._owned_scope_resolvers: | |
| for owned_scope_resolver in reversed(self._owned_scope_resolvers): | |
| try: | |
| await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback) | |
| except BaseException as error: | |
| if exc_type is None and cleanup_error is None: | |
| cleanup_error = error | |
| if exc_type is None and cleanup_error is not None: | |
| raise cleanup_error | |
| return None | |
| async def aclose( | |
| self, | |
| exc_type: type[BaseException] | None = None, | |
| exc_value: BaseException | None = None, | |
| traceback: TracebackType | None = None, | |
| ) -> None: | |
| return await self.__aexit__(exc_type, exc_value, traceback) | |
def resolve_1(self) -> Any:
    """
    Sync resolver for provider slot 1.

    Returns: __main__.Session

    Generated metadata:
      - Provider spec kind: generator
      - Provider target: __main__.session_factory
      - Declared scope: request (level 3)
      - Declared lifetime: scoped
      - Cache policy: cached (cache owner scope: 3)
      - Declared lock mode: auto; effective lock mode: thread
      - Sync path thread lock: True; async path async lock: False
      - Provider declared async: False; graph requires async: False
      - Cleanup callbacks required: True
      - Resolver class handling this call: _StepResolver
      - Dependency wiring: none

    Behavior: the call is forwarded to the request-level resolver, which
    owns the cache for this slot, so nested resolvers share one cached value.

    Raises:
        DIWireScopeMismatchError: if no request-level resolver is attached
            (``self._request_resolver`` is the missing-resolver sentinel).
    """
    cache_owner = self._request_resolver
    if cache_owner is not _MISSING_RESOLVER:
        return cache_owner.resolve_1()
    msg = "Provider slot 1 requires opened scope level 3."
    raise DIWireScopeMismatchError(msg)
async def aresolve_1(self) -> Any:
    """
    Async resolver for provider slot 1.

    Returns: __main__.Session

    Generated metadata:
      - Provider spec kind: generator
      - Provider target: __main__.session_factory
      - Declared scope: request (level 3)
      - Declared lifetime: scoped
      - Cache policy: cached (cache owner scope: 3)
      - Declared lock mode: auto; effective lock mode: thread
      - Sync path thread lock: True; async path async lock: False
      - Provider declared async: False; graph requires async: False
      - Cleanup callbacks required: True
      - Resolver class handling this call: _StepResolver
      - Dependency wiring: none

    Behavior: nothing in this provider graph needs to be awaited, so the
    coroutine simply returns the result of the sync ``resolve_1`` call.
    """
    resolved = self.resolve_1()
    return resolved
def build_root_resolver(
    registrations: ProvidersRegistrations,
    *,
    cleanup_enabled: bool = True,
) -> RootResolver:
    """
    Create the generated root resolver for a set of registrations.

    Calling this function rebinds the module-level provider globals
    (`_dep_1_type`, `_provider_1`, `_dep_registered_keys`,
    `_all_slots_by_key`) from the supplied registrations, so the
    `resolve_<slot>` methods read globals instead of doing registration
    lookups.

    Provider slots configured during bootstrap: 1.
    Root resolver class: RootResolver.

    Args:
        registrations: Source of provider specs, queried by slot id.
        cleanup_enabled: Forwarded as the single argument to `RootResolver`.

    Returns:
        A newly constructed `RootResolver`.

    Examples:
    >>> root = build_root_resolver(registrations)
    >>> root.resolve(SomeService)
    >>> await root.aresolve(SomeAsyncService)
    >>> scoped = root.enter_scope()
    """
    global _all_slots_by_key, _dep_registered_keys
    global _dep_1_type, _provider_1

    # Start from empty indexes before reading any registration.
    _all_slots_by_key = {}
    _dep_registered_keys = set()
    slots_index: dict[Any, list[int]] = {}

    # --- Provider slot 1 ---
    spec_1 = registrations.get_by_slot(1)
    # Dependency key used by `resolve`/`aresolve` dispatch.
    _dep_1_type = spec_1.provides
    _dep_registered_keys.add(_dep_1_type)
    # Provider object for this slot (a generator provider here).
    _provider_1 = spec_1.generator

    # Pick the key under which slot 1 is listed for All[...] dispatch:
    # the component base key when there is one, otherwise the dependency
    # type itself unless it carries `__metadata__`.
    index_key_1 = component_base_key(_dep_1_type)
    if index_key_1 is None and not hasattr(_dep_1_type, '__metadata__'):
        index_key_1 = _dep_1_type
    if index_key_1 is not None:
        slots_index.setdefault(index_key_1, []).append(1)

    # Publish the index with tuple values so slot lists are immutable.
    _all_slots_by_key = {
        index_key: tuple(slot_ids) for index_key, slot_ids in slots_index.items()
    }

    return RootResolver(cleanup_enabled)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections.abc import Generator | |
| from diwire import Container, Lifetime, Scope | |
class Session:
    """Tiny closeable resource used by the example; tracks a `closed` flag."""

    def __init__(self) -> None:
        # A freshly created session is open.
        self.closed: bool = False

    def close(self) -> None:
        """Flip the `closed` flag to True."""
        self.closed = True
def session_factory() -> Generator[Session, None, None]:
    """Yield one new `Session` and close it once the generator finishes.

    The `finally` clause guarantees `close()` runs whether the generator is
    resumed normally, closed, or has an exception thrown into it.
    """
    opened = Session()
    try:
        yield opened
    finally:
        opened.close()
class Foo:
    """Example consumer that stores the `Session` it is constructed with."""

    def __init__(self, session: Session) -> None:
        # Keep a reference to the injected session; nothing else is done.
        self.session = session
# Example driver: build a container, register the generator provider, and
# resolve Session from two separately entered scopes.
# NOTE(review): indentation was lost in this copy of the gist, so it cannot be
# determined from this text whether the second `with` (and the final assert)
# is nested inside the first `with` block — confirm against the original.
# Container is created with the resolver context turned off.
| container = Container( | |
| use_resolver_context=False,) | |
# Register session_factory as the generator provider for Session, declared
# at REQUEST scope with SCOPED lifetime.
| container.add_generator( | |
| session_factory, | |
| provides=Session, | |
| scope=Scope.REQUEST, | |
| lifetime=Lifetime.SCOPED, | |
| ) | |
# First scope entered from the container; resolve a Session from it.
| with container.enter_scope() as request_scope: | |
| session1 = request_scope.resolve(Session) | |
# Second scope, also entered from the container (not from request_scope).
| with container.enter_scope() as request_scope2: | |
| session2 = request_scope2.resolve(Session) | |
# The two scopes are expected to hand out distinct Session objects.
| assert session2 is not session1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment