- status_monitor: add an availability_managed set; _monitor_loop skips devices in this set so that the LWT/availability topic is the sole online/offline source.
- device_manager: register each device with status_monitor.set_availability_managed() so the monitor actually skips it (previously the monitor had no knowledge of DeviceManager.availability_managed).
- mqtt_bridge: remove the blanket 'reset all devices to offline' on bridge restart; it caused a race condition in which the cron reset the state AFTER the bridge had already sent device_online events via retained MQTT messages. The cleanup of stale running sessions is kept (still needed).
409 lines
17 KiB
Python
409 lines
17 KiB
Python
"""
|
||
Device Manager for IoT Bridge
|
||
Manages dynamic addition/removal of devices and MQTT subscriptions
|
||
"""
|
||
|
||
import uuid
|
||
import time
|
||
from datetime import datetime
|
||
|
||
import structlog
|
||
|
||
from core.session_detector import SessionDetector
|
||
from parsers.registry import get_parser
|
||
|
||
logger = structlog.get_logger()
|
||
|
||
|
||
class DeviceManager:
    """Manage IoT devices, their session detectors and MQTT subscriptions.

    Responsibilities:
    - Adding new devices (creates a detector and subscribes to MQTT)
    - Updating devices (updates detector parameters and the status topic)
    - Removing devices (closes the session and unsubscribes)
    - Routing MQTT messages to the correct detector
    - Tracking device online/offline status
    """

    def __init__(self, mqtt_client, event_queue, status_monitor=None):
        """Initialize the device manager with empty device tables.

        Args:
            mqtt_client: MQTTClient instance used to (un)subscribe topics.
            event_queue: EventQueue instance that receives generated events.
            status_monitor: Optional DeviceStatusMonitor tracking online/offline.
        """
        self.mqtt_client = mqtt_client
        self.event_queue = event_queue
        self.status_monitor = status_monitor

        # Device tracking
        self.session_detectors: dict[str, SessionDetector] = {}
        self.device_map: dict[str, str] = {}  # topic -> device_id
        self.parsers: dict[str, object] = {}  # device_id -> parser instance
        # Devices whose online/offline state is managed purely by their
        # availability topic (LWT). The timeout monitor is bypassed for them.
        self.availability_managed: set[str] = set()
        # Topics that carry only heartbeat/status data - routed to
        # update_last_seen only, not to the session parser (avoids spurious
        # parse warnings).
        self.heartbeat_topics: set[str] = set()

        logger.info("device_manager_initialized")

    def apply_config(self, config):
        """Apply a new configuration: remove, add and update devices.

        Devices are diffed by ``device_id`` against the currently managed set.
        Removals run first, then additions, then updates of surviving devices.

        Args:
            config: BridgeConfig instance from api.models.

        Raises:
            ValueError: If ``config`` is not a BridgeConfig instance.
            Exception: Whatever ``_add_device`` re-raises when an add fails.
        """
        from api.models import BridgeConfig

        if not isinstance(config, BridgeConfig):
            raise ValueError("config must be a BridgeConfig instance")

        current_device_ids = set(self.session_detectors.keys())
        new_device_ids = {d.device_id for d in config.devices}

        to_add = new_device_ids - current_device_ids
        to_remove = current_device_ids - new_device_ids
        to_check = current_device_ids & new_device_ids

        logger.info("config_diff", add=len(to_add), remove=len(to_remove), check=len(to_check))

        # Remove old devices
        for device_id in to_remove:
            self._remove_device(device_id)

        # Add new devices
        for device in config.devices:
            if device.device_id in to_add:
                self._add_device(device)

        # Check for updates in existing devices
        for device in config.devices:
            if device.device_id in to_check:
                self._update_device(device)

        logger.info(
            "config_applied",
            total_devices=len(self.session_detectors),
            subscriptions=len(self.device_map),
        )

    def _purge_local_state(self, device_id: str) -> list[str]:
        """Drop every in-memory reference to ``device_id``.

        Only local bookkeeping is touched; no MQTT call is made.

        Args:
            device_id: Identifier of the device to forget.

        Returns:
            The topics that were mapped to the device before the purge.
        """
        topics = [t for t, d in self.device_map.items() if d == device_id]
        for topic in topics:
            del self.device_map[topic]
            self.heartbeat_topics.discard(topic)
        self.availability_managed.discard(device_id)
        self.session_detectors.pop(device_id, None)
        self.parsers.pop(device_id, None)
        return topics

    def _add_device(self, device):
        """Add a new device: create its detector and subscribe to MQTT.

        On any failure the partially registered local state is rolled back so
        that a later ``apply_config`` sees the device as new and retries the
        add instead of treating it as an existing, never-subscribed device.

        Args:
            device: Device configuration entry (device_id, machine_name,
                mqtt_topic, parser_type, parser_config, to_session_config()).

        Raises:
            Exception: Re-raises whatever went wrong after logging it.
        """
        device_id = device.device_id
        try:
            def event_callback(event):
                """Forward detector events to the delivery queue.

                Args:
                    event: Session event dictionary emitted by the detector.

                Returns:
                    None.
                """
                logger.info(
                    "event_generated",
                    event_type=event["event_type"],
                    device_id=device_id,
                )
                self.event_queue.enqueue(event)

            # Session detector parameters come from parser_config with defaults.
            session_cfg = device.to_session_config()
            detector = SessionDetector(
                device_id=device_id,
                machine_name=device.machine_name,
                standby_threshold_w=session_cfg.standby_threshold_w,
                working_threshold_w=session_cfg.working_threshold_w,
                start_debounce_s=session_cfg.start_debounce_s,
                stop_debounce_s=session_cfg.stop_debounce_s,
                message_timeout_s=session_cfg.message_timeout_s,
                heartbeat_interval_s=session_cfg.heartbeat_interval_s,
                event_callback=event_callback,
            )
            # Resolve the parser before registering anything, so an unknown
            # parser_type cannot leave a half-registered device behind.
            parser = get_parser(device.parser_type)

            self.session_detectors[device_id] = detector
            self.device_map[device.mqtt_topic] = device_id
            self.parsers[device_id] = parser

            # Subscribe to MQTT topic
            self.mqtt_client.subscribe(device.mqtt_topic)

            # For direct_session devices: subscribe availability topic (LWT).
            # Online/offline is managed exclusively via this topic - the
            # timeout monitor is deliberately bypassed for these devices.
            if device.parser_type == "direct_session":
                pcfg = device.parser_config or {}

                avail_topic = pcfg.get("availability_topic", f"{device_id}/availability")
                self.device_map[avail_topic] = device_id
                self.mqtt_client.subscribe(avail_topic)
                self.availability_managed.add(device_id)
                if self.status_monitor:
                    self.status_monitor.set_availability_managed(device_id)
                logger.info("availability_topic_subscribed", device_id=device_id, topic=avail_topic)

                status_topic = pcfg.get("status_topic", "")
                if status_topic and status_topic not in self.device_map:
                    self.device_map[status_topic] = device_id
                    self.heartbeat_topics.add(status_topic)
                    self.mqtt_client.subscribe(status_topic)
                    logger.info("status_topic_subscribed", device_id=device_id, topic=status_topic)

            logger.info(
                "device_added",
                device_id=device_id,
                machine_name=device.machine_name,
                topic=device.mqtt_topic,
            )

        except Exception as e:
            # Roll back local bookkeeping only. Broker-side subscriptions made
            # before the failure are not undone here, because the MQTT client
            # may be the component that just failed.
            self._purge_local_state(device_id)
            logger.error("device_add_failed", device_id=device_id, error=str(e))
            raise

    def _update_device(self, device):
        """Update the configuration of an already managed device.

        Synchronises the detector thresholds, heartbeat interval and machine
        name, and swaps the heartbeat/status topic subscription if it changed.
        Errors are logged and swallowed.

        Note: MQTT topic changes require remove + add.

        Args:
            device: Device configuration entry carrying the new values.
        """
        try:
            device_id = device.device_id
            detector = self.session_detectors.get(device_id)

            if not detector:
                logger.warning("device_not_found_for_update", device_id=device_id)
                return

            session_cfg = device.to_session_config()
            changed = False

            if detector.standby_threshold_w != session_cfg.standby_threshold_w:
                detector.standby_threshold_w = session_cfg.standby_threshold_w
                changed = True

            if detector.working_threshold_w != session_cfg.working_threshold_w:
                detector.working_threshold_w = session_cfg.working_threshold_w
                changed = True

            if detector.heartbeat_interval_s != session_cfg.heartbeat_interval_s:
                detector.heartbeat_interval_s = session_cfg.heartbeat_interval_s
                changed = True

            if detector.machine_name != device.machine_name:
                detector.machine_name = device.machine_name
                changed = True

            # Check if status_topic changed - subscribe new, unsubscribe old.
            # The current status topic is the one registered as heartbeat
            # topic for this device. Matching on a "/availability" suffix
            # would mistake a custom availability topic for a status topic
            # and unsubscribe the device's LWT source.
            new_status_topic = (device.parser_config or {}).get("status_topic", "")
            old_status_topic = next(
                (
                    topic
                    for topic, owner in self.device_map.items()
                    if owner == device_id and topic in self.heartbeat_topics
                ),
                "",
            )

            if new_status_topic != old_status_topic:
                if old_status_topic:
                    self.mqtt_client.unsubscribe(old_status_topic)
                    del self.device_map[old_status_topic]
                    self.heartbeat_topics.discard(old_status_topic)
                    logger.info("status_topic_unsubscribed", device_id=device_id, topic=old_status_topic)
                if new_status_topic and new_status_topic not in self.device_map:
                    self.device_map[new_status_topic] = device_id
                    # Keep heartbeat messages away from the session parser,
                    # exactly as _add_device does for the initial topic.
                    self.heartbeat_topics.add(new_status_topic)
                    self.mqtt_client.subscribe(new_status_topic)
                    logger.info("status_topic_subscribed", device_id=device_id, topic=new_status_topic)
                changed = True

            if changed:
                logger.info(
                    "device_updated",
                    device_id=device_id,
                    standby_w=session_cfg.standby_threshold_w,
                    working_w=session_cfg.working_threshold_w,
                )
            else:
                logger.debug("device_unchanged", device_id=device_id)

        except Exception as e:
            logger.error("device_update_failed", device_id=device.device_id, error=str(e))

    def _remove_device(self, device_id: str):
        """Remove a device: close its active session and unsubscribe its topics.

        All local bookkeeping for the device is cleared, including its
        availability-managed flag and heartbeat-topic registrations, so a
        later re-add starts from a clean slate. Errors are logged and
        swallowed.

        Args:
            device_id: Identifier of the device to remove.
        """
        try:
            detector = self.session_detectors.get(device_id)

            if not detector:
                logger.warning("device_not_found_for_removal", device_id=device_id)
                return

            # Close active session if any
            if detector.current_session_id:
                logger.info(
                    "closing_session_before_removal",
                    device_id=device_id,
                    session_id=detector.current_session_id[:8],
                )
                detector._end_session("session_ended", datetime.utcnow())  # Triggers event

            # Unsubscribe ALL topics of this device (a direct_session device
            # has several: session, availability and optionally status).
            topics_to_remove = [t for t, d in self.device_map.items() if d == device_id]
            for topic_to_remove in topics_to_remove:
                self.mqtt_client.unsubscribe(topic_to_remove)

            # NOTE(review): the status monitor keeps its own record of
            # availability-managed devices; no un-register call is visible
            # from here - confirm whether one is needed.
            self._purge_local_state(device_id)

            logger.info("device_removed", device_id=device_id, topics=topics_to_remove)

        except Exception as e:
            logger.error("device_remove_failed", device_id=device_id, error=str(e))

    def _handle_availability_message(self, device_id: str, raw: str) -> None:
        """Turn a plain-text "online"/"offline" message into an event.

        The status monitor state is synchronised WITHOUT triggering its own
        event emission - this method is the sole emitter for availability
        events.

        Args:
            device_id: Device the availability message belongs to.
            raw: Normalised message text, either "online" or "offline".
        """
        is_online = raw == "online"
        event_type = "device_online" if is_online else "device_offline"

        if self.status_monitor:
            if is_online:
                self.status_monitor.mark_online_silent(device_id)
            else:
                # Mark offline in monitor without emitting a duplicate event.
                with self.status_monitor.lock:
                    if device_id in self.status_monitor.devices:
                        self.status_monitor.devices[device_id].is_online = False
                        self.status_monitor.devices[device_id].last_state_change = time.time()
                        self.status_monitor._save_status()

        event = {
            "event_uid": str(uuid.uuid4()),
            "event_type": event_type,
            "device_id": device_id,
            "timestamp": datetime.utcnow().isoformat(),
            "payload": {},
        }
        logger.info("availability_event_enqueued", device_id=device_id, status=raw)
        self.event_queue.enqueue(event)

    def route_message(self, topic: str, payload: dict):
        """Route an MQTT message to the detector of the owning device.

        Plain-text availability messages become device_online/device_offline
        events, heartbeat topics only refresh last-seen, and everything else
        goes through the per-device parser. Errors are logged and swallowed.

        Args:
            topic: MQTT topic the message arrived on.
            payload: Message payload (dict); plain-text payloads are expected
                under the "_raw" key.
        """
        device_id = self.device_map.get(topic)

        if not device_id:
            logger.warning("no_device_for_topic", topic=topic)
            return

        detector = self.session_detectors.get(device_id)

        if not detector:
            logger.warning("no_detector_for_device", device_id=device_id)
            return

        try:
            # Detect plain-text availability messages ("online" / "offline")
            # FIRST, before update_last_seen - otherwise:
            #   "offline" would mark the device online in the monitor (message
            #   received), then immediately send device_offline, leaving
            #   is_online=True -> spurious timeout-offline 20s later.
            #   "online" would emit device_online twice (monitor + _raw check).
            raw_value = payload.get("_raw") if isinstance(payload, dict) else None
            # A non-string "_raw" is simply not an availability message; it
            # must not abort routing of the rest of the payload.
            raw = raw_value.strip().lower() if isinstance(raw_value, str) else ""
            if raw in ("online", "offline"):
                self._handle_availability_message(device_id, raw)
                return

            # Regular JSON message - update last_seen for timeout-monitored
            # devices. Availability-managed devices (direct_session + LWT)
            # skip this: their online/offline is driven by the availability
            # topic, not message frequency.
            if self.status_monitor and device_id not in self.availability_managed:
                self.status_monitor.update_last_seen(device_id)

            # Heartbeat/status topics carry no session data - skip the parser.
            if topic in self.heartbeat_topics:
                logger.debug("heartbeat_topic_skipped", topic=topic, device_id=device_id)
                return

            # Parse message using per-device parser
            parser = self.parsers.get(device_id)
            if parser is None:
                logger.warning("no_parser_for_device", device_id=device_id)
                return
            parsed = parser.parse_message(topic, payload)

            if parsed and parsed.get("apower") is not None:
                power_w = parsed["apower"]
                detector.process_power_measurement(power_w, datetime.utcnow())

            elif parsed and parsed.get("message_type") == "session_complete":
                # Direct-session device: the device delivers the finished
                # session itself, no SessionDetector needed.
                session_id_raw = parsed.get("session_id")
                event = {
                    "event_uid": str(uuid.uuid4()),
                    "event_type": "session_complete",
                    "device_id": device_id,
                    "session_id": str(session_id_raw) if session_id_raw is not None else None,
                    "timestamp": parsed.get("timestamp", datetime.utcnow().isoformat()),
                    "payload": {
                        "session_seconds": parsed.get("session_seconds"),
                        "session_minutes": parsed.get("session_minutes"),
                        "session_start_time": parsed.get("session_start_time"),
                        "freetime_s": parsed.get("freetime_s", 0),
                    },
                }
                logger.info(
                    "session_complete_event_enqueued",
                    device_id=device_id,
                    session_minutes=parsed.get("session_minutes"),
                    session_seconds=parsed.get("session_seconds"),
                )
                self.event_queue.enqueue(event)

        except Exception as e:
            logger.error("message_routing_error", topic=topic, error=str(e))

    def check_timeouts(self):
        """Ask every detector to check for a timeout at the current UTC time."""
        current_time = datetime.utcnow()
        for detector in self.session_detectors.values():
            detector.check_timeout(current_time)

    def get_status(self) -> dict:
        """Get a runtime status snapshot of the managed devices.

        Returns:
            Dictionary with the device count, the number of topic
            subscriptions and, per device, its id, detector state and whether
            a session is currently active.
        """
        return {
            "device_count": len(self.session_detectors),
            "subscriptions": len(self.device_map),
            "devices": [
                {
                    "device_id": device_id,
                    "state": detector.state,
                    "session_active": detector.current_session_id is not None,
                }
                for device_id, detector in self.session_detectors.items()
            ],
        }
|