72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
"""Dependency injection helpers for IoT Bridge runtime wiring."""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Callable
|
|
|
|
from core.bootstrap import BootstrapConfig, bootstrap, get_mqtt_config
|
|
from core.service_manager import ServiceManager
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RuntimeContainer:
|
|
"""Container for runtime dependencies and factory callables."""
|
|
|
|
bootstrap_factory: Callable[[], BootstrapConfig] = bootstrap
|
|
mqtt_config_factory: Callable[[object], object] = get_mqtt_config
|
|
service_manager_factory: Callable[..., ServiceManager] = ServiceManager
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RuntimeContext:
|
|
"""Resolved runtime objects used by main orchestration."""
|
|
|
|
boot_config: BootstrapConfig
|
|
service_manager: ServiceManager
|
|
mqtt_config: object
|
|
|
|
|
|
def create_service_manager(
|
|
boot_config: BootstrapConfig,
|
|
service_manager_factory: Callable[..., ServiceManager] = ServiceManager,
|
|
) -> ServiceManager:
|
|
"""Create a ``ServiceManager`` instance through an injectable factory.
|
|
|
|
Args:
|
|
boot_config: Bootstrapped runtime configuration container.
|
|
service_manager_factory: Factory used to instantiate ``ServiceManager``.
|
|
|
|
Returns:
|
|
Configured ``ServiceManager`` instance.
|
|
"""
|
|
return service_manager_factory(
|
|
config=boot_config.config,
|
|
logger=boot_config.logger,
|
|
bridge_port=boot_config.bridge_port,
|
|
bridge_token=boot_config.bridge_token,
|
|
)
|
|
|
|
|
|
def build_runtime_context(container: RuntimeContainer | None = None) -> RuntimeContext:
|
|
"""Resolve runtime context from a dependency container.
|
|
|
|
Args:
|
|
container: Optional dependency container override for tests/custom wiring.
|
|
|
|
Returns:
|
|
Fully resolved ``RuntimeContext`` used by the main entrypoint.
|
|
"""
|
|
runtime_container = container or RuntimeContainer()
|
|
|
|
boot_config = runtime_container.bootstrap_factory()
|
|
service_manager = create_service_manager(
|
|
boot_config=boot_config,
|
|
service_manager_factory=runtime_container.service_manager_factory,
|
|
)
|
|
mqtt_config = runtime_container.mqtt_config_factory(boot_config.config)
|
|
|
|
return RuntimeContext(
|
|
boot_config=boot_config,
|
|
service_manager=service_manager,
|
|
mqtt_config=mqtt_config,
|
|
)
|