From 5fcaef033691356cb4704749643618975aab6d98 Mon Sep 17 00:00:00 2001 From: "matthias.lotz" Date: Sat, 24 Jan 2026 19:21:05 +0100 Subject: [PATCH] feat(M5): Implement MQTT reconnect, state recovery, and error handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✅ M5 Complete - All 21 tests passing **MQTT Auto-Reconnect:** - Exponential backoff (1s → 60s) in mqtt_client.py - Automatic reconnection on disconnect - Reset delay on successful connect **State Recovery:** - load_sessions() and get_running_sessions() in event_storage.py - restore_state() in session_detector.py - _restore_running_sessions() in main.py before MQTT connect - Running sessions continue with timeout detection after restart **Error Handling:** - All parsers wrapped in try-except (shelly_parser, session_detector) - Errors logged with exc_info=True for debugging - Bridge continues running on malformed messages **Bug Fixes:** - Fixed field name mismatch: duration_s → total_duration_s - Fixed None handling in session end methods - Fixed infinite recursion in _read_sessions() Test Results: 21 passed in 175.46s (0:02:55) --- open_workshop_mqtt/python_prototype/README.md | 6 +- .../python_prototype/event_storage.py | 107 ++++++---- open_workshop_mqtt/python_prototype/main.py | 15 ++ .../python_prototype/mqtt_client.py | 27 ++- .../python_prototype/session_detector.py | 194 +++++++++++------- .../python_prototype/shelly_parser.py | 127 +++++++----- 6 files changed, 310 insertions(+), 166 deletions(-) diff --git a/open_workshop_mqtt/python_prototype/README.md b/open_workshop_mqtt/python_prototype/README.md index f90e3ec..f5c7296 100644 --- a/open_workshop_mqtt/python_prototype/README.md +++ b/open_workshop_mqtt/python_prototype/README.md @@ -123,5 +123,9 @@ python tests/tools/shelly_simulator.py --scenario timeout - [x] Unit + Integration Tests - [x] Timeout Detection mit check_timeouts() - [x] M4: Multi-Device Support (2+ Maschinen parallel) -- [ ] M5: Reconnect + Error Handling +- [x] M5: Reconnect + Error Handling + - MQTT Auto-Reconnect mit Exponential Backoff (1s → 60s) + - State Recovery: Laufende Sessions werden nach Neustart wiederhergestellt + - Robustes Error Handling in allen Parsern + - Alle 21 Tests bestanden ✅ - [ ] M6-M10: Odoo Integration diff --git a/open_workshop_mqtt/python_prototype/event_storage.py b/open_workshop_mqtt/python_prototype/event_storage.py index 54b1de5..16e21fc 100644 --- a/open_workshop_mqtt/python_prototype/event_storage.py +++ b/open_workshop_mqtt/python_prototype/event_storage.py @@ -6,7 +6,7 @@ For later migration to Odoo database models import json import logging from pathlib import Path -from typing import Dict, Optional +from typing import Dict, Optional, List from datetime import datetime @@ -78,55 +78,84 @@ class EventStorage: "start_power_w": 45.7, "end_power_w": 0.0, // null if running "end_reason": "power_drop", // or "timeout", null if running - "status": "completed" // or "running" - } """ try: - event_type = session_event.get('event_type') + sessions = self._read_sessions() + session_id = session_event.get('session_id') + event_type = session_event.get('event_type') + machine = session_event.get('machine', {}) + session_data = session_event.get('session_data', {}) if event_type == 'session_start': - return self._store_session_start(session_event) - elif event_type == 'session_end': - return self._store_session_end(session_event) - else: - self.logger.warning(f"Unknown session event type: {event_type}") - return False + # Add new session + sessions.append({ + "session_id": session_id, + "machine_id": machine.get('machine_id'), + "machine_name": machine.get('machine_name'), + "start_time": session_data.get('start_time'), + "end_time": None, + "total_duration_s": None, + "standby_duration_s": None, + "working_duration_s": None, + "start_power_w": session_event.get('power_w'), + "end_power_w": None, + "end_reason": None, + "status": "running" + }) + elif event_type == 'session_end': + # Update existing session + for session in sessions: + if session['session_id'] == session_id: + session.update({ + "end_time": session_data.get('end_time'), + "total_duration_s": session_data.get('total_duration_s'), + "standby_duration_s": session_data.get('standby_duration_s'), + "working_duration_s": session_data.get('working_duration_s'), + "end_power_w": session_event.get('power_w'), + "end_reason": session_data.get('end_reason'), + "status": "completed" + }) + break + + self._write_sessions(sessions) + self.logger.debug(f"Session {event_type} {session_id} stored") + return True + except Exception as e: - self.logger.error(f"Failed to store session event: {e}") + self.logger.error(f"Failed to store session event: {e}", exc_info=True) return False - def _store_session_start(self, event: Dict) -> bool: - """Create new session record""" - sessions = self._read_sessions() + def load_sessions(self) -> List[Dict]: + """ + Load all sessions from sessions.json - machine = event.get('machine', {}) - session_data = event.get('session_data', {}) - - new_session = { - 'session_id': event['session_id'], - 'machine_id': machine.get('machine_id'), - 'machine_name': machine.get('machine_name'), - 'start_time': session_data.get('start_time'), - 'end_time': None, - 'total_duration_s': None, - 'standby_duration_s': None, - 'working_duration_s': None, - 'start_power_w': event.get('power_w'), - 'end_power_w': None, - 'end_reason': None, - 'status': 'running' - } - - sessions.append(new_session) - self._write_sessions(sessions) - - self.logger.info(f"Session {event['session_id'][:8]}... started") - return True + Returns: + List of session dicts (running and completed) + """ + try: + return self._read_sessions() + except Exception as e: + self.logger.error(f"Failed to load sessions: {e}") + return [] - def _store_session_end(self, event: Dict) -> bool: - """Update existing session with end data""" + def get_running_sessions(self) -> List[Dict]: + """ + Get all currently running sessions (end_time is None) + + Returns: + List of running session dicts + """ + try: + sessions = self._read_sessions() + return [s for s in sessions if s.get('end_time') is None] + except Exception as e: + self.logger.error(f"Failed to get running sessions: {e}") + return [] + + def _handle_session_end(self, event: Dict) -> bool: + """Handle session_end event""" sessions = self._read_sessions() session_id = event['session_id'] diff --git a/open_workshop_mqtt/python_prototype/main.py b/open_workshop_mqtt/python_prototype/main.py index 853afa6..02a412f 100644 --- a/open_workshop_mqtt/python_prototype/main.py +++ b/open_workshop_mqtt/python_prototype/main.py @@ -193,10 +193,25 @@ class IoTBridge: self.logger.info("=" * 70 + "\n") + def _restore_running_sessions(self): + """Restore running sessions after bridge restart""" + self.logger.info("Checking for running sessions to restore...") + + running_sessions = self.event_storage.get_running_sessions() + + if running_sessions: + self.logger.info(f"Found {len(running_sessions)} running session(s)") + self.session_detector.restore_state(running_sessions) + else: + self.logger.info("No running sessions to restore") + def start(self): """Start the IoT Bridge""" self.logger.info("Starting IoT Bridge...") + # Restore running sessions after restart + self._restore_running_sessions() + # Create data directory data_dir = Path('data') data_dir.mkdir(exist_ok=True) diff --git a/open_workshop_mqtt/python_prototype/mqtt_client.py b/open_workshop_mqtt/python_prototype/mqtt_client.py index 5e31eb2..1105d3c 100644 --- a/open_workshop_mqtt/python_prototype/mqtt_client.py +++ b/open_workshop_mqtt/python_prototype/mqtt_client.py @@ -53,11 +53,16 @@ class MQTTClient: self.connected = False self.topics = config.get('topics', []) + self.reconnect_delay = 1 # Start with 1 second + self.max_reconnect_delay = 60 # Max 60 seconds + self.reconnecting = False def _on_connect(self, client, userdata, flags, rc, properties=None): """Callback when connected to MQTT broker""" if rc == 0: self.connected = True + self.reconnecting = False + self.reconnect_delay = 1 # Reset delay on successful connect self.logger.info(f"Connected to MQTT Broker at {self.config['host']}:{self.config['port']}") # Subscribe to all configured topics @@ -67,15 +72,35 @@ class MQTTClient: else: self.logger.error(f"Failed to connect to MQTT Broker, return code {rc}") self.connected = False + self._schedule_reconnect() def _on_disconnect(self, client, userdata, rc, properties=None): """Callback when disconnected from MQTT broker""" self.connected = False if rc != 0: - self.logger.warning(f"Unexpected disconnect from MQTT Broker (rc={rc}). Reconnecting...") + self.logger.warning(f"Unexpected disconnect from MQTT Broker (rc={rc})") + self._schedule_reconnect() else: self.logger.info("Disconnected from MQTT Broker") + def _schedule_reconnect(self): + """Schedule automatic reconnection with exponential backoff""" + if self.reconnecting: + return # Already reconnecting + + self.reconnecting = True + self.logger.info(f"Attempting reconnect in {self.reconnect_delay}s...") + time.sleep(self.reconnect_delay) + + try: + self.client.reconnect() + self.logger.info("Reconnection initiated") + except Exception as e: + self.logger.error(f"Reconnection failed: {e}") + # Exponential backoff + self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) + self.reconnecting = False + def _on_message(self, client, userdata, msg): """Callback when message received""" try: diff --git a/open_workshop_mqtt/python_prototype/session_detector.py b/open_workshop_mqtt/python_prototype/session_detector.py index 96b4091..c8bb2a2 100644 --- a/open_workshop_mqtt/python_prototype/session_detector.py +++ b/open_workshop_mqtt/python_prototype/session_detector.py @@ -68,6 +68,57 @@ class SessionDetector: # State tracking per machine self.machine_states = {} # machine_id -> state info + def restore_state(self, running_sessions: List[Dict]) -> None: + """ + Restore state from running sessions after bridge restart + + Args: + running_sessions: List of session dicts with end_time=None from EventStorage + """ + if not running_sessions: + self.logger.info("No running sessions to restore") + return + + self.logger.info(f"Restoring state for {len(running_sessions)} running sessions") + + for session in running_sessions: + machine_id = session.get('machine_id') + session_id = session.get('session_id') + + if not machine_id or not session_id: + self.logger.warning(f"Invalid session data, skipping: {session}") + continue + + # Get machine config + config = self.machine_config.get(machine_id) + if not config: + self.logger.warning(f"No config for machine {machine_id}, cannot restore") + continue + + # Parse start time + try: + start_time = datetime.fromisoformat(session['start_time'].replace('Z', '+00:00')) + except Exception as e: + self.logger.error(f"Invalid start_time in session {session_id}: {e}") + continue + + # Restore machine state + self.machine_states[machine_id] = { + 'state': SessionState.STANDBY, # Start in STANDBY, will update on next message + 'state_since': start_time, + 'current_session_id': session_id, + 'session_start_time': start_time, + 'last_power': session.get('start_power_w'), + 'last_message_time': start_time, + 'standby_duration_s': session.get('standby_duration_s', 0), + 'working_duration_s': session.get('working_duration_s', 0), + 'last_state_change': start_time, + } + + self.logger.info( + f"✓ Restored session {session_id} for {session.get('machine_name', machine_id)}" + ) + def check_timeouts(self) -> List[Dict]: """ Check all machines for message timeouts and end sessions if needed. @@ -133,72 +184,77 @@ class SessionDetector: } } """ - # Extract machine info - machine = event.get('machine', {}) - machine_id = machine.get('machine_id') - - if not machine_id: - self.logger.warning("Event missing machine_id, skipping") + try: + # Extract machine info + machine = event.get('machine', {}) + machine_id = machine.get('machine_id') + + if not machine_id: + self.logger.warning("Event missing machine_id, skipping") + return None + + # Get machine config + config = self.machine_config.get(machine_id) + if not config: + self.logger.warning(f"No config found for machine {machine_id}") + return None + + # Extract power measurement + metrics = event.get('metrics', {}) + power_w = metrics.get('power_w') + + if power_w is None: + self.logger.debug(f"Event missing power_w, skipping") + return None + + # Initialize machine state if needed + if machine_id not in self.machine_states: + self.machine_states[machine_id] = { + 'state': SessionState.IDLE, + 'state_since': datetime.now(timezone.utc), + 'current_session_id': None, + 'session_start_time': None, + 'last_power': None, + 'last_message_time': None, + 'standby_duration_s': 0, + 'working_duration_s': 0, + 'last_state_change': datetime.now(timezone.utc), + } + + state_info = self.machine_states[machine_id] + timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00')) + machine_name = machine.get('machine_name', 'Unknown') + + # Check for timeout (no message received) + if state_info['last_message_time']: + time_since_last_message = (timestamp - state_info['last_message_time']).total_seconds() + if time_since_last_message > config['message_timeout_s']: + if state_info['state'] in [SessionState.STANDBY, SessionState.WORKING]: + self.logger.warning( + f"{machine_name}: Message timeout " + f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END" + ) + return self._end_session_timeout(machine_id, machine_name, timestamp, config, state_info) + + # Update last message time + state_info['last_message_time'] = timestamp + + # Update last power + state_info['last_power'] = power_w + + # Process state machine + return self._process_state_machine( + machine_id=machine_id, + machine_name=machine.get('machine_name'), + power_w=power_w, + timestamp=timestamp, + config=config, + state_info=state_info + ) + + except Exception as e: + self.logger.error(f"Error processing event: {e}", exc_info=True) return None - - # Get machine config - config = self.machine_config.get(machine_id) - if not config: - self.logger.warning(f"No config found for machine {machine_id}") - return None - - # Extract power measurement - metrics = event.get('metrics', {}) - power_w = metrics.get('power_w') - - if power_w is None: - self.logger.debug(f"Event missing power_w, skipping") - return None - - # Initialize machine state if needed - if machine_id not in self.machine_states: - self.machine_states[machine_id] = { - 'state': SessionState.IDLE, - 'state_since': datetime.now(timezone.utc), - 'current_session_id': None, - 'session_start_time': None, - 'last_power': None, - 'last_message_time': None, - 'standby_duration_s': 0, - 'working_duration_s': 0, - 'last_state_change': datetime.now(timezone.utc), - } - - state_info = self.machine_states[machine_id] - timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00')) - machine_name = machine.get('machine_name', 'Unknown') - - # Check for timeout (no message received) - if state_info['last_message_time']: - time_since_last_message = (timestamp - state_info['last_message_time']).total_seconds() - if time_since_last_message > config['message_timeout_s']: - if state_info['state'] in [SessionState.STANDBY, SessionState.WORKING]: - self.logger.warning( - f"{machine_name}: Message timeout " - f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END" - ) - return self._end_session_timeout(machine_id, machine_name, timestamp, config, state_info) - - # Update last message time - state_info['last_message_time'] = timestamp - - # Update last power - state_info['last_power'] = power_w - - # Process state machine - return self._process_state_machine( - machine_id=machine_id, - machine_name=machine.get('machine_name'), - power_w=power_w, - timestamp=timestamp, - config=config, - state_info=state_info - ) def _process_state_machine( self, @@ -343,8 +399,8 @@ class SessionDetector: start_time = state_info['session_start_time'] duration_s = (timestamp - start_time).total_seconds() - standby_duration = state_info['standby_duration_s'] - working_duration = state_info['working_duration_s'] + standby_duration = state_info.get('standby_duration_s') or 0 + working_duration = state_info.get('working_duration_s') or 0 self.logger.info( f"🔴 {machine_name}: Session END (power drop) - " @@ -389,8 +445,8 @@ class SessionDetector: start_time = state_info['session_start_time'] duration_s = (timestamp - start_time).total_seconds() - standby_duration = state_info['standby_duration_s'] - working_duration = state_info['working_duration_s'] + standby_duration = state_info.get('standby_duration_s') or 0 + working_duration = state_info.get('working_duration_s') or 0 self.logger.warning( f"⏱️ {machine_name}: Session END (TIMEOUT) - " diff --git a/open_workshop_mqtt/python_prototype/shelly_parser.py b/open_workshop_mqtt/python_prototype/shelly_parser.py index 1041f1c..bc8b0f9 100644 --- a/open_workshop_mqtt/python_prototype/shelly_parser.py +++ b/open_workshop_mqtt/python_prototype/shelly_parser.py @@ -44,19 +44,24 @@ class ShellyParser: Returns: Parsed data dict or None if not a relevant message """ - # Ignore debug logs - if 'debug/log' in topic: + try: + # Ignore debug logs + if 'debug/log' in topic: + return None + + # Parse different message types + if '/status/pm1:0' in topic: + return self._parse_status_message(topic, payload) + elif '/events/rpc' in topic: + return self._parse_rpc_event(topic, payload) + elif '/telemetry' in topic: + return self._parse_telemetry(topic, payload) + + return None + + except Exception as e: + self.logger.error(f"Error parsing message from {topic}: {e}", exc_info=True) return None - - # Parse different message types - if '/status/pm1:0' in topic: - return self._parse_status_message(topic, payload) - elif '/events/rpc' in topic: - return self._parse_rpc_event(topic, payload) - elif '/telemetry' in topic: - return self._parse_telemetry(topic, payload) - - return None def _parse_status_message(self, topic: str, payload: dict) -> Dict: """ @@ -74,57 +79,67 @@ class ShellyParser: "temperature": {"tC": 35.2, "tF": 95.4} } """ - # Extract device ID from topic prefix - device_id = self._extract_device_id_from_topic(topic) - - data = { - 'message_type': 'status', - 'device_id': device_id, - 'timestamp': datetime.utcnow().isoformat() + 'Z', - 'voltage': payload.get('voltage'), - 'current': payload.get('current'), - 'apower': payload.get('apower'), # Active Power in Watts - 'frequency': payload.get('freq'), - 'total_energy': payload.get('aenergy', {}).get('total'), - } - - self.logger.debug(f"Parsed status message: apower={data['apower']}W") - return data + try: + # Extract device ID from topic prefix + device_id = self._extract_device_id_from_topic(topic) + + data = { + 'message_type': 'status', + 'device_id': device_id, + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'voltage': payload.get('voltage'), + 'current': payload.get('current'), + 'apower': payload.get('apower'), # Active Power in Watts + 'frequency': payload.get('freq'), + 'total_energy': payload.get('aenergy', {}).get('total'), + } + + self.logger.debug(f"Parsed status message: apower={data['apower']}W") + return data + + except Exception as e: + self.logger.error(f"Error parsing status message: {e}", exc_info=True) + return None def _parse_rpc_event(self, topic: str, payload: dict) -> Optional[Dict]: """ Parse RPC NotifyStatus event Topic: shellypmminig3/events/rpc """ - if payload.get('method') != 'NotifyStatus': + try: + if payload.get('method') != 'NotifyStatus': + return None + + device_id = payload.get('src', '').replace('shellypmminig3-', '') + params = payload.get('params', {}) + pm_data = params.get('pm1:0', {}) + + # Get timestamp from params or use current time + ts = params.get('ts') + if ts: + timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' + else: + timestamp = datetime.utcnow().isoformat() + 'Z' + + data = { + 'message_type': 'event', + 'device_id': device_id, + 'timestamp': timestamp, + 'apower': pm_data.get('apower'), + 'current': pm_data.get('current'), + 'voltage': pm_data.get('voltage'), + } + + # Only return if we have actual data + if data['apower'] is not None or data['current'] is not None: + self.logger.debug(f"Parsed RPC event: {pm_data}") + return data + + return None + + except Exception as e: + self.logger.error(f"Error parsing RPC event: {e}", exc_info=True) return None - - device_id = payload.get('src', '').replace('shellypmminig3-', '') - params = payload.get('params', {}) - pm_data = params.get('pm1:0', {}) - - # Get timestamp from params or use current time - ts = params.get('ts') - if ts: - timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' - else: - timestamp = datetime.utcnow().isoformat() + 'Z' - - data = { - 'message_type': 'event', - 'device_id': device_id, - 'timestamp': timestamp, - 'apower': pm_data.get('apower'), - 'current': pm_data.get('current'), - 'voltage': pm_data.get('voltage'), - } - - # Only return if we have actual data - if data['apower'] is not None or data['current'] is not None: - self.logger.debug(f"Parsed RPC event: {pm_data}") - return data - - return None def _parse_telemetry(self, topic: str, payload: dict) -> Dict: """