Skip to content

Instantly share code, notes, and snippets.

@Tishka17
Created February 21, 2026 10:58
Show Gist options
  • Select an option

  • Save Tishka17/043400e7c95e28ea7ef8395fbec8b449 to your computer and use it in GitHub Desktop.

Select an option

Save Tishka17/043400e7c95e28ea7ef8395fbec8b449 to your computer and use it in GitHub Desktop.
diwire generated code
"""
Generated DI resolver module.
Generated by: diwire._internal.resolvers.assembly.renderer.ResolversAssemblyRenderer.get_providers_code
diwire version used for generation: 1.0.1
Generation configuration:
- root scope level: 1
- managed scopes: app:1, session:2, request:3, action:4, step:5
- graph has async specs: False
- cleanup support enabled in graph: True
- provider count: 1
- cached provider count: 1
- thread lock count: 1
- async lock count: 0
- provider slots: 1
Examples:
>>> root = build_root_resolver(registrations)
>>> service = root.resolve(SomeService)
>>> async_service = await root.aresolve(SomeAsyncService)
>>> request_scope = root.enter_scope()
>>> scoped_service = request_scope.resolve(RequestScopedService)
"""
from __future__ import annotations
import threading
from contextlib import asynccontextmanager, contextmanager
from types import TracebackType
from typing import Any, NoReturn
from diwire.exceptions import (
DIWireAsyncDependencyInSyncContextError,
DIWireDependencyNotRegisteredError,
DIWireScopeMismatchError,
)
from diwire._internal.markers import (
is_async_provider_annotation,
is_all_annotation,
component_base_key,
is_from_context_annotation,
is_maybe_annotation,
is_provider_annotation,
strip_all_annotation,
strip_from_context_annotation,
strip_maybe_annotation,
strip_provider_annotation,
)
from diwire._internal.providers import ProvidersRegistrations
_MISSING_RESOLVER: Any = object()
_MISSING_CACHE: Any = object()
_MISSING_PROVIDER: Any = object()
_all_slots_by_key: dict[Any, tuple[int, ...]] = {}
_dep_registered_keys: set[Any] = set()
_scope_obj_1: Any = 1
_scope_obj_2: Any = 2
_scope_obj_3: Any = 3
_scope_obj_4: Any = 4
_scope_obj_5: Any = 5
_dep_1_type: Any = _MISSING_PROVIDER
_provider_1: Any = _MISSING_PROVIDER
_dep_1_thread_lock = threading.Lock()
class RootResolver:
"""
Generated resolver for scope 'app' (level 1).
This class is generated and optimized for direct slot-based dependency resolution.
All visible provider slots: 1.
Providers declared in this exact scope: none.
"""
__slots__ = (
"_root_resolver",
"_context",
"_parent_context_resolver",
"_cleanup_enabled",
"__dict__",
"_cleanup_callbacks",
"_owned_scope_resolvers",
)
def __init__(
self,
cleanup_enabled: bool = True,
context: Any | None = None,
parent_context_resolver: Any = None,
) -> None:
"""
Initialize resolver state for the current scope.
The constructor wires scope ancestry references, cache slots, and optional cleanup state.
Root scope class: RootResolver.
Stateless scope reuse enabled: False.
"""
self._cleanup_enabled = cleanup_enabled
self._root_resolver = self
self._context = context
self._parent_context_resolver = parent_context_resolver
self._cleanup_callbacks: list[tuple[int, Any]] = []
self._owned_scope_resolvers: tuple[Any, ...] = ()
def enter_scope(
self,
scope: Any | None = None,
*,
context: Any | None = None,
) -> RootResolver | _SessionResolver | _RequestResolver | _ActionResolver | _StepResolver:
"""
Open a deeper scope resolver from this resolver.
Current scope: app:1.
Allowed explicit transitions: session:2, request:3, action:4, step:5.
Passing None follows the default transition for the scope graph.
Optional context mapping is attached to the opened target scope.
"""
if scope is _scope_obj_3 or scope == 3:
return _RequestResolver(
self,
self._cleanup_enabled,
context,
self,
)
if scope is None:
return _RequestResolver(
self,
self._cleanup_enabled,
context,
self,
)
target_scope_level = scope
if target_scope_level is _scope_obj_1 or target_scope_level == 1:
return self
if target_scope_level <= 1:
msg = f"Cannot enter scope level {target_scope_level} from level 1."
raise DIWireScopeMismatchError(msg)
if target_scope_level is _scope_obj_2 or target_scope_level == 2:
return _SessionResolver(
self,
self._cleanup_enabled,
context,
self,
)
if target_scope_level is _scope_obj_4 or target_scope_level == 4:
request_resolver = _RequestResolver(
self,
self._cleanup_enabled,
None,
self,
)
action_resolver = _ActionResolver(
request_resolver._root_resolver,
request_resolver._cleanup_enabled,
context,
request_resolver,
request_resolver,
)
action_resolver._owned_scope_resolvers = (request_resolver,)
return action_resolver
if target_scope_level is _scope_obj_5 or target_scope_level == 5:
request_resolver = _RequestResolver(
self,
self._cleanup_enabled,
None,
self,
)
action_resolver = _ActionResolver(
request_resolver._root_resolver,
request_resolver._cleanup_enabled,
None,
request_resolver,
request_resolver,
)
step_resolver = _StepResolver(
action_resolver._root_resolver,
action_resolver._cleanup_enabled,
context,
action_resolver,
request_resolver,
)
step_resolver._owned_scope_resolvers = (request_resolver, action_resolver)
return step_resolver
msg = f"Scope level {target_scope_level} is not a valid next transition from level 1."
raise DIWireScopeMismatchError(msg)
def resolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated synchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks to avoid reflective dispatch.
if dependency is _dep_1_type:
return RootResolver.resolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return self.resolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(self.resolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
async def aresolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated asynchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks for asynchronous resolution.
if dependency is _dep_1_type:
return await RootResolver.aresolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return await self.aresolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(await self.aresolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
def _resolve_from_context(self, key: Any) -> Any:
context = self._context
if context is not None and key in context:
return context[key]
parent_context_resolver = self._parent_context_resolver
while parent_context_resolver is not None:
parent_context = parent_context_resolver._context
if parent_context is not None and key in parent_context:
return parent_context[key]
parent_context_resolver = parent_context_resolver._parent_context_resolver
msg = (f"Context value for {key!r} is not provided. Pass it via `enter_scope(..., context={...})` (or `diwire_context` for injected callables).")
raise DIWireDependencyNotRegisteredError(msg)
def _is_registered_dependency(self, dependency: Any) -> bool:
return dependency in _dep_registered_keys
def __enter__(self) -> RootResolver:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
msg = "Cannot execute async cleanup in sync context. Use 'async with'."
raise DIWireAsyncDependencyInSyncContextError(msg)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
owned_scope_resolver.__exit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
def close(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return self.__exit__(exc_type, exc_value, traceback)
async def __aenter__(self) -> RootResolver:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
await cleanup(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
async def aclose(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return await self.__aexit__(exc_type, exc_value, traceback)
def resolve_1(self) -> Any:
"""
Provider slot 1 resolver (sync method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: RootResolver
Dependency wiring: none
Behavior: This resolver cannot access the provider yet because the required scope is deeper than the current resolver scope.
"""
msg = "Provider slot 1 requires opened scope level 3."
raise DIWireScopeMismatchError(msg)
async def aresolve_1(self) -> Any:
"""
Provider slot 1 resolver (async method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: RootResolver
Dependency wiring: none
Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
"""
return self.resolve_1()
class _SessionResolver:
"""
Generated resolver for scope 'session' (level 2).
This class is generated and optimized for direct slot-based dependency resolution.
All visible provider slots: 1.
Providers declared in this exact scope: none.
"""
__slots__ = (
"_root_resolver",
"_context",
"_parent_context_resolver",
"_cleanup_enabled",
"_cleanup_callbacks",
"_owned_scope_resolvers",
)
def __init__(
self,
root_resolver: RootResolver,
cleanup_enabled: bool = True,
context: Any | None = None,
parent_context_resolver: Any = None,
) -> None:
"""
Initialize resolver state for the current scope.
The constructor wires scope ancestry references, cache slots, and optional cleanup state.
Root scope class: RootResolver.
Stateless scope reuse enabled: False.
"""
self._cleanup_enabled = cleanup_enabled
self._root_resolver = root_resolver
self._context = context
self._parent_context_resolver = parent_context_resolver
self._cleanup_callbacks: list[tuple[int, Any]] = []
self._owned_scope_resolvers: tuple[Any, ...] = ()
def enter_scope(
self,
scope: Any | None = None,
*,
context: Any | None = None,
) -> _SessionResolver | _RequestResolver | _ActionResolver | _StepResolver:
"""
Open a deeper scope resolver from this resolver.
Current scope: session:2.
Allowed explicit transitions: request:3, action:4, step:5.
Passing None follows the default transition for the scope graph.
Optional context mapping is attached to the opened target scope.
"""
if scope is _scope_obj_3 or scope == 3:
return _RequestResolver(
self._root_resolver,
self._cleanup_enabled,
context,
self,
)
if scope is None:
return _RequestResolver(
self._root_resolver,
self._cleanup_enabled,
context,
self,
)
target_scope_level = scope
if target_scope_level is _scope_obj_2 or target_scope_level == 2:
return self
if target_scope_level <= 2:
msg = f"Cannot enter scope level {target_scope_level} from level 2."
raise DIWireScopeMismatchError(msg)
if target_scope_level is _scope_obj_4 or target_scope_level == 4:
request_resolver = _RequestResolver(
self._root_resolver,
self._cleanup_enabled,
None,
self,
)
action_resolver = _ActionResolver(
request_resolver._root_resolver,
request_resolver._cleanup_enabled,
context,
request_resolver,
request_resolver,
)
action_resolver._owned_scope_resolvers = (request_resolver,)
return action_resolver
if target_scope_level is _scope_obj_5 or target_scope_level == 5:
request_resolver = _RequestResolver(
self._root_resolver,
self._cleanup_enabled,
None,
self,
)
action_resolver = _ActionResolver(
request_resolver._root_resolver,
request_resolver._cleanup_enabled,
None,
request_resolver,
request_resolver,
)
step_resolver = _StepResolver(
action_resolver._root_resolver,
action_resolver._cleanup_enabled,
context,
action_resolver,
request_resolver,
)
step_resolver._owned_scope_resolvers = (request_resolver, action_resolver)
return step_resolver
msg = f"Scope level {target_scope_level} is not a valid next transition from level 2."
raise DIWireScopeMismatchError(msg)
def resolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated synchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks to avoid reflective dispatch.
if dependency is _dep_1_type:
return _SessionResolver.resolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return self.resolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(self.resolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
async def aresolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated asynchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks for asynchronous resolution.
if dependency is _dep_1_type:
return await _SessionResolver.aresolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return await self.aresolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(await self.aresolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
def _resolve_from_context(self, key: Any) -> Any:
context = self._context
if context is not None and key in context:
return context[key]
parent_context_resolver = self._parent_context_resolver
while parent_context_resolver is not None:
parent_context = parent_context_resolver._context
if parent_context is not None and key in parent_context:
return parent_context[key]
parent_context_resolver = parent_context_resolver._parent_context_resolver
msg = (f"Context value for {key!r} is not provided. Pass it via `enter_scope(..., context={...})` (or `diwire_context` for injected callables).")
raise DIWireDependencyNotRegisteredError(msg)
def _is_registered_dependency(self, dependency: Any) -> bool:
return dependency in _dep_registered_keys
def __enter__(self) -> _SessionResolver:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
msg = "Cannot execute async cleanup in sync context. Use 'async with'."
raise DIWireAsyncDependencyInSyncContextError(msg)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
owned_scope_resolver.__exit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
def close(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return self.__exit__(exc_type, exc_value, traceback)
async def __aenter__(self) -> _SessionResolver:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
await cleanup(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
async def aclose(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return await self.__aexit__(exc_type, exc_value, traceback)
def resolve_1(self) -> Any:
"""
Provider slot 1 resolver (sync method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _SessionResolver
Dependency wiring: none
Behavior: This resolver cannot access the provider yet because the required scope is deeper than the current resolver scope.
"""
msg = "Provider slot 1 requires opened scope level 3."
raise DIWireScopeMismatchError(msg)
async def aresolve_1(self) -> Any:
"""
Provider slot 1 resolver (async method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _SessionResolver
Dependency wiring: none
Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
"""
return self.resolve_1()
class _RequestResolver:
"""
Generated resolver for scope 'request' (level 3).
This class is generated and optimized for direct slot-based dependency resolution.
All visible provider slots: 1.
Providers declared in this exact scope: 1.
"""
__slots__ = (
"_root_resolver",
"_context",
"_parent_context_resolver",
"_cleanup_enabled",
"_cleanup_callbacks",
"_owned_scope_resolvers",
"_cache_1",
)
def __init__(
self,
root_resolver: RootResolver,
cleanup_enabled: bool = True,
context: Any | None = None,
parent_context_resolver: Any = None,
) -> None:
"""
Initialize resolver state for the current scope.
The constructor wires scope ancestry references, cache slots, and optional cleanup state.
Root scope class: RootResolver.
Stateless scope reuse enabled: False.
"""
self._cleanup_enabled = cleanup_enabled
self._root_resolver = root_resolver
self._context = context
self._parent_context_resolver = parent_context_resolver
self._cleanup_callbacks: list[tuple[int, Any]] = []
self._owned_scope_resolvers: tuple[Any, ...] = ()
self._cache_1 = _MISSING_CACHE
def enter_scope(
self,
scope: Any | None = None,
*,
context: Any | None = None,
) -> _RequestResolver | _ActionResolver | _StepResolver:
"""
Open a deeper scope resolver from this resolver.
Current scope: request:3.
Allowed explicit transitions: action:4, step:5.
Passing None follows the default transition for the scope graph.
Optional context mapping is attached to the opened target scope.
"""
if scope is _scope_obj_4 or scope == 4:
return _ActionResolver(
self._root_resolver,
self._cleanup_enabled,
context,
self,
self,
)
if scope is None:
return _ActionResolver(
self._root_resolver,
self._cleanup_enabled,
context,
self,
self,
)
target_scope_level = scope
if target_scope_level is _scope_obj_3 or target_scope_level == 3:
return self
if target_scope_level <= 3:
msg = f"Cannot enter scope level {target_scope_level} from level 3."
raise DIWireScopeMismatchError(msg)
if target_scope_level is _scope_obj_5 or target_scope_level == 5:
action_resolver = _ActionResolver(
self._root_resolver,
self._cleanup_enabled,
None,
self,
self,
)
step_resolver = _StepResolver(
action_resolver._root_resolver,
action_resolver._cleanup_enabled,
context,
action_resolver,
action_resolver._request_resolver,
)
step_resolver._owned_scope_resolvers = (action_resolver,)
return step_resolver
msg = f"Scope level {target_scope_level} is not a valid next transition from level 3."
raise DIWireScopeMismatchError(msg)
def resolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated synchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks to avoid reflective dispatch.
if dependency is _dep_1_type:
return _RequestResolver.resolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return self.resolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(self.resolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
async def aresolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated asynchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks for asynchronous resolution.
if dependency is _dep_1_type:
return await _RequestResolver.aresolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return await self.aresolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(await self.aresolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
def _resolve_from_context(self, key: Any) -> Any:
context = self._context
if context is not None and key in context:
return context[key]
parent_context_resolver = self._parent_context_resolver
while parent_context_resolver is not None:
parent_context = parent_context_resolver._context
if parent_context is not None and key in parent_context:
return parent_context[key]
parent_context_resolver = parent_context_resolver._parent_context_resolver
msg = (f"Context value for {key!r} is not provided. Pass it via `enter_scope(..., context={...})` (or `diwire_context` for injected callables).")
raise DIWireDependencyNotRegisteredError(msg)
def _is_registered_dependency(self, dependency: Any) -> bool:
return dependency in _dep_registered_keys
def __enter__(self) -> _RequestResolver:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
msg = "Cannot execute async cleanup in sync context. Use 'async with'."
raise DIWireAsyncDependencyInSyncContextError(msg)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
owned_scope_resolver.__exit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
def close(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return self.__exit__(exc_type, exc_value, traceback)
async def __aenter__(self) -> _RequestResolver:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
await cleanup(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
async def aclose(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return await self.__aexit__(exc_type, exc_value, traceback)
def resolve_1(self) -> Any:
"""
Provider slot 1 resolver (sync method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _RequestResolver
Dependency wiring: none
Behavior: Builds the provider value in this resolver, enforcing scope guards and cache policy.
"""
provider_scope_resolver = self
cached_value = self._cache_1
if cached_value is not _MISSING_CACHE:
return cached_value
with _dep_1_thread_lock:
if (cached_value := self._cache_1) is not _MISSING_CACHE:
return cached_value
if self._cleanup_enabled:
_provider_cm = contextmanager(_provider_1)()
value = _provider_cm.__enter__()
provider_scope_resolver._cleanup_callbacks.append((0, _provider_cm.__exit__))
else:
_provider_gen = _provider_1()
value = next(_provider_gen)
self._cache_1 = value
return value
async def aresolve_1(self) -> Any:
"""
Provider slot 1 resolver (async method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _RequestResolver
Dependency wiring: none
Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
"""
return self.resolve_1()
class _ActionResolver:
"""
Generated resolver for scope 'action' (level 4).
This class is generated and optimized for direct slot-based dependency resolution.
All visible provider slots: 1.
Providers declared in this exact scope: none.
"""
__slots__ = (
"_root_resolver",
"_context",
"_parent_context_resolver",
"_cleanup_enabled",
"_cleanup_callbacks",
"_owned_scope_resolvers",
"_request_resolver",
)
def __init__(
self,
root_resolver: RootResolver,
cleanup_enabled: bool = True,
context: Any | None = None,
parent_context_resolver: Any = None,
request_resolver: Any = _MISSING_RESOLVER,
) -> None:
"""
Initialize resolver state for the current scope.
The constructor wires scope ancestry references, cache slots, and optional cleanup state.
Root scope class: RootResolver.
Stateless scope reuse enabled: False.
"""
self._cleanup_enabled = cleanup_enabled
self._root_resolver = root_resolver
self._context = context
self._parent_context_resolver = parent_context_resolver
self._cleanup_callbacks: list[tuple[int, Any]] = []
self._owned_scope_resolvers: tuple[Any, ...] = ()
self._request_resolver = request_resolver
def enter_scope(
self,
scope: Any | None = None,
*,
context: Any | None = None,
) -> _ActionResolver | _StepResolver:
"""
Open a deeper scope resolver from this resolver.
Current scope: action:4.
Allowed explicit transitions: step:5.
Passing None follows the default transition for the scope graph.
Optional context mapping is attached to the opened target scope.
"""
if scope is _scope_obj_5 or scope == 5:
return _StepResolver(
self._root_resolver,
self._cleanup_enabled,
context,
self,
self._request_resolver,
)
if scope is None:
return _StepResolver(
self._root_resolver,
self._cleanup_enabled,
context,
self,
self._request_resolver,
)
target_scope_level = scope
if target_scope_level is _scope_obj_4 or target_scope_level == 4:
return self
if target_scope_level <= 4:
msg = f"Cannot enter scope level {target_scope_level} from level 4."
raise DIWireScopeMismatchError(msg)
msg = f"Scope level {target_scope_level} is not a valid next transition from level 4."
raise DIWireScopeMismatchError(msg)
def resolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated synchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks to avoid reflective dispatch.
if dependency is _dep_1_type:
return _ActionResolver.resolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return self.resolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(self.resolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
async def aresolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated asynchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks for asynchronous resolution.
if dependency is _dep_1_type:
return await _ActionResolver.aresolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return await self.aresolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(await self.aresolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
def _resolve_from_context(self, key: Any) -> Any:
context = self._context
if context is not None and key in context:
return context[key]
parent_context_resolver = self._parent_context_resolver
while parent_context_resolver is not None:
parent_context = parent_context_resolver._context
if parent_context is not None and key in parent_context:
return parent_context[key]
parent_context_resolver = parent_context_resolver._parent_context_resolver
msg = (f"Context value for {key!r} is not provided. Pass it via `enter_scope(..., context={...})` (or `diwire_context` for injected callables).")
raise DIWireDependencyNotRegisteredError(msg)
def _is_registered_dependency(self, dependency: Any) -> bool:
return dependency in _dep_registered_keys
def __enter__(self) -> _ActionResolver:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
msg = "Cannot execute async cleanup in sync context. Use 'async with'."
raise DIWireAsyncDependencyInSyncContextError(msg)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
owned_scope_resolver.__exit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
def close(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return self.__exit__(exc_type, exc_value, traceback)
async def __aenter__(self) -> _ActionResolver:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
await cleanup(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
async def aclose(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return await self.__aexit__(exc_type, exc_value, traceback)
def resolve_1(self) -> Any:
"""
Provider slot 1 resolver (sync method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _ActionResolver
Dependency wiring: none
Behavior: This resolver delegates to the cache owner resolver so scoped caching remains consistent across nested resolvers.
"""
owner_resolver = self._request_resolver
if owner_resolver is _MISSING_RESOLVER:
msg = "Provider slot 1 requires opened scope level 3."
raise DIWireScopeMismatchError(msg)
return owner_resolver.resolve_1()
async def aresolve_1(self) -> Any:
"""
Provider slot 1 resolver (async method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _ActionResolver
Dependency wiring: none
Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
"""
return self.resolve_1()
class _StepResolver:
"""
Generated resolver for scope 'step' (level 5).
This class is generated and optimized for direct slot-based dependency resolution.
All visible provider slots: 1.
Providers declared in this exact scope: none.
"""
__slots__ = (
"_root_resolver",
"_context",
"_parent_context_resolver",
"_cleanup_enabled",
"_cleanup_callbacks",
"_owned_scope_resolvers",
"_request_resolver",
)
def __init__(
self,
root_resolver: RootResolver,
cleanup_enabled: bool = True,
context: Any | None = None,
parent_context_resolver: Any = None,
request_resolver: Any = _MISSING_RESOLVER,
) -> None:
"""
Initialize resolver state for the current scope.
The constructor wires scope ancestry references, cache slots, and optional cleanup state.
Root scope class: RootResolver.
Stateless scope reuse enabled: False.
"""
self._cleanup_enabled = cleanup_enabled
self._root_resolver = root_resolver
self._context = context
self._parent_context_resolver = parent_context_resolver
self._cleanup_callbacks: list[tuple[int, Any]] = []
self._owned_scope_resolvers: tuple[Any, ...] = ()
self._request_resolver = request_resolver
def enter_scope(
self,
scope: Any | None = None,
*,
context: Any | None = None,
) -> NoReturn:
"""
Open a deeper scope resolver from this resolver.
Current scope: step:5.
Allowed explicit transitions: none.
Passing None follows the default transition for the scope graph.
Optional context mapping is attached to the opened target scope.
"""
msg = "Cannot enter deeper scope from level 5."
raise DIWireScopeMismatchError(msg)
def resolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated synchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks to avoid reflective dispatch.
if dependency is _dep_1_type:
return _StepResolver.resolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return self.resolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(self.resolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
async def aresolve(self, dependency: Any) -> Any:
"""
Route a dependency token to a generated asynchronous provider resolver method.
Known provider slots: 1.
Dispatch uses identity checks against module-level `_dep_<slot>_type` globals.
"""
# Fast path identity checks for asynchronous resolution.
if dependency is _dep_1_type:
return await _StepResolver.aresolve_1(self)
if is_maybe_annotation(dependency):
inner = strip_maybe_annotation(dependency)
if is_provider_annotation(inner):
provider_inner = strip_provider_annotation(inner)
if is_async_provider_annotation(inner):
return lambda: self.aresolve(provider_inner)
return lambda: self.resolve(provider_inner)
if is_from_context_annotation(inner):
key = strip_from_context_annotation(inner)
try:
return self._resolve_from_context(key)
except DIWireDependencyNotRegisteredError:
return None
if not self._is_registered_dependency(inner):
return None
return await self.aresolve(inner)
if is_provider_annotation(dependency):
inner = strip_provider_annotation(dependency)
if is_async_provider_annotation(dependency):
return lambda: self.aresolve(inner)
return lambda: self.resolve(inner)
if is_from_context_annotation(dependency):
key = strip_from_context_annotation(dependency)
return self._resolve_from_context(key)
if is_all_annotation(dependency):
inner = strip_all_annotation(dependency)
slots = _all_slots_by_key.get(inner, ())
if not slots:
return ()
results: list[Any] = []
for slot in slots:
if slot == 1:
results.append(await self.aresolve_1())
continue
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
return tuple(results)
# Any dependency not pre-bound in build_root_resolver is unknown here.
msg = f"Dependency {dependency!r} is not registered."
raise DIWireDependencyNotRegisteredError(msg)
def _resolve_from_context(self, key: Any) -> Any:
context = self._context
if context is not None and key in context:
return context[key]
parent_context_resolver = self._parent_context_resolver
while parent_context_resolver is not None:
parent_context = parent_context_resolver._context
if parent_context is not None and key in parent_context:
return parent_context[key]
parent_context_resolver = parent_context_resolver._parent_context_resolver
msg = (f"Context value for {key!r} is not provided. Pass it via `enter_scope(..., context={...})` (or `diwire_context` for injected callables).")
raise DIWireDependencyNotRegisteredError(msg)
def _is_registered_dependency(self, dependency: Any) -> bool:
return dependency in _dep_registered_keys
def __enter__(self) -> _StepResolver:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
msg = "Cannot execute async cleanup in sync context. Use 'async with'."
raise DIWireAsyncDependencyInSyncContextError(msg)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
owned_scope_resolver.__exit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
def close(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return self.__exit__(exc_type, exc_value, traceback)
async def __aenter__(self) -> _StepResolver:
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
cleanup_error: BaseException | None = None
while self._cleanup_callbacks:
cleanup_kind, cleanup = self._cleanup_callbacks.pop()
try:
if cleanup_kind == 0:
cleanup(exc_type, exc_value, traceback)
else:
await cleanup(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if self._owned_scope_resolvers:
for owned_scope_resolver in reversed(self._owned_scope_resolvers):
try:
await owned_scope_resolver.__aexit__(exc_type, exc_value, traceback)
except BaseException as error:
if exc_type is None and cleanup_error is None:
cleanup_error = error
if exc_type is None and cleanup_error is not None:
raise cleanup_error
return None
async def aclose(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
return await self.__aexit__(exc_type, exc_value, traceback)
def resolve_1(self) -> Any:
"""
Provider slot 1 resolver (sync method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _StepResolver
Dependency wiring: none
Behavior: This resolver delegates to the cache owner resolver so scoped caching remains consistent across nested resolvers.
"""
owner_resolver = self._request_resolver
if owner_resolver is _MISSING_RESOLVER:
msg = "Provider slot 1 requires opened scope level 3."
raise DIWireScopeMismatchError(msg)
return owner_resolver.resolve_1()
async def aresolve_1(self) -> Any:
"""
Provider slot 1 resolver (async method).
Returns: __main__.Session
Provider spec kind: generator
Provider target: __main__.session_factory
Declared scope: request (level 3)
Declared lifetime: scoped
Cache policy: cached
Cache owner scope: 3
Declared lock mode: auto
Effective lock mode: thread
Sync path thread lock: True
Async path async lock: False
Provider declared async: False
Graph requires async: False
Cleanup callbacks required: True
Resolver class handling this call: _StepResolver
Dependency wiring: none
Behavior: Async variant delegates to sync resolution because this provider graph does not require awaitable operations.
"""
return self.resolve_1()
def build_root_resolver(
registrations: ProvidersRegistrations,
*,
cleanup_enabled: bool = True,
) -> RootResolver:
"""
Build and return the generated root resolver instance.
This function rebinds module-level provider globals for the supplied registrations.
Global rebinding makes `resolve_<slot>` methods run without registration lookups.
Provider slots configured during bootstrap: 1.
Root resolver class: RootResolver.
Examples:
>>> root = build_root_resolver(registrations)
>>> root.resolve(SomeService)
>>> await root.aresolve(SomeAsyncService)
>>> scoped = root.enter_scope()
"""
# Bind module-level globals to this container registration snapshot.
# This keeps hot paths in resolver methods free from dictionary lookups.
global _all_slots_by_key
global _dep_registered_keys
# Rebuild All[...] indexes for collect-all dependency dispatch.
_all_slots_by_key = {}
_dep_registered_keys = set()
all_slots_by_key: dict[Any, list[int]] = {}
global _dep_1_type, _provider_1
# --- Provider slot 1 bootstrap metadata ---
# Read provider spec by stable slot id from registrations.
registration_1 = registrations.get_by_slot(1)
# Capture dependency identity token used by `resolve`/`aresolve` dispatch.
_dep_1_type = registration_1.provides
# Track dependency keys that are directly registered in this compiled graph.
_dep_registered_keys.add(_dep_1_type)
# Capture provider object (instance/type/factory/generator/context manager).
_provider_1 = registration_1.generator
# Index provider slot for All[...] dependency dispatch.
component_base_1 = component_base_key(_dep_1_type)
base_key_1 = component_base_1
if base_key_1 is None and not hasattr(_dep_1_type, '__metadata__'):
base_key_1 = _dep_1_type
if base_key_1 is not None:
all_slots_by_key.setdefault(base_key_1, []).append(1)
# Freeze All[...] indexes for stable repr and to discourage mutation.
_all_slots_by_key = {
key: tuple(slots) for key, slots in all_slots_by_key.items()
}
# Construct a fresh root resolver configured with optional cleanup callbacks.
return RootResolver(cleanup_enabled)
from collections.abc import Generator
from diwire import Container, Lifetime, Scope
class Session:
def __init__(self) -> None:
self.closed = False
def close(self) -> None:
self.closed = True
def session_factory() -> Generator[Session, None, None]:
session = Session()
try:
yield session
finally:
session.close()
class Foo:
def __init__(self, session: Session) -> None:
self.session = session
container = Container(
use_resolver_context=False,)
container.add_generator(
session_factory,
provides=Session,
scope=Scope.REQUEST,
lifetime=Lifetime.SCOPED,
)
with container.enter_scope() as request_scope:
session1 = request_scope.resolve(Session)
with container.enter_scope() as request_scope2:
session2 = request_scope2.resolve(Session)
assert session2 is not session1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment