feat(M5): Implement MQTT reconnect, state recovery, and error handling

 M5 Complete - All 21 tests passing

**MQTT Auto-Reconnect:**
- Exponential backoff (1s → 60s) in mqtt_client.py
- Automatic reconnection on disconnect
- Reset delay on successful connect

**State Recovery:**
- load_sessions() and get_running_sessions() in event_storage.py
- restore_state() in session_detector.py
- _restore_running_sessions() in main.py before MQTT connect
- Running sessions continue with timeout detection after restart

**Error Handling:**
- All parsers wrapped in try-except (shelly_parser, session_detector)
- Errors logged with exc_info=True for debugging
- Bridge continues running on malformed messages

**Bug Fixes:**
- Fixed field name mismatch: duration_s → total_duration_s
- Fixed None handling in session end methods
- Fixed infinite recursion in _read_sessions()

Test Results: 21 passed in 175.46s (0:02:55)
This commit is contained in:
Matthias Lotz 2026-01-24 19:21:05 +01:00
parent bb3e492d30
commit 5fcaef0336
6 changed files with 310 additions and 166 deletions

View File

@ -123,5 +123,9 @@ python tests/tools/shelly_simulator.py --scenario timeout
- [x] Unit + Integration Tests
- [x] Timeout Detection mit check_timeouts()
- [x] M4: Multi-Device Support (2+ Maschinen parallel)
- [ ] M5: Reconnect + Error Handling
- [x] M5: Reconnect + Error Handling
- MQTT Auto-Reconnect mit Exponential Backoff (1s → 60s)
- State Recovery: Laufende Sessions werden nach Neustart wiederhergestellt
- Robustes Error Handling in allen Parsern
- Alle 21 Tests bestanden ✅
- [ ] M6-M10: Odoo Integration

View File

