Phase 3.1: Type Safety - Add bridge_types.py for shared type aliases (EventDict, PowerWatts, Timestamp, DeviceID) - Define protocols for callbacks and message parsers - Strict type annotations on all core modules (session_detector, event_queue, device_manager) - Fix Optional handling and type guards throughout codebase - Achieve full mypy compliance: 0 errors across 47 source files Phase 3.2: Logging Unification - Migrate from stdlib logging to pure structlog across all runtime modules - Convert all logs to structured event+fields format (snake_case event names) - Remove f-string and printf-style logger calls - Add contextvars support for per-request correlation - Implement FastAPI middleware to bind request_id, http_method, http_path - Propagate X-Request-ID header in responses - Remove stdlib logging imports except setup layer (utils/logging.py) - Ensure log-level consistency across all modules Files Modified: - iot_bridge/bridge_types.py (new) - Central type definitions - iot_bridge/core/* - Type safety and logging unification - iot_bridge/clients/* - Structured logging with request context - iot_bridge/parsers/* - Type-safe parsing with structured logs - iot_bridge/utils/logging.py - Pure structlog setup with contextvars - iot_bridge/api/server.py - Added request correlation middleware - iot_bridge/tests/* - Test fixtures updated for type safety - iot_bridge/OPTIMIZATION_PLAN.md - Phase 3 status updated Validation: - mypy . → 0 errors (47 files) - All unit tests pass - Runtime behavior unchanged - API response headers include X-Request-ID
282 lines
9.6 KiB
Python
282 lines
9.6 KiB
Python
"""
Device Manager for IoT Bridge

Manages dynamic addition/removal of devices and MQTT subscriptions
"""
|
|
|
|
from datetime import datetime
|
|
|
|
import structlog
|
|
|
|
from core.session_detector import SessionDetector
|
|
from parsers.shelly_parser import ShellyParser
|
|
|
|
# Module-level structlog logger; every log call in this module goes through it
# using an event name plus keyword fields.
logger = structlog.get_logger()
|
|
|
|
|
|
class DeviceManager:
    """
    Manage IoT devices, their session detectors, and MQTT subscriptions.

    Responsibilities:
    - Adding new devices (creates a detector and subscribes to its topic)
    - Updating devices (detector parameters and, if it changed, the topic)
    - Removing devices (closes any active session and unsubscribes)
    - Routing MQTT messages to the detector registered for the topic
    - Forwarding "last seen" updates to the optional status monitor

    State:
    - ``session_detectors`` maps device_id -> SessionDetector
    - ``device_map`` maps MQTT topic -> device_id
    """

    def __init__(self, mqtt_client, event_queue, parser=None, status_monitor=None):
        """
        Initialize Device Manager.

        Args:
            mqtt_client: MQTTClient instance for managing subscriptions
                (must provide ``subscribe(topic)`` and ``unsubscribe(topic)``).
            event_queue: EventQueue instance; detector events are passed to
                its ``enqueue`` method.
            parser: Message parser (default: ShellyParser).
            status_monitor: DeviceStatusMonitor instance for tracking
                online/offline; may be None, in which case no status
                updates are sent.
        """
        self.mqtt_client = mqtt_client
        self.event_queue = event_queue
        self.parser = parser or ShellyParser()
        self.status_monitor = status_monitor

        # Device tracking
        self.session_detectors: dict[str, SessionDetector] = {}
        self.device_map: dict[str, str] = {}  # topic -> device_id

        logger.info("device_manager_initialized")

    def apply_config(self, config) -> None:
        """
        Apply new configuration: add/update/remove devices.

        Devices are processed in three passes: removals first, then
        additions, then updates of devices present both before and after.
        A failure while adding a device propagates to the caller; failures
        while updating or removing a device are only logged.

        Args:
            config: BridgeConfig instance from api.models

        Raises:
            ValueError: if ``config`` is not a BridgeConfig instance.
        """
        # Imported at call time, as in the original code.
        from api.models import BridgeConfig

        if not isinstance(config, BridgeConfig):
            raise ValueError("config must be a BridgeConfig instance")

        current_device_ids = set(self.session_detectors)
        new_device_ids = {d.device_id for d in config.devices}

        to_add = new_device_ids - current_device_ids
        to_remove = current_device_ids - new_device_ids
        to_check = current_device_ids & new_device_ids

        logger.info("config_diff", add=len(to_add), remove=len(to_remove), check=len(to_check))

        # Remove old devices
        for device_id in to_remove:
            self._remove_device(device_id)

        # Add new devices
        for device in config.devices:
            if device.device_id in to_add:
                self._add_device(device)

        # Check for updates in existing devices
        for device in config.devices:
            if device.device_id in to_check:
                self._update_device(device)

        logger.info(
            "config_applied",
            total_devices=len(self.session_detectors),
            subscriptions=len(self.device_map),
        )

    def _topic_for(self, device_id: str):
        """Return the first topic mapped to ``device_id``, or None if there is none."""
        for topic, mapped_id in self.device_map.items():
            if mapped_id == device_id:
                return topic
        return None

    def _add_device(self, device) -> None:
        """
        Add a new device: create detector + subscribe to MQTT.

        The detector and topic mapping are registered before subscribing.
        If anything fails after registration (for example the subscribe
        call), the registration is rolled back so that a later
        ``apply_config`` treats the device as new and retries, instead of
        keeping a registered-but-unsubscribed device forever.

        Raises:
            Exception: any error raised while creating the detector or
                subscribing is logged and re-raised.
        """
        registered = False
        previous_owner = None
        try:
            device_id = device.device_id
            topic = device.mqtt_topic

            # Callback handed to the detector: log the event, then queue it.
            def event_callback(event):
                logger.info(
                    "event_generated",
                    event_type=event["event_type"],
                    device_id=device_id,
                )
                self.event_queue.enqueue(event)

            session_cfg = device.session_config
            detector = SessionDetector(
                device_id=device_id,
                machine_name=device.machine_name,
                standby_threshold_w=session_cfg.standby_threshold_w,
                working_threshold_w=session_cfg.working_threshold_w,
                start_debounce_s=session_cfg.start_debounce_s,
                stop_debounce_s=session_cfg.stop_debounce_s,
                message_timeout_s=session_cfg.message_timeout_s,
                heartbeat_interval_s=session_cfg.heartbeat_interval_s,
                event_callback=event_callback,
            )

            # Remember who owned the topic before, so a rollback can restore it.
            previous_owner = self.device_map.get(topic)
            self.session_detectors[device_id] = detector
            self.device_map[topic] = device_id
            registered = True

            # Subscribe to MQTT topic
            self.mqtt_client.subscribe(topic)

            logger.info(
                "device_added",
                device_id=device_id,
                machine_name=device.machine_name,
                topic=topic,
            )

        except Exception as e:
            if registered:
                # Undo the partial registration.
                self.session_detectors.pop(device_id, None)
                if previous_owner is None:
                    self.device_map.pop(topic, None)
                else:
                    self.device_map[topic] = previous_owner
            logger.error("device_add_failed", device_id=device.device_id, error=str(e))
            raise

    def _update_device(self, device) -> None:
        """
        Update existing device configuration.

        Copies ``standby_threshold_w``, ``working_threshold_w``,
        ``heartbeat_interval_s`` and ``machine_name`` onto the detector
        when they differ. If the device's MQTT topic changed, the new topic
        is subscribed and mapped before the old one is unsubscribed, so the
        detector (and any active session) is kept.

        Errors are logged and not re-raised.

        NOTE(review): start_debounce_s, stop_debounce_s and
        message_timeout_s are passed to SessionDetector on creation but are
        not synchronized here; confirm how SessionDetector stores them
        before adding them to the update.
        """
        try:
            device_id = device.device_id
            detector = self.session_detectors.get(device_id)

            if detector is None:
                logger.warning("device_not_found_for_update", device_id=device_id)
                return

            session_cfg = device.session_config
            changed = False

            desired = (
                ("standby_threshold_w", session_cfg.standby_threshold_w),
                ("working_threshold_w", session_cfg.working_threshold_w),
                ("heartbeat_interval_s", session_cfg.heartbeat_interval_s),
                ("machine_name", device.machine_name),
            )
            for attr_name, wanted in desired:
                if getattr(detector, attr_name) != wanted:
                    setattr(detector, attr_name, wanted)
                    changed = True

            # Topic change: subscribe to the new topic first so a failure
            # leaves the old subscription and mapping untouched.
            new_topic = device.mqtt_topic
            old_topic = self._topic_for(device_id)
            if old_topic != new_topic:
                self.mqtt_client.subscribe(new_topic)
                self.device_map[new_topic] = device_id
                if old_topic is not None:
                    self.mqtt_client.unsubscribe(old_topic)
                    del self.device_map[old_topic]
                logger.info(
                    "device_topic_changed",
                    device_id=device_id,
                    old_topic=old_topic,
                    new_topic=new_topic,
                )
                changed = True

            if changed:
                logger.info(
                    "device_updated",
                    device_id=device_id,
                    standby_w=session_cfg.standby_threshold_w,
                    working_w=session_cfg.working_threshold_w,
                )
            else:
                logger.debug("device_unchanged", device_id=device_id)

        except Exception as e:
            logger.error("device_update_failed", device_id=device.device_id, error=str(e))

    def _remove_device(self, device_id: str) -> None:
        """
        Remove device: close active session + unsubscribe from MQTT.

        Errors are logged and not re-raised. If a step fails, the
        remaining steps are skipped, so the device may stay registered.
        """
        try:
            detector = self.session_detectors.get(device_id)

            if detector is None:
                logger.warning("device_not_found_for_removal", device_id=device_id)
                return

            # Close active session if any
            if detector.current_session_id:
                logger.info(
                    "closing_session_before_removal",
                    device_id=device_id,
                    session_id=detector.current_session_id[:8],
                )
                # Per the original comment this triggers the session-end event.
                # NOTE(review): datetime.utcnow() is deprecated since Python
                # 3.12; kept because SessionDetector's timestamp handling
                # (naive vs. aware) is not visible from this module.
                detector._end_session("session_ended", datetime.utcnow())

            # Find and remove topic mapping
            topic_to_remove = self._topic_for(device_id)
            if topic_to_remove is not None:
                self.mqtt_client.unsubscribe(topic_to_remove)
                del self.device_map[topic_to_remove]

            # Remove detector
            del self.session_detectors[device_id]

            logger.info("device_removed", device_id=device_id, topic=topic_to_remove)

        except Exception as e:
            logger.error("device_remove_failed", device_id=device_id, error=str(e))

    def route_message(self, topic: str, payload: dict) -> None:
        """
        Route MQTT message to appropriate device detector.

        Messages for unknown topics, or for devices without a detector,
        are logged as warnings and dropped. Parsing or processing errors
        are logged and not re-raised.

        Args:
            topic: MQTT topic
            payload: Message payload (dict)
        """
        device_id = self.device_map.get(topic)
        if device_id is None:
            logger.warning("no_device_for_topic", topic=topic)
            return

        detector = self.session_detectors.get(device_id)
        if detector is None:
            logger.warning("no_detector_for_device", device_id=device_id)
            return

        try:
            # Update device status (track last_seen for online/offline detection)
            if self.status_monitor:
                self.status_monitor.update_last_seen(device_id)

            parsed = self.parser.parse_message(topic, payload)

            # Only forward messages that carry a power reading; an explicit
            # 0 is a valid reading, hence the ``is not None`` check.
            if parsed and parsed.get("apower") is not None:
                detector.process_power_measurement(parsed["apower"], datetime.utcnow())

        except Exception as e:
            logger.error("message_routing_error", topic=topic, error=str(e))

    def check_timeouts(self) -> None:
        """Check all detectors for timeouts, using one shared timestamp."""
        current_time = datetime.utcnow()
        for detector in self.session_detectors.values():
            detector.check_timeout(current_time)

    def get_status(self) -> dict:
        """
        Get current device manager status.

        Returns:
            dict with ``device_count``, ``subscriptions`` and a ``devices``
            list holding ``device_id``, ``state`` and ``session_active``
            for every registered detector.
        """
        devices = [
            {
                "device_id": device_id,
                "state": detector.state,
                "session_active": detector.current_session_id is not None,
            }
            for device_id, detector in self.session_detectors.items()
        ]
        return {
            "device_count": len(self.session_detectors),
            "subscriptions": len(self.device_map),
            "devices": devices,
        }
|