odoo_mqtt/iot_bridge/core/device_manager.py
matthias.lotz 910f2d1112 feat: online/offline detection via availability topic (direct_session)
Lasercutter (direct_session) sendet Plain-Text 'online'/'offline' auf
<device-id>/availability – bislang wurden diese Nachrichten still verworfen.

Änderungen:
- mqtt_client.py: non-JSON Payloads als {'_raw': text} durchleiten
  (statt bei JSONDecodeError komplett zu verwerfen)
- device_manager._add_device: direct_session-Geräte abonnieren zusätzlich
  <device_id>/availability; Eintrag landet in device_map
- device_manager._remove_device: entfernt ALLE Topics eines Geräts
  (vorher nur das erste gefundene – Bug bei mehreren Topics)
- device_manager.route_message: {'_raw': 'online'/'offline'} erzeugt
  device_online / device_offline Event in der Queue (case-insensitive)
- 15 neue Unit-Tests in test_availability_pipeline.py (102/102 grün)
2026-03-11 15:39:36 +01:00

343 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Device Manager for IoT Bridge
Manages dynamic addition/removal of devices and MQTT subscriptions
"""
import uuid
from datetime import datetime
import structlog
from core.session_detector import SessionDetector
from parsers.registry import get_parser
# Module-level structlog logger; DeviceManager emits all of its structured
# log events (event name + keyword context) through it.
logger = structlog.get_logger()
class DeviceManager:
    """
    Manages IoT devices, session detectors, and MQTT subscriptions dynamically.

    Handles:
    - Adding new devices (creates detector + subscribes to MQTT)
    - Updating devices (updates detector parameters)
    - Removing devices (closes session + unsubscribes)
    - Routing MQTT messages to correct detectors
    - Tracking device online/offline status
    """

    # Parser type whose devices additionally publish on <device_id>/availability.
    _DIRECT_SESSION_PARSER = "direct_session"

    # Detector attributes that _update_device keeps in sync with the session config.
    _SYNCED_DETECTOR_FIELDS = (
        "standby_threshold_w",
        "working_threshold_w",
        "heartbeat_interval_s",
    )

    def __init__(self, mqtt_client, event_queue, status_monitor=None):
        """
        Initialize Device Manager.

        Args:
            mqtt_client: MQTTClient instance for managing subscriptions
            event_queue: EventQueue instance for event handling
            status_monitor: DeviceStatusMonitor instance for tracking online/offline
        """
        self.mqtt_client = mqtt_client
        self.event_queue = event_queue
        self.status_monitor = status_monitor

        # Device tracking
        self.session_detectors: dict[str, SessionDetector] = {}
        self.device_map: dict[str, str] = {}  # topic -> device_id
        self.parsers: dict[str, object] = {}  # device_id -> parser instance

        logger.info("device_manager_initialized")

    def apply_config(self, config) -> None:
        """
        Apply new configuration: add/update/remove devices.

        Devices are diffed by ``device_id`` against the currently managed set:
        ids that disappeared are removed first, new ids are added, and ids
        present on both sides are passed to ``_update_device``.

        Args:
            config: BridgeConfig instance from api.models

        Raises:
            ValueError: If ``config`` is not a BridgeConfig instance.
            Exception: Whatever ``_add_device`` re-raises when adding a device fails.
        """
        # Imported at call time, as in the original code.
        from api.models import BridgeConfig

        if not isinstance(config, BridgeConfig):
            raise ValueError("config must be a BridgeConfig instance")

        # Get current and new device IDs
        current_device_ids = set(self.session_detectors.keys())
        new_device_ids = {d.device_id for d in config.devices}

        to_add = new_device_ids - current_device_ids
        to_remove = current_device_ids - new_device_ids
        to_check = current_device_ids & new_device_ids

        logger.info("config_diff", add=len(to_add), remove=len(to_remove), check=len(to_check))

        # Remove old devices
        for device_id in to_remove:
            self._remove_device(device_id)

        # Add new devices
        for device in config.devices:
            if device.device_id in to_add:
                self._add_device(device)

        # Check for updates in existing devices
        for device in config.devices:
            if device.device_id in to_check:
                self._update_device(device)

        logger.info(
            "config_applied",
            total_devices=len(self.session_detectors),
            subscriptions=len(self.device_map),
        )

    def _add_device(self, device) -> None:
        """
        Add a new device: create detector + subscribe to MQTT.

        The parser is resolved before any shared state is touched, and every
        registration made by this call is rolled back if a later step fails.
        Without that, a failed add would leave the device in
        ``session_detectors`` and the next ``apply_config`` would treat it as
        already existing and never subscribe it.

        Args:
            device: Device configuration object providing ``device_id``,
                ``machine_name``, ``mqtt_topic``, ``parser_type`` and
                ``to_session_config()``.

        Raises:
            Exception: Any error from detector/parser creation or subscribing
                is logged as ``device_add_failed`` and re-raised.
        """
        device_id = device.device_id
        detector = None
        mapped_topics = []  # topics this call inserted into device_map
        subscribed_topics = []  # topics this call subscribed via mqtt_client
        try:
            # Create callback for this detector
            def event_callback(event):
                """Forward detector events to the delivery queue.

                Args:
                    event: Session event dictionary emitted by detector.

                Returns:
                    None.
                """
                logger.info(
                    "event_generated",
                    event_type=event["event_type"],
                    device_id=device_id,
                )
                self.event_queue.enqueue(event)

            # Create session detector parameters derived from parser_config with defaults
            session_cfg = device.to_session_config()
            detector = SessionDetector(
                device_id=device_id,
                machine_name=device.machine_name,
                standby_threshold_w=session_cfg.standby_threshold_w,
                working_threshold_w=session_cfg.working_threshold_w,
                start_debounce_s=session_cfg.start_debounce_s,
                stop_debounce_s=session_cfg.stop_debounce_s,
                message_timeout_s=session_cfg.message_timeout_s,
                heartbeat_interval_s=session_cfg.heartbeat_interval_s,
                event_callback=event_callback,
            )
            # Resolve the parser first so an unknown parser_type fails before
            # anything is registered.
            parser = get_parser(device.parser_type)

            self.session_detectors[device_id] = detector
            self.parsers[device_id] = parser

            # Register the mapping before subscribing so a message arriving
            # right after the subscription can already be routed.
            self.device_map[device.mqtt_topic] = device_id
            mapped_topics.append(device.mqtt_topic)
            self.mqtt_client.subscribe(device.mqtt_topic)
            subscribed_topics.append(device.mqtt_topic)

            # For direct_session devices: also subscribe to <device_id>/availability.
            # The device sends plain "online"/"offline" as Last-Will / connect message.
            if device.parser_type == self._DIRECT_SESSION_PARSER:
                avail_topic = f"{device_id}/availability"
                self.device_map[avail_topic] = device_id
                mapped_topics.append(avail_topic)
                self.mqtt_client.subscribe(avail_topic)
                subscribed_topics.append(avail_topic)
                logger.info("availability_topic_subscribed", device_id=device_id, topic=avail_topic)

            logger.info(
                "device_added",
                device_id=device_id,
                machine_name=device.machine_name,
                topic=device.mqtt_topic,
            )
        except Exception as e:
            self._rollback_add(device_id, detector, mapped_topics, subscribed_topics)
            logger.error("device_add_failed", device_id=device_id, error=str(e))
            raise

    def _rollback_add(self, device_id, detector, mapped_topics, subscribed_topics) -> None:
        """
        Undo the registrations made by a failed ``_add_device`` call.

        Only state created by that call is removed: topics it mapped (if they
        still point at ``device_id``) and the detector/parser pair (if the
        registered detector is the one it created).

        Args:
            device_id: Id of the device whose add failed.
            detector: Detector created by the failed call, or None if creation failed.
            mapped_topics: Topics the failed call inserted into ``device_map``.
            subscribed_topics: Topics the failed call subscribed successfully.
        """
        for topic in subscribed_topics:
            try:
                self.mqtt_client.unsubscribe(topic)
            except Exception as e:
                # Best effort: the original add error is what the caller needs to see.
                logger.warning(
                    "device_add_rollback_unsubscribe_failed",
                    device_id=device_id,
                    topic=topic,
                    error=str(e),
                )
        for topic in mapped_topics:
            if self.device_map.get(topic) == device_id:
                del self.device_map[topic]
        if detector is not None and self.session_detectors.get(device_id) is detector:
            del self.session_detectors[device_id]
            self.parsers.pop(device_id, None)

    def _update_device(self, device) -> None:
        """
        Update existing device configuration.

        Compares the standby/working thresholds, the heartbeat interval and the
        machine name against the live detector and overwrites the values that
        differ. Errors are logged as ``device_update_failed`` and not re-raised.

        Note: MQTT topic changes require remove + add; this method does not
        touch subscriptions or the parser.

        Args:
            device: Device configuration object for an already managed device.
        """
        # NOTE(review): start_debounce_s, stop_debounce_s and message_timeout_s are
        # passed to SessionDetector on creation but are not synced here - confirm
        # whether that is intentional.
        try:
            device_id = device.device_id
            detector = self.session_detectors.get(device_id)
            if not detector:
                logger.warning("device_not_found_for_update", device_id=device_id)
                return

            # Check if anything changed
            session_cfg = device.to_session_config()
            changed = False
            for field in self._SYNCED_DETECTOR_FIELDS:
                new_value = getattr(session_cfg, field)
                if getattr(detector, field) != new_value:
                    setattr(detector, field, new_value)
                    changed = True
            if detector.machine_name != device.machine_name:
                detector.machine_name = device.machine_name
                changed = True

            if changed:
                logger.info(
                    "device_updated",
                    device_id=device_id,
                    standby_w=session_cfg.standby_threshold_w,
                    working_w=session_cfg.working_threshold_w,
                )
            else:
                logger.debug("device_unchanged", device_id=device_id)
        except Exception as e:
            logger.error("device_update_failed", device_id=device.device_id, error=str(e))

    def _remove_device(self, device_id: str) -> None:
        """
        Remove device: close active session + unsubscribe from MQTT.

        All topics mapped to the device are unsubscribed and unmapped (a
        direct_session device has two: session + availability), then the
        detector and parser are dropped. Errors are logged as
        ``device_remove_failed`` and not re-raised.

        Args:
            device_id: Id of the device to remove.
        """
        try:
            detector = self.session_detectors.get(device_id)
            if not detector:
                logger.warning("device_not_found_for_removal", device_id=device_id)
                return

            # Close active session if any
            if detector.current_session_id:
                logger.info(
                    "closing_session_before_removal",
                    device_id=device_id,
                    # str() keeps the log line from aborting the removal when
                    # the session id is not a plain string.
                    session_id=str(detector.current_session_id)[:8],
                )
                detector._end_session("session_ended", datetime.utcnow())  # Triggers event

            # Find and remove ALL topic mappings for this device.
            # Collected up front because device_map is mutated in the loop.
            topics_to_remove = [t for t, d in self.device_map.items() if d == device_id]
            for topic_to_remove in topics_to_remove:
                self.mqtt_client.unsubscribe(topic_to_remove)
                del self.device_map[topic_to_remove]

            # Remove detector and parser
            del self.session_detectors[device_id]
            self.parsers.pop(device_id, None)

            logger.info("device_removed", device_id=device_id, topics=topics_to_remove)
        except Exception as e:
            logger.error("device_remove_failed", device_id=device_id, error=str(e))

    @staticmethod
    def _availability_status(payload) -> str:
        """
        Extract a normalized availability marker from a payload.

        Args:
            payload: Routed message payload.

        Returns:
            The stripped, lower-cased ``_raw`` text when the payload is a dict
            whose ``_raw`` entry is a string; otherwise an empty string.
        """
        if not isinstance(payload, dict):
            return ""
        raw_value = payload.get("_raw", "")
        # A JSON payload may carry a non-string "_raw" key; that is ordinary
        # data for the parser, not an availability message.
        if not isinstance(raw_value, str):
            return ""
        return raw_value.strip().lower()

    def route_message(self, topic: str, payload: dict) -> None:
        """
        Route MQTT message to appropriate device detector.

        Plain-text availability payloads (``{"_raw": "online"/"offline"}``,
        case-insensitive) are turned into ``device_online`` /
        ``device_offline`` events. Everything else goes through the device's
        parser: a parsed ``apower`` value is fed to the session detector, a
        parsed ``session_complete`` message is enqueued as an event. Errors
        are logged as ``message_routing_error`` and not re-raised.

        Args:
            topic: MQTT topic
            payload: Message payload (dict)
        """
        device_id = self.device_map.get(topic)
        if not device_id:
            logger.warning("no_device_for_topic", topic=topic)
            return

        detector = self.session_detectors.get(device_id)
        if not detector:
            logger.warning("no_detector_for_device", device_id=device_id)
            return

        try:
            # Update device status (track last_seen for online/offline detection)
            # NOTE(review): this also runs for an "offline" availability
            # message - confirm the status monitor expects that.
            if self.status_monitor:
                self.status_monitor.update_last_seen(device_id)

            # Handle plain-text availability messages ("online" / "offline")
            status = self._availability_status(payload)
            if status in ("online", "offline"):
                event_type = "device_online" if status == "online" else "device_offline"
                # NOTE(review): utcnow() yields naive UTC timestamps (deprecated
                # since Python 3.12); switching to aware datetimes would change
                # the emitted ISO string, so it is kept as is.
                event = {
                    "event_uid": str(uuid.uuid4()),
                    "event_type": event_type,
                    "device_id": device_id,
                    "timestamp": datetime.utcnow().isoformat(),
                    "payload": {},
                }
                logger.info("availability_event_enqueued", device_id=device_id, status=status)
                self.event_queue.enqueue(event)
                return

            # Parse message using per-device parser
            parser = self.parsers.get(device_id)
            if parser is None:
                logger.warning("no_parser_for_device", device_id=device_id)
                return

            parsed = parser.parse_message(topic, payload)
            if parsed and parsed.get("apower") is not None:
                power_w = parsed["apower"]
                detector.process_power_measurement(power_w, datetime.utcnow())
            elif parsed and parsed.get("message_type") == "session_complete":
                # Direct-session device: the device delivers the finished
                # session itself, no SessionDetector needed.
                session_id_raw = parsed.get("session_id")
                event = {
                    "event_uid": str(uuid.uuid4()),
                    "event_type": "session_complete",
                    "device_id": device_id,
                    "session_id": str(session_id_raw) if session_id_raw is not None else None,
                    "timestamp": parsed.get("timestamp", datetime.utcnow().isoformat()),
                    "payload": {
                        "session_seconds": parsed.get("session_seconds"),
                        "session_minutes": parsed.get("session_minutes"),
                        "session_start_time": parsed.get("session_start_time"),
                        "freetime_s": parsed.get("freetime_s", 0),
                    },
                }
                logger.info(
                    "session_complete_event_enqueued",
                    device_id=device_id,
                    session_minutes=parsed.get("session_minutes"),
                    session_seconds=parsed.get("session_seconds"),
                )
                self.event_queue.enqueue(event)
        except Exception as e:
            logger.error("message_routing_error", topic=topic, error=str(e))

    def check_timeouts(self) -> None:
        """Check all detectors for timeouts, using one shared UTC timestamp."""
        current_time = datetime.utcnow()
        for detector in self.session_detectors.values():
            detector.check_timeout(current_time)

    def get_status(self) -> dict:
        """Get runtime status snapshot of managed devices.

        Returns:
            Dictionary with ``device_count``, ``subscriptions`` (number of
            mapped topics) and a ``devices`` list holding ``device_id``,
            detector ``state`` and ``session_active`` per device.
        """
        return {
            "device_count": len(self.session_detectors),
            "subscriptions": len(self.device_map),
            "devices": [
                {
                    "device_id": device_id,
                    "state": detector.state,
                    "session_active": detector.current_session_id is not None,
                }
                for device_id, detector in self.session_detectors.items()
            ],
        }