@ -6,7 +6,7 @@ For later migration to Odoo database models
import json
import logging
from pathlib import Path
from typing import Dict, Optional
from typing import Dict, Optional, List
from datetime import datetime
@ -78,55 +78,84 @@ class EventStorage:
"start_power_w": 45.7,
"end_power_w": 0.0, // null if running
"end_reason": "power_drop", // or "timeout", null if running
"status": "completed" // or "running"
}
"""
try:
event_type = session_event.get('event_type')
sessions = self._read_sessions()
session_id = session_event.get('session_id')
event_type = session_event.get('event_type')
machine = session_event.get('machine', {})
session_data = session_event.get('session_data', {})
if event_type == 'session_start':
return self._store_session_start(session_event)
elif event_type == 'session_end':
return self._store_session_end(session_event)
else:
self.logger.warning(f"Unknown session event type: {event_type}")
return False
# Add new session
sessions.append({
"session_id": session_id,
"machine_id": machine.get('machine_id'),
"machine_name": machine.get('machine_name'),
"start_time": session_data.get('start_time'),
"end_time": None,
"total_duration_s": None,
"standby_duration_s": None,
"working_duration_s": None,
"start_power_w": session_event.get('power_w'),
"end_power_w": None,
"end_reason": None,
"status": "running"
})
elif event_type == 'session_end':
# Update existing session
for session in sessions:
if session['session_id'] == session_id:
session.update({
"end_time": session_data.get('end_time'),
"total_duration_s": session_data.get('total_duration_s'),
"standby_duration_s": session_data.get('standby_duration_s'),
"working_duration_s": session_data.get('working_duration_s'),
"end_power_w": session_event.get('power_w'),
"end_reason": session_data.get('end_reason'),
"status": "completed"
})
break
self._write_sessions(sessions)
self.logger.debug(f"Session {event_type} {session_id} stored")
return True
except Exception as e:
self.logger.error(f"Failed to store session event: {e}")
self.logger.error(f"Failed to store session event: {e}", exc_info=True)
return False
def _store_session_start(self, event: Dict) -> bool:
"""Create new session record"""
sessions = self._read_sessions()
def load_sessions(self) -> List[Dict]:
"""
Load all sessions from sessions.json
machine = event.get('machine', {})
session_data = event.get('session_data', {})
new_session = {
'session_id': event['session_id'],
'machine_id': machine.get('machine_id'),
'machine_name': machine.get('machine_name'),
'start_time': session_data.get('start_time'),
'end_time': None,
'total_duration_s': None,
'standby_duration_s': None,
'working_duration_s': None,
'start_power_w': event.get('power_w'),
'end_power_w': None,
'end_reason': None,
'status': 'running'
}
sessions.append(new_session)
self._write_sessions(sessions)
self.logger.info(f"Session {event['session_id'][:8]}... started")
return True
Returns:
List of session dicts (running and completed)
"""
try:
return self._read_sessions()
except Exception as e:
self.logger.error(f"Failed to load sessions: {e}")
return []
def _store_session_end(self, event: Dict) -> bool:
"""Update existing session with end data"""
def get_running_sessions(self) -> List[Dict]:
"""
Get all currently running sessions (end_time is None)
Returns:
List of running session dicts
"""
try:
sessions = self._read_sessions()
return [s for s in sessions if s.get('end_time') is None]
except Exception as e:
self.logger.error(f"Failed to get running sessions: {e}")
return []
def _handle_session_end(self, event: Dict) -> bool:
"""Handle session_end event"""
sessions = self._read_sessions()
session_id = event['session_id']

View File

@ -193,10 +193,25 @@ class IoTBridge:
self.logger.info("=" * 70 + "\n")
def _restore_running_sessions(self):
"""Restore running sessions after bridge restart"""
self.logger.info("Checking for running sessions to restore...")
running_sessions = self.event_storage.get_running_sessions()
if running_sessions:
self.logger.info(f"Found {len(running_sessions)} running session(s)")
self.session_detector.restore_state(running_sessions)
else:
self.logger.info("No running sessions to restore")
def start(self):
"""Start the IoT Bridge"""
self.logger.info("Starting IoT Bridge...")
# Restore running sessions after restart
self._restore_running_sessions()
# Create data directory
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)

View File

@ -53,11 +53,16 @@ class MQTTClient:
self.connected = False
self.topics = config.get('topics', [])
self.reconnect_delay = 1 # Start with 1 second
self.max_reconnect_delay = 60 # Max 60 seconds
self.reconnecting = False
def _on_connect(self, client, userdata, flags, rc, properties=None):
"""Callback when connected to MQTT broker"""
if rc == 0:
self.connected = True
self.reconnecting = False
self.reconnect_delay = 1 # Reset delay on successful connect
self.logger.info(f"Connected to MQTT Broker at {self.config['host']}:{self.config['port']}")
# Subscribe to all configured topics
@ -67,15 +72,35 @@ class MQTTClient:
else:
self.logger.error(f"Failed to connect to MQTT Broker, return code {rc}")
self.connected = False
self._schedule_reconnect()
def _on_disconnect(self, client, userdata, rc, properties=None):
"""Callback when disconnected from MQTT broker"""
self.connected = False
if rc != 0:
self.logger.warning(f"Unexpected disconnect from MQTT Broker (rc={rc}). Reconnecting...")
self.logger.warning(f"Unexpected disconnect from MQTT Broker (rc={rc})")
self._schedule_reconnect()
else:
self.logger.info("Disconnected from MQTT Broker")
def _schedule_reconnect(self):
"""Schedule automatic reconnection with exponential backoff"""
if self.reconnecting:
return # Already reconnecting
self.reconnecting = True
self.logger.info(f"Attempting reconnect in {self.reconnect_delay}s...")
time.sleep(self.reconnect_delay)
try:
self.client.reconnect()
self.logger.info("Reconnection initiated")
except Exception as e:
self.logger.error(f"Reconnection failed: {e}")
# Exponential backoff
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
self.reconnecting = False
def _on_message(self, client, userdata, msg):
"""Callback when message received"""
try:

View File

@ -68,6 +68,57 @@ class SessionDetector:
# State tracking per machine
self.machine_states = {} # machine_id -> state info
def restore_state(self, running_sessions: List[Dict]) -> None:
"""
Restore state from running sessions after bridge restart
Args:
running_sessions: List of session dicts with end_time=None from EventStorage
"""
if not running_sessions:
self.logger.info("No running sessions to restore")
return
self.logger.info(f"Restoring state for {len(running_sessions)} running sessions")
for session in running_sessions:
machine_id = session.get('machine_id')
session_id = session.get('session_id')
if not machine_id or not session_id:
self.logger.warning(f"Invalid session data, skipping: {session}")
continue
# Get machine config
config = self.machine_config.get(machine_id)
if not config:
self.logger.warning(f"No config for machine {machine_id}, cannot restore")
continue
# Parse start time
try:
start_time = datetime.fromisoformat(session['start_time'].replace('Z', '+00:00'))
except Exception as e:
self.logger.error(f"Invalid start_time in session {session_id}: {e}")
continue
# Restore machine state
self.machine_states[machine_id] = {
'state': SessionState.STANDBY, # Start in STANDBY, will update on next message
'state_since': start_time,
'current_session_id': session_id,
'session_start_time': start_time,
'last_power': session.get('start_power_w'),
'last_message_time': start_time,
'standby_duration_s': session.get('standby_duration_s', 0),
'working_duration_s': session.get('working_duration_s', 0),
'last_state_change': start_time,
}
self.logger.info(
f"✓ Restored session {session_id} for {session.get('machine_name', machine_id)}"
)
def check_timeouts(self) -> List[Dict]:
"""
Check all machines for message timeouts and end sessions if needed.
@ -133,72 +184,77 @@ class SessionDetector:
}
}
"""
# Extract machine info
machine = event.get('machine', {})
machine_id = machine.get('machine_id')
if not machine_id:
self.logger.warning("Event missing machine_id, skipping")
try:
# Extract machine info
machine = event.get('machine', {})
machine_id = machine.get('machine_id')
if not machine_id:
self.logger.warning("Event missing machine_id, skipping")
return None
# Get machine config
config = self.machine_config.get(machine_id)
if not config:
self.logger.warning(f"No config found for machine {machine_id}")
return None
# Extract power measurement
metrics = event.get('metrics', {})
power_w = metrics.get('power_w')
if power_w is None:
self.logger.debug(f"Event missing power_w, skipping")
return None
# Initialize machine state if needed
if machine_id not in self.machine_states:
self.machine_states[machine_id] = {
'state': SessionState.IDLE,
'state_since': datetime.now(timezone.utc),
'current_session_id': None,
'session_start_time': None,
'last_power': None,
'last_message_time': None,
'standby_duration_s': 0,
'working_duration_s': 0,
'last_state_change': datetime.now(timezone.utc),
}
state_info = self.machine_states[machine_id]
timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
machine_name = machine.get('machine_name', 'Unknown')
# Check for timeout (no message received)
if state_info['last_message_time']:
time_since_last_message = (timestamp - state_info['last_message_time']).total_seconds()
if time_since_last_message > config['message_timeout_s']:
if state_info['state'] in [SessionState.STANDBY, SessionState.WORKING]:
self.logger.warning(
f"{machine_name}: Message timeout "
f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END"
)
return self._end_session_timeout(machine_id, machine_name, timestamp, config, state_info)
# Update last message time
state_info['last_message_time'] = timestamp
# Update last power
state_info['last_power'] = power_w
# Process state machine
return self._process_state_machine(
machine_id=machine_id,
machine_name=machine.get('machine_name'),
power_w=power_w,
timestamp=timestamp,
config=config,
state_info=state_info
)
except Exception as e:
self.logger.error(f"Error processing event: {e}", exc_info=True)
return None
# Get machine config
config = self.machine_config.get(machine_id)
if not config:
self.logger.warning(f"No config found for machine {machine_id}")
return None
# Extract power measurement
metrics = event.get('metrics', {})
power_w = metrics.get('power_w')
if power_w is None:
self.logger.debug(f"Event missing power_w, skipping")
return None
# Initialize machine state if needed
if machine_id not in self.machine_states:
self.machine_states[machine_id] = {
'state': SessionState.IDLE,
'state_since': datetime.now(timezone.utc),
'current_session_id': None,
'session_start_time': None,
'last_power': None,
'last_message_time': None,
'standby_duration_s': 0,
'working_duration_s': 0,
'last_state_change': datetime.now(timezone.utc),
}
state_info = self.machine_states[machine_id]
timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
machine_name = machine.get('machine_name', 'Unknown')
# Check for timeout (no message received)
if state_info['last_message_time']:
time_since_last_message = (timestamp - state_info['last_message_time']).total_seconds()
if time_since_last_message > config['message_timeout_s']:
if state_info['state'] in [SessionState.STANDBY, SessionState.WORKING]:
self.logger.warning(
f"{machine_name}: Message timeout "
f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END"
)
return self._end_session_timeout(machine_id, machine_name, timestamp, config, state_info)
# Update last message time
state_info['last_message_time'] = timestamp
# Update last power
state_info['last_power'] = power_w
# Process state machine
return self._process_state_machine(
machine_id=machine_id,
machine_name=machine.get('machine_name'),
power_w=power_w,
timestamp=timestamp,
config=config,
state_info=state_info
)
def _process_state_machine(
self,
@ -343,8 +399,8 @@ class SessionDetector:
start_time = state_info['session_start_time']
duration_s = (timestamp - start_time).total_seconds()
standby_duration = state_info['standby_duration_s']
working_duration = state_info['working_duration_s']
standby_duration = state_info.get('standby_duration_s') or 0
working_duration = state_info.get('working_duration_s') or 0
self.logger.info(
f"🔴 {machine_name}: Session END (power drop) - "
@ -389,8 +445,8 @@ class SessionDetector:
start_time = state_info['session_start_time']
duration_s = (timestamp - start_time).total_seconds()
standby_duration = state_info['standby_duration_s']
working_duration = state_info['working_duration_s']
standby_duration = state_info.get('standby_duration_s') or 0
working_duration = state_info.get('working_duration_s') or 0
self.logger.warning(
f"⏱️ {machine_name}: Session END (TIMEOUT) - "

View File

@ -44,19 +44,24 @@ class ShellyParser:
Returns:
Parsed data dict or None if not a relevant message
"""
# Ignore debug logs
if 'debug/log' in topic:
try:
# Ignore debug logs
if 'debug/log' in topic:
return None
# Parse different message types
if '/status/pm1:0' in topic:
return self._parse_status_message(topic, payload)
elif '/events/rpc' in topic:
return self._parse_rpc_event(topic, payload)
elif '/telemetry' in topic:
return self._parse_telemetry(topic, payload)
return None
except Exception as e:
self.logger.error(f"Error parsing message from {topic}: {e}", exc_info=True)
return None
# Parse different message types
if '/status/pm1:0' in topic:
return self._parse_status_message(topic, payload)
elif '/events/rpc' in topic:
return self._parse_rpc_event(topic, payload)
elif '/telemetry' in topic:
return self._parse_telemetry(topic, payload)
return None
def _parse_status_message(self, topic: str, payload: dict) -> Dict:
"""
@ -74,57 +79,67 @@ class ShellyParser:
"temperature": {"tC": 35.2, "tF": 95.4}
}
"""
# Extract device ID from topic prefix
device_id = self._extract_device_id_from_topic(topic)
data = {
'message_type': 'status',
'device_id': device_id,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'voltage': payload.get('voltage'),
'current': payload.get('current'),
'apower': payload.get('apower'), # Active Power in Watts
'frequency': payload.get('freq'),
'total_energy': payload.get('aenergy', {}).get('total'),
}
self.logger.debug(f"Parsed status message: apower={data['apower']}W")
return data
try:
# Extract device ID from topic prefix
device_id = self._extract_device_id_from_topic(topic)
data = {
'message_type': 'status',
'device_id': device_id,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'voltage': payload.get('voltage'),
'current': payload.get('current'),
'apower': payload.get('apower'), # Active Power in Watts
'frequency': payload.get('freq'),
'total_energy': payload.get('aenergy', {}).get('total'),
}
self.logger.debug(f"Parsed status message: apower={data['apower']}W")
return data
except Exception as e:
self.logger.error(f"Error parsing status message: {e}", exc_info=True)
return None
def _parse_rpc_event(self, topic: str, payload: dict) -> Optional[Dict]:
"""
Parse RPC NotifyStatus event
Topic: shellypmminig3/events/rpc
"""
if payload.get('method') != 'NotifyStatus':
try:
if payload.get('method') != 'NotifyStatus':
return None
device_id = payload.get('src', '').replace('shellypmminig3-', '')
params = payload.get('params', {})
pm_data = params.get('pm1:0', {})
# Get timestamp from params or use current time
ts = params.get('ts')
if ts:
timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z'
else:
timestamp = datetime.utcnow().isoformat() + 'Z'
data = {
'message_type': 'event',
'device_id': device_id,
'timestamp': timestamp,
'apower': pm_data.get('apower'),
'current': pm_data.get('current'),
'voltage': pm_data.get('voltage'),
}
# Only return if we have actual data
if data['apower'] is not None or data['current'] is not None:
self.logger.debug(f"Parsed RPC event: {pm_data}")
return data
return None
except Exception as e:
self.logger.error(f"Error parsing RPC event: {e}", exc_info=True)
return None
device_id = payload.get('src', '').replace('shellypmminig3-', '')
params = payload.get('params', {})
pm_data = params.get('pm1:0', {})
# Get timestamp from params or use current time
ts = params.get('ts')
if ts:
timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z'
else:
timestamp = datetime.utcnow().isoformat() + 'Z'
data = {
'message_type': 'event',
'device_id': device_id,
'timestamp': timestamp,
'apower': pm_data.get('apower'),
'current': pm_data.get('current'),
'voltage': pm_data.get('voltage'),
}
# Only return if we have actual data
if data['apower'] is not None or data['current'] is not None:
self.logger.debug(f"Parsed RPC event: {pm_data}")
return data
return None
def _parse_telemetry(self, topic: str, payload: dict) -> Dict:
"""