odoo_mqtt/iot_bridge/device_status_monitor.py
matthias.lotz ba588842ad feat: Add automatic API documentation generation and device status monitoring
- Implement build script (build_docs.py) with AST parser to auto-generate HTML docs from docstrings
- Add comprehensive Google-style docstrings to all controllers and models
- Create static/description/index.html for Odoo Apps UI with module overview
- Generate api_reference.html (20.5 KB) from source code, linked from Odoo UI
- Add DOCUMENTATION_STRATEGY.md with comparison of 5 documentation approaches
- Create API.md with complete REST API documentation

Device Status Monitoring:
- Implement device_status_monitor.py with health checks and offline detection
- Add /status endpoint for device health overview
- Automatic offline detection after message_timeout_s

Config Push Architecture:
- Add POST /config endpoint to IoT Bridge for dynamic device management
- Auto-push device config from Odoo on create/write/unlink
- Implement device_manager.py for runtime device updates

E2E Tests:
- All 6 E2E tests passing (Create, Update, Push, Delete, Restart, Status Monitor)
- Test coverage for device lifecycle and config synchronization

Documentation is auto-generated via: ./build_docs.sh
View in Odoo: Settings → Apps → Open Workshop MQTT → API Reference
2026-02-15 11:03:22 +01:00

289 lines
10 KiB
Python

