feat: SessionDetector Integration - Live-Tracking & Timeout Worker

Integriert SessionDetector vollständig in iot_bridge_service:

iot_bridge_service.py:
- SessionDetector Import & _detectors Dictionary
- Timeout Worker Thread für automatisches Session-Ende
- _restore_detector_states() bei Service-Start
- _get_or_create_detector() Factory Methode
- _process_session() ruft detector.process_power_event(env, power, ts)
- _timeout_worker_loop() prüft alle 10s auf Timeouts

models/mqtt_session.py:
- current_power_w Field für Live-Power Updates
- current_state Field für State Machine Status (idle/starting/standby/working/stopping)
- last_message_time Field für Timeout Detection

tests/__init__.py:
- Import test_session_detector Module

Diese Änderungen sind essentiell für SessionDetector!
Ohne sie würde der Detector nicht aufgerufen werden.
This commit is contained in:
Matthias Lotz 2026-01-30 16:49:50 +01:00
parent 90e3422e8b
commit b46fed0f8e
3 changed files with 175 additions and 37 deletions

View File

@ -111,6 +111,12 @@ class MqttSession(models.Model):
digits=(8, 2),
help='Power consumption at session end'
)
current_power_w = fields.Float(
string='Current Power (W)',
readonly=True,
digits=(8, 2),
help='Current power consumption (live for running sessions)'
)
# ========== Session Status ==========
status = fields.Selection([
@ -119,6 +125,21 @@ class MqttSession(models.Model):
], string='Status', required=True, default='running', readonly=True,
index=True, help='Session status')
current_state = fields.Selection([
('idle', 'Idle'),
('starting', 'Starting'),
('standby', 'Standby'),
('working', 'Working'),
('stopping', 'Stopping'),
], string='Current State', readonly=True,
help='Current state in session state machine (for running sessions)')
last_message_time = fields.Datetime(
string='Last Message',
readonly=True,
help='Timestamp of last MQTT message received'
)
end_reason = fields.Selection([
('power_drop', 'Power Drop'),
('timeout', 'Message Timeout'),

View File

@ -12,6 +12,7 @@ from odoo import api, SUPERUSER_ID
from .mqtt_client import MqttClient
from .parsers.shelly_parser import ShellyParser
from .session_detector import SessionDetector
_logger = logging.getLogger(__name__)
@ -36,10 +37,21 @@ class IotBridgeService:
self.registry = env.registry
self.db_name = env.cr.dbname
self._clients: Dict[int, MqttClient] = {} # connection_id -> MqttClient
self._detectors: Dict[int, SessionDetector] = {} # device_id -> SessionDetector
self._running_lock = threading.Lock()
self._parser = ShellyParser() # For now only Shelly
# Timeout worker thread
self._timeout_worker_running = False
self._timeout_worker_thread = None
_logger.info(f"IoT Bridge Service initialized for database '{self.db_name}'")
# Start timeout worker
self._start_timeout_worker()
# Restore detector states from running sessions
self._restore_detector_states()
def cleanup(self):
"""
@ -48,7 +60,11 @@ class IotBridgeService:
"""
_logger.info(f"Cleaning up IoT Bridge Service for '{self.db_name}' ({len(self._clients)} active connections)")
# Stop timeout worker
self._stop_timeout_worker()
with self._running_lock:
# Stop all MQTT clients
for connection_id in list(self._clients.keys()):
try:
client = self._clients[connection_id]
@ -57,6 +73,9 @@ class IotBridgeService:
del self._clients[connection_id]
except Exception as e:
_logger.error(f"Error stopping connection {connection_id} during cleanup: {e}")
# Clear all detectors
self._detectors.clear()
_logger.info(f"IoT Bridge Service cleanup completed for '{self.db_name}'")
@ -600,54 +619,151 @@ class IotBridgeService:
return connection_id in self._clients
def _process_session(self, env, device, topic: str, payload: str):
"""Simple session detection based on power > 0"""
"""Process session using SessionDetector state machine"""
try:
_logger.info(f"🔧 _process_session called for device {device.name}, topic: {topic}")
# Parse message
parsed = self._parser.parse_message(topic, payload)
if not parsed:
_logger.warning(f"⚠️ Parser returned None for topic {topic}, payload: {payload[:100]}")
return
_logger.debug(f"✓ Parsed: {parsed}")
power = self._parser.get_power_value(parsed)
if power is None:
_logger.warning(f"⚠️ No power value in parsed data: {parsed}")
return
_logger.info(f"✓ Extracted power: {power}W from device {device.name}")
# Update device status
device.write({
'last_message_time': datetime.now(),
'last_power_w': power,
})
timestamp = datetime.now()
_logger.debug(f"About to write device status: last_message_time={timestamp}, last_power_w={power}")
try:
device.write({
'last_message_time': timestamp,
'last_power_w': power,
})
_logger.debug(f"✓ Device status updated")
except Exception as write_error:
_logger.error(f"❌ device.write() FAILED: {write_error}", exc_info=True)
# Find running session
running_session = env['mqtt.session'].search([
('device_id', '=', device.id),
('status', '=', 'running')
], limit=1)
# Get or create detector for this device
detector = self._get_or_create_detector(device)
_logger.info(f"✓ Got detector for device {device.name}, calling process_power_event({power}, {timestamp})")
# Process power event through state machine (pass env!)
detector.process_power_event(env, power, timestamp)
_logger.info(f"✓ process_power_event completed")
if power > 0:
# Device is on
if not running_session:
# Start new session
env['mqtt.session'].create({
'device_id': device.id,
'status': 'running',
'start_time': datetime.now(),
'start_power_w': power,
})
_logger.info(f"🟢 Session started for {device.name} ({power}W)")
else:
# Device is off
if running_session:
# End session - calculate duration
end_time = datetime.now()
duration_s = int((end_time - running_session.start_time).total_seconds())
running_session.write({
'status': 'completed',
'end_time': end_time,
'end_power_w': power,
'total_duration_s': duration_s,
'end_reason': 'power_drop',
})
_logger.info(f"🔴 Session ended for {device.name} (duration: {duration_s}s)")
except Exception as e:
_logger.debug(f"Session processing error: {e}")
_logger.error(f"Session processing error for device {device.name}: {e}", exc_info=True)
def _get_or_create_detector(self, device) -> SessionDetector:
"""Get or create SessionDetector for device"""
if device.id not in self._detectors:
self._detectors[device.id] = SessionDetector(device.id, device.name)
return self._detectors[device.id]
def _start_timeout_worker(self):
"""Start background thread to check for session timeouts"""
if self._timeout_worker_running:
return
self._timeout_worker_running = True
self._timeout_worker_thread = threading.Thread(
target=self._timeout_worker_loop,
daemon=True,
name=f"SessionTimeout-{self.db_name}"
)
self._timeout_worker_thread.start()
_logger.info(f"Session timeout worker started for database '{self.db_name}'")
def _stop_timeout_worker(self):
"""Stop timeout worker thread"""
if not self._timeout_worker_running:
return
_logger.info(f"Stopping session timeout worker for '{self.db_name}'")
self._timeout_worker_running = False
if self._timeout_worker_thread:
self._timeout_worker_thread.join(timeout=2.0)
self._timeout_worker_thread = None
def _timeout_worker_loop(self):
"""Worker loop to check for session timeouts every second"""
import time
while self._timeout_worker_running:
try:
# Check timeouts with new cursor
with self.registry.cursor() as new_cr:
new_env = api.Environment(new_cr, SUPERUSER_ID, {})
current_time = datetime.now()
# Check timeout for each detector
with self._running_lock:
for device_id, detector in list(self._detectors.items()):
try:
# Refresh device in new environment
device = new_env['mqtt.device'].browse(device_id)
if not device.exists():
# Device was deleted
del self._detectors[device_id]
continue
# Check timeout (pass env)
detector.check_timeout(new_env, current_time)
except Exception as e:
_logger.error(f"Error checking timeout for device {device_id}: {e}")
new_cr.commit()
# Sleep 1 second
time.sleep(1.0)
except Exception as e:
_logger.error(f"Error in timeout worker loop: {e}", exc_info=True)
time.sleep(1.0)
_logger.info(f"Session timeout worker stopped for '{self.db_name}'")
def _restore_detector_states(self):
"""Restore detector states from running sessions in DB"""
try:
with self.registry.cursor() as new_cr:
new_env = api.Environment(new_cr, SUPERUSER_ID, {})
# Find all devices with running sessions
running_sessions = new_env['mqtt.session'].search([
('status', '=', 'running')
])
if not running_sessions:
_logger.info(f"No running sessions to restore for '{self.db_name}'")
return
_logger.info(f"Restoring {len(running_sessions)} running session(s) for '{self.db_name}'")
for session in running_sessions:
try:
device = session.device_id
if not device or not device.exists():
continue
# Create detector and restore state
detector = SessionDetector(device.id, device.name)
detector.restore_state_from_db(new_env)
self._detectors[device.id] = detector
_logger.info(f"Restored detector for device {device.name} (state={detector.state})")
except Exception as e:
_logger.error(f"Error restoring detector for session {session.id}: {e}")
new_cr.commit()
except Exception as e:
_logger.error(f"Error restoring detector states: {e}", exc_info=True)

View File

@ -5,3 +5,4 @@ from . import test_session_detection
from . import test_device_status
from . import test_mqtt_mocked # Mock-basierte Tests
from . import test_topic_matching # Topic Pattern Matching Tests
from . import test_session_detector # Session Detector Unit Tests