diff --git a/open_workshop_mqtt/models/mqtt_session.py b/open_workshop_mqtt/models/mqtt_session.py index 72c7897..7e5758a 100644 --- a/open_workshop_mqtt/models/mqtt_session.py +++ b/open_workshop_mqtt/models/mqtt_session.py @@ -111,6 +111,12 @@ class MqttSession(models.Model): digits=(8, 2), help='Power consumption at session end' ) + current_power_w = fields.Float( + string='Current Power (W)', + readonly=True, + digits=(8, 2), + help='Current power consumption (live for running sessions)' + ) # ========== Session Status ========== status = fields.Selection([ @@ -119,6 +125,21 @@ class MqttSession(models.Model): ], string='Status', required=True, default='running', readonly=True, index=True, help='Session status') + current_state = fields.Selection([ + ('idle', 'Idle'), + ('starting', 'Starting'), + ('standby', 'Standby'), + ('working', 'Working'), + ('stopping', 'Stopping'), + ], string='Current State', readonly=True, + help='Current state in session state machine (for running sessions)') + + last_message_time = fields.Datetime( + string='Last Message', + readonly=True, + help='Timestamp of last MQTT message received' + ) + end_reason = fields.Selection([ ('power_drop', 'Power Drop'), ('timeout', 'Message Timeout'), diff --git a/open_workshop_mqtt/services/iot_bridge_service.py b/open_workshop_mqtt/services/iot_bridge_service.py index a5f405c..abcee39 100644 --- a/open_workshop_mqtt/services/iot_bridge_service.py +++ b/open_workshop_mqtt/services/iot_bridge_service.py @@ -12,6 +12,7 @@ from odoo import api, SUPERUSER_ID from .mqtt_client import MqttClient from .parsers.shelly_parser import ShellyParser +from .session_detector import SessionDetector _logger = logging.getLogger(__name__) @@ -36,10 +37,21 @@ class IotBridgeService: self.registry = env.registry self.db_name = env.cr.dbname self._clients: Dict[int, MqttClient] = {} # connection_id -> MqttClient + self._detectors: Dict[int, SessionDetector] = {} # device_id -> SessionDetector self._running_lock = threading.Lock() self._parser = ShellyParser() # For now only Shelly + # Timeout worker thread + self._timeout_worker_running = False + self._timeout_worker_thread = None + _logger.info(f"IoT Bridge Service initialized for database '{self.db_name}'") + + # Start timeout worker + self._start_timeout_worker() + + # Restore detector states from running sessions + self._restore_detector_states() def cleanup(self): """ @@ -48,7 +60,11 @@ class IotBridgeService: """ _logger.info(f"Cleaning up IoT Bridge Service for '{self.db_name}' ({len(self._clients)} active connections)") + # Stop timeout worker + self._stop_timeout_worker() + with self._running_lock: + # Stop all MQTT clients for connection_id in list(self._clients.keys()): try: client = self._clients[connection_id] @@ -57,6 +73,9 @@ class IotBridgeService: del self._clients[connection_id] except Exception as e: _logger.error(f"Error stopping connection {connection_id} during cleanup: {e}") + + # Clear all detectors + self._detectors.clear() _logger.info(f"IoT Bridge Service cleanup completed for '{self.db_name}'") @@ -600,54 +619,151 @@ class IotBridgeService: return connection_id in self._clients def _process_session(self, env, device, topic: str, payload: str): - """Simple session detection based on power > 0""" + """Process session using SessionDetector state machine""" try: + _logger.info(f"🔧 _process_session called for device {device.name}, topic: {topic}") + # Parse message parsed = self._parser.parse_message(topic, payload) if not parsed: + _logger.warning(f"⚠️ Parser returned None for topic {topic}, payload: {payload[:100]}") return + _logger.debug(f"✓ Parsed: {parsed}") + power = self._parser.get_power_value(parsed) if power is None: + _logger.warning(f"⚠️ No power value in parsed data: {parsed}") return + _logger.info(f"✓ Extracted power: {power}W from device {device.name}") + # Update device status - device.write({ - 'last_message_time': datetime.now(), - 'last_power_w': power, - }) + timestamp = datetime.now() + _logger.debug(f"About to write device status: last_message_time={timestamp}, last_power_w={power}") + try: + device.write({ + 'last_message_time': timestamp, + 'last_power_w': power, + }) + _logger.debug(f"✓ Device status updated") + except Exception as write_error: + _logger.error(f"❌ device.write() FAILED: {write_error}", exc_info=True) - # Find running session - running_session = env['mqtt.session'].search([ - ('device_id', '=', device.id), - ('status', '=', 'running') - ], limit=1) + # Get or create detector for this device + detector = self._get_or_create_detector(device) + _logger.info(f"✓ Got detector for device {device.name}, calling process_power_event({power}, {timestamp})") + + # Process power event through state machine (pass env!) + detector.process_power_event(env, power, timestamp) + _logger.info(f"✓ process_power_event completed") - if power > 0: - # Device is on - if not running_session: - # Start new session - env['mqtt.session'].create({ - 'device_id': device.id, - 'status': 'running', - 'start_time': datetime.now(), - 'start_power_w': power, - }) - _logger.info(f"🟢 Session started for {device.name} ({power}W)") - else: - # Device is off - if running_session: - # End session - calculate duration - end_time = datetime.now() - duration_s = int((end_time - running_session.start_time).total_seconds()) - - running_session.write({ - 'status': 'completed', - 'end_time': end_time, - 'end_power_w': power, - 'total_duration_s': duration_s, - 'end_reason': 'power_drop', - }) - _logger.info(f"🔴 Session ended for {device.name} (duration: {duration_s}s)") except Exception as e: - _logger.debug(f"Session processing error: {e}") + _logger.error(f"Session processing error for device {device.name}: {e}", exc_info=True) + + def _get_or_create_detector(self, device) -> SessionDetector: + """Get or create SessionDetector for device""" + if device.id not in self._detectors: + self._detectors[device.id] = SessionDetector(device.id, device.name) + return self._detectors[device.id] + + def _start_timeout_worker(self): + """Start background thread to check for session timeouts""" + if self._timeout_worker_running: + return + + self._timeout_worker_running = True + self._timeout_worker_thread = threading.Thread( + target=self._timeout_worker_loop, + daemon=True, + name=f"SessionTimeout-{self.db_name}" + ) + self._timeout_worker_thread.start() + _logger.info(f"Session timeout worker started for database '{self.db_name}'") + + def _stop_timeout_worker(self): + """Stop timeout worker thread""" + if not self._timeout_worker_running: + return + + _logger.info(f"Stopping session timeout worker for '{self.db_name}'") + self._timeout_worker_running = False + + if self._timeout_worker_thread: + self._timeout_worker_thread.join(timeout=2.0) + self._timeout_worker_thread = None + + def _timeout_worker_loop(self): + """Worker loop to check for session timeouts every second""" + import time + + while self._timeout_worker_running: + try: + # Check timeouts with new cursor + with self.registry.cursor() as new_cr: + new_env = api.Environment(new_cr, SUPERUSER_ID, {}) + current_time = datetime.now() + + # Check timeout for each detector + with self._running_lock: + for device_id, detector in list(self._detectors.items()): + try: + # Refresh device in new environment + device = new_env['mqtt.device'].browse(device_id) + if not device.exists(): + # Device was deleted + del self._detectors[device_id] + continue + + # Check timeout (pass env) + detector.check_timeout(new_env, current_time) + except Exception as e: + _logger.error(f"Error checking timeout for device {device_id}: {e}") + + new_cr.commit() + + # Sleep 1 second + time.sleep(1.0) + + except Exception as e: + _logger.error(f"Error in timeout worker loop: {e}", exc_info=True) + time.sleep(1.0) + + _logger.info(f"Session timeout worker stopped for '{self.db_name}'") + + def _restore_detector_states(self): + """Restore detector states from running sessions in DB""" + try: + with self.registry.cursor() as new_cr: + new_env = api.Environment(new_cr, SUPERUSER_ID, {}) + + # Find all devices with running sessions + running_sessions = new_env['mqtt.session'].search([ + ('status', '=', 'running') + ]) + + if not running_sessions: + _logger.info(f"No running sessions to restore for '{self.db_name}'") + return + + _logger.info(f"Restoring {len(running_sessions)} running session(s) for '{self.db_name}'") + + for session in running_sessions: + try: + device = session.device_id + if not device or not device.exists(): + continue + + # Create detector and restore state + detector = SessionDetector(device.id, device.name) + detector.restore_state_from_db(new_env) + self._detectors[device.id] = detector + + _logger.info(f"Restored detector for device {device.name} (state={detector.state})") + except Exception as e: + _logger.error(f"Error restoring detector for session {session.id}: {e}") + + new_cr.commit() + + except Exception as e: + _logger.error(f"Error restoring detector states: {e}", exc_info=True) diff --git a/open_workshop_mqtt/tests/__init__.py b/open_workshop_mqtt/tests/__init__.py index 8adb4e5..50e9584 100644 --- a/open_workshop_mqtt/tests/__init__.py +++ b/open_workshop_mqtt/tests/__init__.py @@ -5,3 +5,4 @@ from . import test_session_detection from . import test_device_status from . import test_mqtt_mocked # Mock-basierte Tests from . import test_topic_matching # Topic Pattern Matching Tests +from . import test_session_detector # Session Detector Unit Tests