"""
Device Status Monitor - Tracks device online/offline status based on MQTT activity.
Monitors last_seen timestamps and emits device_online/device_offline events to Odoo.
"""
import json
import structlog
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Callable
from dataclasses import dataclass, asdict
import uuid
logger = structlog.get_logger()
@dataclass
class DeviceStatus:
"""Status information for a device."""
device_id: str
last_seen: float # Unix timestamp
is_online: bool
last_state_change: float # Unix timestamp of last online/offline transition
class DeviceStatusMonitor:
"""
Monitors device activity and tracks online/offline status.
- Tracks last_seen timestamp per device on every MQTT message
- Runs background thread that checks timeouts
- Emits device_online/device_offline events
- Persists status to /data/device_status.json for restart recovery
"""
def __init__(self,
timeout_seconds: int = 30,
check_interval_seconds: int = 5,
persistence_path: str = "/data/device_status.json",
event_callback: Optional[Callable] = None):
"""
Initialize Device Status Monitor.
Args:
timeout_seconds: Seconds without message before device is marked offline
check_interval_seconds: How often to check for timeouts
persistence_path: Path to persist device status
event_callback: Callback(event_dict) for device_online/offline events
"""
self.timeout_seconds = timeout_seconds
self.check_interval_seconds = check_interval_seconds
self.persistence_path = Path(persistence_path)
self.event_callback = event_callback
# Device status tracking: device_id -> DeviceStatus
self.devices: Dict[str, DeviceStatus] = {}
self.lock = threading.Lock()
# Background thread
self.monitor_thread = None
self.stop_flag = threading.Event()
# Load persisted status
self._load_status()
logger.info(
"device_status_monitor_initialized",
timeout_s=timeout_seconds,
check_interval_s=check_interval_seconds,
persistence_path=str(self.persistence_path)
)
def _load_status(self):
"""Load persisted device status from disk."""
if not self.persistence_path.exists():
logger.info("no_persisted_status_found", path=str(self.persistence_path))
return
try:
with open(self.persistence_path, 'r') as f:
data = json.load(f)
for device_id, status_dict in data.items():
self.devices[device_id] = DeviceStatus(**status_dict)
logger.info(
"device_status_loaded",
device_count=len(self.devices),
path=str(self.persistence_path)
)
except Exception as e:
logger.error("failed_to_load_device_status", error=str(e))
def _save_status(self):
"""Persist device status to disk."""
try:
# Ensure directory exists
self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
# Convert to dict
data = {
device_id: asdict(status)
for device_id, status in self.devices.items()
}
# Write atomically (write to temp, then rename)
temp_path = self.persistence_path.with_suffix('.tmp')
with open(temp_path, 'w') as f:
json.dump(data, f, indent=2)
temp_path.replace(self.persistence_path)
logger.debug("device_status_saved", device_count=len(self.devices))
except Exception as e:
logger.error("failed_to_save_device_status", error=str(e))
def update_last_seen(self, device_id: str):
"""
Update last_seen timestamp for a device.
Called on every MQTT message. If device was offline, emit device_online event.
Args:
device_id: Device identifier
"""
now = time.time()
with self.lock:
if device_id not in self.devices:
# New device - mark as online
self.devices[device_id] = DeviceStatus(
device_id=device_id,
last_seen=now,
is_online=True,
last_state_change=now
)
logger.info("device_registered", device_id=device_id)
self._emit_device_online(device_id, now)
self._save_status()
else:
# Existing device - update last_seen
device = self.devices[device_id]
device.last_seen = now
# If was offline, mark online and emit event
if not device.is_online:
device.is_online = True
device.last_state_change = now
logger.info("device_came_online", device_id=device_id)
self._emit_device_online(device_id, now)
self._save_status()
def _emit_device_online(self, device_id: str, timestamp: float):
"""Emit device_online event to Odoo."""
if not self.event_callback:
return
event = {
'event_uid': str(uuid.uuid4()),
'event_type': 'device_online',
'device_id': device_id,
'timestamp': datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat(),
'payload': {
'last_seen': datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat(),
'reason': 'activity_detected'
}
}
try:
self.event_callback(event)
except Exception as e:
logger.error("failed_to_emit_device_online", device_id=device_id, error=str(e))
def _emit_device_offline(self, device_id: str, timestamp: float, reason: str = 'timeout'):
"""Emit device_offline event to Odoo."""
if not self.event_callback:
return
device = self.devices.get(device_id)
if not device:
return
event = {
'event_uid': str(uuid.uuid4()),
'event_type': 'device_offline',
'device_id': device_id,
'timestamp': datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat(),
'payload': {
'last_seen': datetime.fromtimestamp(device.last_seen, tz=timezone.utc).isoformat(),
'reason': reason,
'offline_duration_s': int(timestamp - device.last_seen)
}
}
try:
self.event_callback(event)
except Exception as e:
logger.error("failed_to_emit_device_offline", device_id=device_id, error=str(e))
def _monitor_loop(self):
"""Background thread that checks for device timeouts."""
logger.info("device_status_monitor_started")
while not self.stop_flag.is_set():
try:
now = time.time()
with self.lock:
for device_id, device in self.devices.items():
# Check if device timed out
if device.is_online:
elapsed = now - device.last_seen
if elapsed > self.timeout_seconds:
# Mark offline
device.is_online = False
device.last_state_change = now
logger.warning(
"device_went_offline",
device_id=device_id,
elapsed_s=int(elapsed),
timeout_s=self.timeout_seconds
)
self._emit_device_offline(device_id, now, reason='timeout')
self._save_status()
# Sleep with interruptible wait
self.stop_flag.wait(self.check_interval_seconds)
except Exception as e:
logger.error("monitor_loop_error", error=str(e))
self.stop_flag.wait(1)
logger.info("device_status_monitor_stopped")
def start(self):
"""Start the background monitoring thread."""
if self.monitor_thread and self.monitor_thread.is_alive():
logger.warning("monitor_already_running")
return
self.stop_flag.clear()
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
logger.info("device_status_monitor_thread_started")
def stop(self):
"""Stop the background monitoring thread."""
if not self.monitor_thread:
return
logger.info("stopping_device_status_monitor")
self.stop_flag.set()
self.monitor_thread.join(timeout=self.check_interval_seconds + 5)
# Save final state
self._save_status()
logger.info("device_status_monitor_stopped")
def get_status(self, device_id: str) -> Optional[Dict]:
"""
Get status for a specific device.
Returns:
Dict with device status or None if not found
"""
with self.lock:
device = self.devices.get(device_id)
if not device:
return None
return {
'device_id': device.device_id,
'is_online': device.is_online,
'last_seen': datetime.fromtimestamp(device.last_seen, tz=timezone.utc).isoformat(),
'last_state_change': datetime.fromtimestamp(device.last_state_change, tz=timezone.utc).isoformat(),
'seconds_since_seen': int(time.time() - device.last_seen)
}
def get_all_status(self) -> Dict[str, Dict]:
"""Get status for all devices."""
with self.lock:
return {
device_id: self.get_status(device_id)
for device_id in self.devices.keys()
}