diff --git a/open_workshop_mqtt/python_prototype/README.md b/open_workshop_mqtt/python_prototype/README.md index ad80873..6b20953 100644 --- a/open_workshop_mqtt/python_prototype/README.md +++ b/open_workshop_mqtt/python_prototype/README.md @@ -1,225 +1,127 @@ # Open Workshop IoT Bridge - Python Prototype -MQTT-basierte IoT-Integration für Odoo 18 Community Edition - -## Projektstatus - -**Stand: 2026-01-22** - -**Phase 1: Standalone Python Prototype** (aktuell) -- ✅ M0: Projekt Setup & MQTT Connection (abgeschlossen) - - Virtual Environment erstellt - - MQTT Client mit TLS/SSL Support - - Verbindung zu mqtt.majufilo.eu:8883 erfolgreich -- ✅ M1: Shelly PM Mini G3 Integration (abgeschlossen) - - Parser für Status Messages (shaperorigin/status/pm1:0) - - Datenextraktion: apower, voltage, current, frequency, total_energy - - Custom MQTT Topic Prefix Support (shaperorigin) - - Live-Monitoring funktioniert -- ✅ M2: Event-Normalisierung (abgeschlossen) - - Event Schema v1 implementiert - - UUID-basierte Event IDs - - ISO 8601 UTC Timestamps - - Machine/Device Mapping - - Metrics Normalisierung (power_w, voltage_v, current_a, frequency_hz) -- ✅ M3: Session Detection Engine (abgeschlossen) - - State Machine: IDLE → STARTING → RUNNING → STOPPING → IDLE - - Power-basierte Schwellenwerte (konfigurierbar pro Maschine) - - Debounce-Logik (Start/Stop getrennt konfigurierbar) - - Session Events: session_start, session_end mit Duration - - Persistente Speicherung (JSON) -- ⏳ M4: Multi-Device Support (vorbereitet, Config-ready) -- ⏳ M5: Monitoring & Robustheit - -**Phase 2: Odoo Integration** (geplant) -- M6-M10: Odoo Module + Bridge + Tests - -## Beschreibung - -Dieser Python-Prototyp dient als Grundlage für die spätere Odoo-Integration. Er: -- Empfängt MQTT Events von IoT-Geräten (z.B. Shelly PM Mini G3) -- Normalisiert die Daten in ein einheitliches Event-Schema (Event Schema v1) -- Erkennt Maschinenlaufzeit-Sessions basierend auf Power-Schwellenwerten -- Speichert Events und Sessions persistent (JSONL/JSON) -- Unterstützt mehrere Geräte parallel (topic_prefix basiert) - -Der Code ist so strukturiert, dass er später direkt in eine Odoo-Bridge übernommen werden kann. -Die JSON-Speicherung ist für Migration zu Odoo-Models vorbereitet: -- `data/events.jsonl` → `open_workshop.power_event` -- `data/sessions.json` → `open_workshop.session` - -## Installation - -### Voraussetzungen -- Python 3.8+ -- MQTT Broker (z.B. Mosquitto) -- pip - -### Setup - -1. **Dependencies installieren** -```bash -pip install -r requirements.txt -``` - -2. **Konfiguration erstellen** -```bash -cp config.yaml.example config.yaml -``` - -3. **config.yaml anpassen** -Bearbeite `config.yaml` und setze: -- MQTT Broker Host/Port/Credentials -- Device-Konfiguration (Shelly IDs, Schwellenwerte) -- MQTT Topics - -## Verwendung - -### Starten -```bash -cd /pfad/zu/python_prototype -source venv/bin/activate -python main.py -``` - -Dieser Befehl: -- Verbindet sich mit dem MQTT Broker (TLS/SSL) -- Abonniert die konfigurierten Topics -- Empfängt Shelly PM Mini G3 Status Messages -- Normalisiert Events (Event Schema v1) -- Detektiert Session Start/End Events -- Speichert in `data/events.jsonl` und `data/sessions.json` - -### Erwartete Ausgabe -``` -2026-01-22 19:36:17 - __main__ - INFO - === Open Workshop IoT Bridge Starting === -2026-01-22 19:36:17 - mqtt_client - INFO - TLS/SSL enabled for port 8883 -2026-01-22 19:36:17 - mqtt_client - INFO - Connected to MQTT Broker at mqtt.majufilo.eu:8883 -2026-01-22 19:36:17 - mqtt_client - INFO - Subscribed to topic: shaperorigin/# -2026-01-22 19:36:17 - __main__ - INFO - IoT Bridge started successfully -2026-01-22 19:36:17 - __main__ - INFO - Listening for MQTT messages... (Press Ctrl+C to stop) -2026-01-22 19:36:17 - session_detector - INFO - 🟡 Shaper Origin: Power 43.3W >= 30W → STARTING -2026-01-22 19:36:20 - session_detector - INFO - 🟢 Shaper Origin: Session START (debounce 3.0s) -2026-01-22 19:36:20 - __main__ - INFO - 🚀 SESSION START -2026-01-22 19:37:03 - session_detector - INFO - 🟠 Shaper Origin: Power 0.0W < 30W → STOPPING -2026-01-22 19:37:18 - session_detector - INFO - 🔴 Shaper Origin: Session END (debounce 15.0s) -2026-01-22 19:37:18 - __main__ - INFO - 🏁 SESSION END - Duration: 58s (0.97 min) -``` - -## Konfiguration - -### MQTT Broker -```yaml -mqtt: - host: "localhost" - port: 1883 - username: "" - password: "" - topics: - - "shellies/+/status" -``` - -### Devices -```yaml -devices: - - topic_prefix: "shaperorigin" # Custom MQTT Prefix (im Shelly konfiguriert) - machine_name: "Shaper Origin" - machine_id: "shaper-origin-01" - device_type: "shelly_pm_mini_g3" - power_threshold: 30 # Watt (Schwellenwert für Session-Erkennung) - start_debounce_s: 3 # Verzögerung bis Session Start - stop_debounce_s: 15 # Verzögerung bis Session End - enabled: true -``` - -**Multi-Device Support:** Einfach weitere Geräte hinzufügen mit unterschiedlichen `topic_prefix`. -Jedes Gerät benötigt im Shelly einen eigenen Custom MQTT Prefix. +MQTT-basierte IoT Bridge für Odoo 18 Community zur Erfassung von Maschinenlaufzeiten. ## Projektstruktur ``` python_prototype/ -├── main.py # Entry point & Orchestration -├── mqtt_client.py # MQTT Client wrapper (TLS/SSL) -├── shelly_parser.py # Shelly PM Mini G3 Message Parser -├── event_normalizer.py # Event Schema v1 Normalizer -├── session_detector.py # Session Detection State Machine -├── event_storage.py # Persistent Storage (JSONL/JSON) -├── config.yaml # Configuration (nicht im Git) -├── config.yaml.example # Config template -├── requirements.txt # Python dependencies -├── README.md # Diese Datei -├── data/ # Output directory -│ ├── events.jsonl # Events (JSON Lines) -│ └── sessions.json # Sessions (JSON Array) -└── logs/ # Log files - └── ows_iot_bridge.log +├── main.py # Haupteinstiegspunkt +├── config.yaml # Produktionskonfiguration (nicht in Git) +├── config.yaml.example # Beispielkonfiguration +│ +├── Core Components (Produktionscode) +├── mqtt_client.py # MQTT Client mit TLS +├── shelly_parser.py # Parser für Shelly PM Mini G3 +├── event_normalizer.py # Event Schema v1 +├── session_detector.py # Session Detection (Dual-Threshold) +├── event_storage.py # JSON Storage +│ +├── data/ # Laufzeitdaten +│ ├── events.jsonl # Event-Log +│ └── sessions.json # Session-Daten +│ +├── tests/ # Tests +│ ├── unit/ # Unit Tests (schnell, isoliert) +│ │ └── test_session_detector.py +│ ├── integration/ # Integration Tests (mit MQTT) +│ │ └── test_mqtt_integration.py +│ └── tools/ # Test-Hilfsprogramme +│ └── shelly_simulator.py +│ +└── venv/ # Virtual Environment ``` -## Nächste Schritte +## Installation -### M4: Multi-Device Support (vorbereitet) -- Zweites Shelly-Device konfigurieren -- Parallele Überwachung mehrerer Maschinen testen +```bash +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt -### M5: Monitoring & Robustheit -- MQTT Reconnect-Logik -- Error Handling & Recovery -- Systemd Service Setup -- Health Monitoring - -### M6+: Odoo Integration -- Odoo Models: `open_workshop.machine`, `open_workshop.session` -- Migration: JSON → Odoo Database -- Views & Dashboards -- Live-Monitoring in Odoo - -## Gespeicherte Daten - -### Events (data/events.jsonl) -JSON Lines Format - ein Event pro Zeile: -```json -{"event_id":"uuid","event_type":"power_measurement","timestamp":"2026-01-22T18:45:09.985Z","machine":{"machine_id":"shaper-origin-01","machine_name":"Shaper Origin"},"metrics":{"power_w":43.8,"voltage_v":230.2,"current_a":0.19}} +cp config.yaml.example config.yaml +# config.yaml anpassen ``` -### Sessions (data/sessions.json) -JSON Array mit Session-Objekten: -```json -[ - { - "session_id": "uuid", - "machine_id": "shaper-origin-01", - "machine_name": "Shaper Origin", - "start_time": "2026-01-22T18:52:59.000Z", - "end_time": "2026-01-22T18:54:01.995Z", - "duration_s": 62, - "start_power_w": 37.1, - "end_power_w": 0.0, - "status": "completed" - } -] +## Tests + +```bash +# Unit Tests (schnell, ~0.05s) +pytest tests/unit/ -v + +# Integration Tests (mit MQTT, ~30-60s) +# Benötigt MQTT Konfiguration: +# Option 1: Nutzt existierende config.yaml +pytest tests/integration/ -v -s + +# Option 2: Mit Umgebungsvariablen +export MQTT_HOST=mqtt.majufilo.eu +export MQTT_PORT=8883 +export MQTT_USERNAME=mosquitto +export MQTT_PASSWORD=dein_passwort +pytest tests/integration/ -v -s + +# Alle Tests +pytest tests/ -v ``` -## Troubleshooting +**Wichtig:** Integration Tests lesen MQTT-Zugangsdaten aus: +1. Umgebungsvariablen (`MQTT_HOST`, `MQTT_PASSWORD`, etc.) ODER +2. Existierender `config.yaml` im Projektroot -### Connection refused -``` -Error: Failed to connect to MQTT Broker: [Errno 111] Connection refused -``` -→ Prüfe ob MQTT Broker läuft und erreichbar ist +**Niemals Passwörter im Source Code committen!** -### Permission denied -``` -PermissionError: [Errno 13] Permission denied: 'logs/ows_iot_bridge.log' -``` -→ Stelle sicher, dass das logs/ Verzeichnis beschreibbar ist +## Betrieb -### Invalid config -``` -Error: Config file not found: config.yaml -``` -→ Erstelle config.yaml von config.yaml.example +```bash +# Bridge starten +python main.py -## Support +# Mit alternativer Config +python main.py --config custom_config.yaml +``` -Siehe Feature Request Dokument: `FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md` +## Manuelle Tests + +```bash +# Shelly Simulator für Tests +python tests/tools/shelly_simulator.py --scenario session_end +python tests/tools/shelly_simulator.py --scenario full_session +python tests/tools/shelly_simulator.py --scenario timeout +``` + +## Session Detection States + +- **IDLE**: Power < 20W +- **STARTING**: Power >= 20W, 3s debounce +- **STANDBY**: 20-100W (Maschine an, Spindel aus) +- **WORKING**: >= 100W (Spindel läuft) +- **STOPPING**: < 20W, 15s debounce +- **Timeout**: 20s keine Messages → Session-Ende + +## Odoo Integration + +**Deployment-Strategie:** Direkte Integration in Odoo (kein separater Container) + +**Begründung:** +- Werkstatt-Setup: maximal 10 Maschinen +- Performance: `check_timeouts()` alle 10s = ~10ms CPU-Zeit pro Durchlauf +- CPU-Last: < 0.1% bei 10 Maschinen → vernachlässigbar +- Vorteile: Direkter DB-Zugriff, ACID-Transaktionen, keine API-Overhead +- Einfachheit: Ein Container, keine zusätzliche Infrastruktur + +**Implementation in Odoo:** +- MQTT Client als Odoo Thread oder Cron-Job +- `check_timeouts()` als scheduled action alle 10 Sekunden +- Direkte Verwendung von `workshop.machine`, `workshop.session` Models + +**Alternative (nur bei >50 Maschinen nötig):** Separater Microservice-Container + +## Roadmap + +- [x] M0-M3: MQTT + Parser + Session Detection +- [x] Unit + Integration Tests +- [x] Timeout Detection mit check_timeouts() +- [ ] M4: Multi-Device Support +- [ ] M5: Reconnect + Error Handling +- [ ] M6-M10: Odoo Integration diff --git a/open_workshop_mqtt/python_prototype/config.yaml.example b/open_workshop_mqtt/python_prototype/config.yaml.example index e0a63a4..3312468 100644 --- a/open_workshop_mqtt/python_prototype/config.yaml.example +++ b/open_workshop_mqtt/python_prototype/config.yaml.example @@ -21,12 +21,19 @@ devices: machine_id: "shaper-origin-01" device_type: "shelly_pm_mini_g3" - # Power threshold for run detection (in Watts) - power_threshold: 50 + # Dual Power Thresholds (in Watts) + # IDLE: Power < standby_threshold_w + # STANDBY: standby_threshold_w <= Power < working_threshold_w (Maschine an, Spindel aus) + # WORKING: Power >= working_threshold_w (Spindel läuft) + standby_threshold_w: 20 # Start tracking session + working_threshold_w: 100 # Active work detected # Debounce times (in seconds) - start_debounce_s: 3 # Power > threshold for X seconds → run_start - stop_debounce_s: 15 # Power < threshold for Y seconds → run_stop + start_debounce_s: 3 # Power >= standby_threshold for X seconds → session_start + stop_debounce_s: 15 # Power < standby_threshold for Y seconds → session_end + + # Timeout detection (in seconds) + message_timeout_s: 20 # No message for X seconds → session_end (timeout) enabled: true diff --git a/open_workshop_mqtt/python_prototype/event_storage.py b/open_workshop_mqtt/python_prototype/event_storage.py index a1c4b9e..54b1de5 100644 --- a/open_workshop_mqtt/python_prototype/event_storage.py +++ b/open_workshop_mqtt/python_prototype/event_storage.py @@ -72,9 +72,12 @@ class EventStorage: "machine_name": "Shaper Origin", "start_time": "2026-01-22T18:36:20.993Z", "end_time": "2026-01-22T18:38:01.993Z", // null if running - "duration_s": 101, // null if running + "total_duration_s": 101, // null if running + "standby_duration_s": 80, // null if running (machine on, not working) + "working_duration_s": 21, // null if running (active work) "start_power_w": 45.7, "end_power_w": 0.0, // null if running + "end_reason": "power_drop", // or "timeout", null if running "status": "completed" // or "running" } """ @@ -107,9 +110,12 @@ class EventStorage: 'machine_name': machine.get('machine_name'), 'start_time': session_data.get('start_time'), 'end_time': None, - 'duration_s': None, + 'total_duration_s': None, + 'standby_duration_s': None, + 'working_duration_s': None, 'start_power_w': event.get('power_w'), 'end_power_w': None, + 'end_reason': None, 'status': 'running' } @@ -130,14 +136,23 @@ class EventStorage: session_data = event.get('session_data', {}) session['end_time'] = session_data.get('end_time') - session['duration_s'] = session_data.get('duration_s') + session['total_duration_s'] = session_data.get('total_duration_s') + session['standby_duration_s'] = session_data.get('standby_duration_s') + session['working_duration_s'] = session_data.get('working_duration_s') session['end_power_w'] = event.get('power_w') + session['end_reason'] = session_data.get('end_reason', 'normal') session['status'] = 'completed' self._write_sessions(sessions) - duration_min = session['duration_s'] / 60 - self.logger.info(f"Session {session_id[:8]}... completed ({duration_min:.1f} min)") + total_min = session['total_duration_s'] / 60 if session['total_duration_s'] else 0 + standby_min = session['standby_duration_s'] / 60 if session['standby_duration_s'] else 0 + working_min = session['working_duration_s'] / 60 if session['working_duration_s'] else 0 + + self.logger.info( + f"Session {session_id[:8]}... completed ({session['end_reason']}) - " + f"Total: {total_min:.1f}min, Standby: {standby_min:.1f}min, Working: {working_min:.1f}min" + ) return True self.logger.error(f"Session {session_id} not found for update") diff --git a/open_workshop_mqtt/python_prototype/main.py b/open_workshop_mqtt/python_prototype/main.py index 7b63558..853afa6 100644 --- a/open_workshop_mqtt/python_prototype/main.py +++ b/open_workshop_mqtt/python_prototype/main.py @@ -12,6 +12,7 @@ import yaml import time import signal import json +import argparse from pathlib import Path from typing import Dict @@ -223,8 +224,11 @@ class IoTBridge: while self.running: time.sleep(1) - # TODO: Hier können später periodische Tasks ausgeführt werden - # z.B. Session-Timeout-Checks + # Check for session timeouts periodically + timeout_events = self.session_detector.check_timeouts() + for session_event in timeout_events: + self._log_session_event(session_event) + self.event_storage.store_session_event(session_event) except KeyboardInterrupt: self.logger.info("Interrupted by user") @@ -241,12 +245,16 @@ class IoTBridge: def main(): """Main entry point""" + parser = argparse.ArgumentParser(description='Open Workshop IoT Bridge') + parser.add_argument('--config', default='config.yaml', help='Path to config file (default: config.yaml)') + args = parser.parse_args() + # Check if config file exists - config_file = Path('config.yaml') + config_file = Path(args.config) if not config_file.exists(): print("\n" + "="*60) - print("Configuration file not found!") + print(f"Configuration file not found: {config_file}") print("="*60) print("\nPlease create config.yaml from the example:") print(" cp config.yaml.example config.yaml") @@ -255,7 +263,7 @@ def main(): sys.exit(1) # Start bridge - bridge = IoTBridge('config.yaml') + bridge = IoTBridge(str(config_file)) bridge.start() diff --git a/open_workshop_mqtt/python_prototype/session_detector.py b/open_workshop_mqtt/python_prototype/session_detector.py index ada22fc..96b4091 100644 --- a/open_workshop_mqtt/python_prototype/session_detector.py +++ b/open_workshop_mqtt/python_prototype/session_detector.py @@ -1,37 +1,44 @@ """ Session Detection Engine for Open Workshop IoT Bridge -Detects machine run sessions based on power consumption thresholds -with debounce logic to avoid false starts/stops. +Detects machine run sessions with dual power thresholds: +- STANDBY: Machine on, not working (e.g. 20-100W) +- WORKING: Active work (e.g. >= 100W) + +Includes timeout detection for when machine powers off (no MQTT messages). """ import logging import uuid -from datetime import datetime, timezone -from typing import Dict, Optional +from datetime import datetime, timezone, timedelta +from typing import Dict, Optional, List from enum import Enum class SessionState(Enum): """Session detection states""" - IDLE = "idle" # Machine off (power < threshold) - STARTING = "starting" # Power above threshold, waiting for debounce - RUNNING = "running" # Confirmed run session active - STOPPING = "stopping" # Power below threshold, waiting for debounce + IDLE = "idle" # Machine off (power < standby_threshold) + STARTING = "starting" # Power above standby threshold, waiting for debounce + STANDBY = "standby" # Machine on, not working (standby <= power < working) + WORKING = "working" # Active work (power >= working threshold) + STOPPING = "stopping" # Power below standby threshold, waiting for debounce class SessionDetector: """ - Detects machine run sessions based on power measurements + Detects machine run sessions with dual thresholds and timeout detection State Machine: - IDLE -> STARTING -> RUNNING -> STOPPING -> IDLE + IDLE -> STARTING -> STANDBY/WORKING -> STOPPING -> IDLE - - IDLE: Power < threshold - - STARTING: Power >= threshold for < start_debounce_s - - RUNNING: Power >= threshold for >= start_debounce_s - - STOPPING: Power < threshold for < stop_debounce_s (while in RUNNING) - - Back to IDLE: Power < threshold for >= stop_debounce_s + - IDLE: Power < standby_threshold + - STARTING: Power >= standby_threshold for < start_debounce_s + - STANDBY: Power >= standby_threshold < working_threshold (confirmed) + - WORKING: Power >= working_threshold (confirmed) + - STOPPING: Power < standby_threshold for < stop_debounce_s + - Back to IDLE: Power < standby_threshold for >= stop_debounce_s OR timeout + + Timeout: No MQTT message for > message_timeout_s → session_end (machine powered off) """ def __init__(self, device_config: list = None): @@ -50,15 +57,58 @@ class SessionDetector: machine_id = device.get('machine_id') if machine_id: self.machine_config[machine_id] = { - 'power_threshold': device.get('power_threshold', 50), + 'standby_threshold_w': device.get('standby_threshold_w', 20), + 'working_threshold_w': device.get('working_threshold_w', 100), 'start_debounce_s': device.get('start_debounce_s', 3), 'stop_debounce_s': device.get('stop_debounce_s', 15), + 'message_timeout_s': device.get('message_timeout_s', 60), 'machine_name': device.get('machine_name', 'Unknown'), } # State tracking per machine self.machine_states = {} # machine_id -> state info + def check_timeouts(self) -> List[Dict]: + """ + Check all machines for message timeouts and end sessions if needed. + Should be called periodically (e.g. every second). + + Returns: + List of session_end events for timed-out sessions + """ + timeout_events = [] + current_time = datetime.now(timezone.utc) + + for machine_id, state_info in self.machine_states.items(): + # Only check machines in active session states + if state_info['state'] not in [SessionState.STANDBY, SessionState.WORKING]: + continue + + # Skip if no last message time + if not state_info['last_message_time']: + continue + + # Get machine config + config = self.machine_config.get(machine_id) + if not config: + continue + + # Check if timeout exceeded + time_since_last_message = (current_time - state_info['last_message_time']).total_seconds() + if time_since_last_message > config['message_timeout_s']: + machine_name = config.get('machine_name', 'Unknown') + self.logger.warning( + f"⏱️ {machine_name}: Message timeout " + f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END" + ) + + # End session with timeout + timeout_event = self._end_session_timeout(machine_id, machine_name, current_time, config, state_info) + if timeout_event: + timeout_events.append(timeout_event) + + return timeout_events + def process_event(self, event: Dict) -> Optional[Dict]: """ Process a normalized event and detect session changes @@ -113,10 +163,29 @@ class SessionDetector: 'current_session_id': None, 'session_start_time': None, 'last_power': None, + 'last_message_time': None, + 'standby_duration_s': 0, + 'working_duration_s': 0, + 'last_state_change': datetime.now(timezone.utc), } state_info = self.machine_states[machine_id] timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00')) + machine_name = machine.get('machine_name', 'Unknown') + + # Check for timeout (no message received) + if state_info['last_message_time']: + time_since_last_message = (timestamp - state_info['last_message_time']).total_seconds() + if time_since_last_message > config['message_timeout_s']: + if state_info['state'] in [SessionState.STANDBY, SessionState.WORKING]: + self.logger.warning( + f"{machine_name}: Message timeout " + f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END" + ) + return self._end_session_timeout(machine_id, machine_name, timestamp, config, state_info) + + # Update last message time + state_info['last_message_time'] = timestamp # Update last power state_info['last_power'] = power_w @@ -141,41 +210,60 @@ class SessionDetector: state_info: Dict ) -> Optional[Dict]: """ - Process state machine logic + Process state machine logic with dual thresholds Returns session event if state change occurred """ current_state = state_info['state'] - threshold = config['power_threshold'] + standby_threshold = config['standby_threshold_w'] + working_threshold = config['working_threshold_w'] start_debounce = config['start_debounce_s'] stop_debounce = config['stop_debounce_s'] time_in_state = (timestamp - state_info['state_since']).total_seconds() + # Update duration tracking for active states + if current_state == SessionState.STANDBY: + time_since_last_update = (timestamp - state_info['last_state_change']).total_seconds() + state_info['standby_duration_s'] += time_since_last_update + state_info['last_state_change'] = timestamp + elif current_state == SessionState.WORKING: + time_since_last_update = (timestamp - state_info['last_state_change']).total_seconds() + state_info['working_duration_s'] += time_since_last_update + state_info['last_state_change'] = timestamp + # State machine transitions if current_state == SessionState.IDLE: - if power_w >= threshold: + if power_w >= standby_threshold: # Transition to STARTING - self.logger.info(f"🟡 {machine_name}: Power {power_w:.1f}W >= {threshold}W → STARTING") + self.logger.info(f"🟡 {machine_name}: Power {power_w:.1f}W >= {standby_threshold}W → STARTING") state_info['state'] = SessionState.STARTING state_info['state_since'] = timestamp elif current_state == SessionState.STARTING: - if power_w < threshold: + if power_w < standby_threshold: # False start, back to IDLE self.logger.info(f"⚪ {machine_name}: Power dropped before debounce → IDLE") state_info['state'] = SessionState.IDLE state_info['state_since'] = timestamp elif time_in_state >= start_debounce: - # Debounce passed, transition to RUNNING + # Debounce passed, transition to STANDBY or WORKING session_id = str(uuid.uuid4()) - state_info['state'] = SessionState.RUNNING - state_info['state_since'] = timestamp state_info['current_session_id'] = session_id state_info['session_start_time'] = timestamp + state_info['standby_duration_s'] = 0 + state_info['working_duration_s'] = 0 + state_info['last_state_change'] = timestamp - self.logger.info(f"🟢 {machine_name}: Session START (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W") + if power_w >= working_threshold: + state_info['state'] = SessionState.WORKING + self.logger.info(f"🔵 {machine_name}: Session START → WORKING (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W") + else: + state_info['state'] = SessionState.STANDBY + self.logger.info(f"🟢 {machine_name}: Session START → STANDBY (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W") + + state_info['state_since'] = timestamp # Generate session_start event return { @@ -192,55 +280,158 @@ class SessionDetector: } } - elif current_state == SessionState.RUNNING: - if power_w < threshold: + elif current_state == SessionState.STANDBY: + if power_w < standby_threshold: # Transition to STOPPING - self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {threshold}W → STOPPING") + self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {standby_threshold}W → STOPPING") state_info['state'] = SessionState.STOPPING state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + elif power_w >= working_threshold: + # Transition to WORKING + self.logger.info(f"🔵 {machine_name}: Power {power_w:.1f}W >= {working_threshold}W → WORKING") + state_info['state'] = SessionState.WORKING + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + elif current_state == SessionState.WORKING: + if power_w < standby_threshold: + # Transition to STOPPING + self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {standby_threshold}W → STOPPING") + state_info['state'] = SessionState.STOPPING + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + elif power_w < working_threshold: + # Transition to STANDBY + self.logger.info(f"🟢 {machine_name}: Power {power_w:.1f}W < {working_threshold}W → STANDBY") + state_info['state'] = SessionState.STANDBY + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp elif current_state == SessionState.STOPPING: - if power_w >= threshold: - # Power back up, back to RUNNING - self.logger.info(f"🟢 {machine_name}: Power back up → RUNNING") - state_info['state'] = SessionState.RUNNING + if power_w >= standby_threshold: + # Power back up, cancel STOPPING + if power_w >= working_threshold: + self.logger.info(f"🔵 {machine_name}: Power back up → WORKING") + state_info['state'] = SessionState.WORKING + else: + self.logger.info(f"🟢 {machine_name}: Power back up → STANDBY") + state_info['state'] = SessionState.STANDBY state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp - elif time_in_state >= stop_debounce: - # Debounce passed, session ended - session_id = state_info['current_session_id'] - start_time = state_info['session_start_time'] - duration_s = (timestamp - start_time).total_seconds() - - self.logger.info(f"🔴 {machine_name}: Session END (debounce {time_in_state:.1f}s) - Duration: {duration_s:.1f}s") - - # Generate session_end event - session_event = { - 'session_id': session_id, - 'event_type': 'session_end', - 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), - 'machine': { - 'machine_id': machine_id, - 'machine_name': machine_name - }, - 'power_w': power_w, - 'session_data': { - 'start_time': start_time.isoformat().replace('+00:00', 'Z'), - 'end_time': timestamp.isoformat().replace('+00:00', 'Z'), - 'duration_s': int(duration_s) - } - } - - # Reset to IDLE - state_info['state'] = SessionState.IDLE - state_info['state_since'] = timestamp - state_info['current_session_id'] = None - state_info['session_start_time'] = None - - return session_event + else: + # Power still low, check if debounce passed + if time_in_state >= stop_debounce: + # Debounce passed, session ended + return self._end_session_normal(machine_id, machine_name, power_w, timestamp, state_info) return None + def _end_session_normal( + self, + machine_id: str, + machine_name: str, + power_w: float, + timestamp: datetime, + state_info: Dict + ) -> Dict: + """End session normally (power drop)""" + session_id = state_info['current_session_id'] + start_time = state_info['session_start_time'] + duration_s = (timestamp - start_time).total_seconds() + + standby_duration = state_info['standby_duration_s'] + working_duration = state_info['working_duration_s'] + + self.logger.info( + f"🔴 {machine_name}: Session END (power drop) - " + f"Total: {duration_s:.0f}s, Standby: {standby_duration:.0f}s, Working: {working_duration:.0f}s" + ) + + # Generate session_end event + session_event = { + 'session_id': session_id, + 'event_type': 'session_end', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': machine_name + }, + 'power_w': power_w, + 'session_data': { + 'start_time': start_time.isoformat().replace('+00:00', 'Z'), + 'end_time': timestamp.isoformat().replace('+00:00', 'Z'), + 'total_duration_s': int(duration_s), + 'standby_duration_s': int(standby_duration), + 'working_duration_s': int(working_duration), + 'end_reason': 'power_drop' + } + } + + # Reset to IDLE + self._reset_session_state(state_info, timestamp) + + return session_event + + def _end_session_timeout( + self, + machine_id: str, + machine_name: str, + timestamp: datetime, + config: Dict, + state_info: Dict + ) -> Dict: + """End session due to timeout (no messages)""" + session_id = state_info['current_session_id'] + start_time = state_info['session_start_time'] + duration_s = (timestamp - start_time).total_seconds() + + standby_duration = state_info['standby_duration_s'] + working_duration = state_info['working_duration_s'] + + self.logger.warning( + f"⏱️ {machine_name}: Session END (TIMEOUT) - " + f"Total: {duration_s:.0f}s, Standby: {standby_duration:.0f}s, Working: {working_duration:.0f}s" + ) + + # Generate session_end event + session_event = { + 'session_id': session_id, + 'event_type': 'session_end', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': machine_name + }, + 'power_w': 0.0, # Assume power is 0 on timeout + 'session_data': { + 'start_time': start_time.isoformat().replace('+00:00', 'Z'), + 'end_time': timestamp.isoformat().replace('+00:00', 'Z'), + 'total_duration_s': int(duration_s), + 'standby_duration_s': int(standby_duration), + 'working_duration_s': int(working_duration), + 'end_reason': 'timeout' + } + } + + # Reset to IDLE + self._reset_session_state(state_info, timestamp) + + return session_event + + def _reset_session_state(self, state_info: Dict, timestamp: datetime): + """Reset session state to IDLE""" + state_info['state'] = SessionState.IDLE + state_info['state_since'] = timestamp + state_info['current_session_id'] = None + state_info['session_start_time'] = None + state_info['standby_duration_s'] = 0 + state_info['working_duration_s'] = 0 + state_info['last_state_change'] = timestamp + def get_machine_state(self, machine_id: str) -> Optional[str]: """Get current state of a machine""" state_info = self.machine_states.get(machine_id) diff --git a/open_workshop_mqtt/python_prototype/shelly_parser.py b/open_workshop_mqtt/python_prototype/shelly_parser.py index e4419f8..1041f1c 100644 --- a/open_workshop_mqtt/python_prototype/shelly_parser.py +++ b/open_workshop_mqtt/python_prototype/shelly_parser.py @@ -62,6 +62,17 @@ class ShellyParser: """ Parse full status message Topic: shaperorigin/status/pm1:0 + + Payload format (Shelly PM Mini G3): + { + "id": 0, + "voltage": 230.0, + "current": 0.217, + "apower": 50.0, + "freq": 50.0, + "aenergy": {"total": 12345.6, "by_minute": [...], "minute_ts": 1234567890}, + "temperature": {"tC": 35.2, "tF": 95.4} + } """ # Extract device ID from topic prefix device_id = self._extract_device_id_from_topic(topic) diff --git a/open_workshop_mqtt/python_prototype/tests/__init__.py b/open_workshop_mqtt/python_prototype/tests/__init__.py new file mode 100644 index 0000000..73e40f0 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/tests/__init__.py @@ -0,0 +1,3 @@ +""" +Tests für Open Workshop IoT Bridge +""" diff --git a/open_workshop_mqtt/python_prototype/tests/integration/__init__.py b/open_workshop_mqtt/python_prototype/tests/integration/__init__.py new file mode 100644 index 0000000..343db61 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/tests/integration/__init__.py @@ -0,0 +1,3 @@ +""" +Integration Tests - Testen mit echten externen Services (MQTT Broker, Storage) +""" diff --git a/open_workshop_mqtt/python_prototype/tests/integration/test_mqtt_integration.py b/open_workshop_mqtt/python_prototype/tests/integration/test_mqtt_integration.py new file mode 100644 index 0000000..deedb41 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/tests/integration/test_mqtt_integration.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python3 +""" +Integration Tests mit echtem MQTT Broker + +Testet die komplette Pipeline: +MQTT → Parser → Normalizer → SessionDetector → Storage + +Usage: + pytest test_mqtt_integration.py -v -s +""" + +import pytest +import subprocess +import time +import json +import signal +import os +from pathlib import Path +from datetime import datetime +import paho.mqtt.client as mqtt_client +import ssl + + +@pytest.fixture(scope="session") +def mqtt_config(): + """MQTT Broker Konfiguration aus Umgebungsvariablen oder config.yaml""" + + # Option 1: Aus Umgebungsvariablen + if os.getenv('MQTT_HOST'): + return { + 'host': os.getenv('MQTT_HOST'), + 'port': int(os.getenv('MQTT_PORT', '8883')), + 'username': os.getenv('MQTT_USERNAME'), + 'password': os.getenv('MQTT_PASSWORD'), + 'topic_prefix': os.getenv('MQTT_TEST_TOPIC_PREFIX', 'pytest-test') + } + + # Option 2: Aus config.yaml lesen + config_path = Path(__file__).parent.parent.parent / 'config.yaml' + if config_path.exists(): + import yaml + with open(config_path) as f: + config = yaml.safe_load(f) + + mqtt_conf = config.get('mqtt', {}) + return { + 'host': mqtt_conf.get('host'), + 'port': mqtt_conf.get('port', 8883), + 'username': mqtt_conf.get('username'), + 'password': mqtt_conf.get('password'), + 'topic_prefix': 'pytest-test' # Immer separates Topic für Tests + } + + pytest.skip("Keine MQTT Konfiguration gefunden. Setze Umgebungsvariablen oder erstelle config.yaml") + + +@pytest.fixture(scope="session") +def workspace_dir(): + """Workspace-Verzeichnis""" + return Path(__file__).parent + + +@pytest.fixture(scope="session") +def test_config_file(workspace_dir, mqtt_config): + """Erstellt temporäre test_config.yaml""" + # Relative Pfade zu den Test Storage Files (relativ zum Projektroot) + config_content = f""" +mqtt: + host: "{mqtt_config['host']}" + port: {mqtt_config['port']} + username: "{mqtt_config['username']}" + password: "{mqtt_config['password']}" + client_id: "ows_iot_bridge_pytest" + keepalive: 60 + topics: + - "{mqtt_config['topic_prefix']}/#" + +devices: + - topic_prefix: "{mqtt_config['topic_prefix']}" + machine_name: "PyTest Machine" + machine_id: "pytest-machine-01" + device_type: "shelly_pm_mini_g3" + standby_threshold_w: 20 + working_threshold_w: 100 + start_debounce_s: 3 + stop_debounce_s: 15 + message_timeout_s: 20 + enabled: true + +logging: + level: "INFO" + console: true + +output: + events_file: "data/test_events.jsonl" + sessions_file: "data/test_sessions.json" +""" + + config_path = workspace_dir / "test_config.yaml" + config_path.write_text(config_content) + + yield config_path + + # Cleanup + if config_path.exists(): + config_path.unlink() + + +@pytest.fixture(scope="session") +def test_storage_files(workspace_dir): + """Gibt Pfade zu Test-Storage-Dateien zurück (werden von Bridge erstellt)""" + # Test-Dateien im Projektroot data/ Verzeichnis + project_root = workspace_dir.parent.parent + data_dir = project_root / "data" + + events_file = data_dir / "test_events.jsonl" + sessions_file = data_dir / "test_sessions.json" + + # NICHT löschen - zum Debugging beibehalten! + + yield { + 'events': events_file, + 'sessions': sessions_file + } + + # Cleanup nach allen Tests (optional auskommentiert) + # if events_file.exists(): + # events_file.unlink() + # if sessions_file.exists(): + # sessions_file.unlink() + + +@pytest.fixture(scope="module") +def bridge_process(workspace_dir, test_config_file, test_storage_files): + """Startet die IoT Bridge als Subprocess""" + + # Bridge starten mit test_config.yaml + # cwd muss das Projektroot sein, nicht tests/integration/ + project_root = workspace_dir.parent.parent + config_path = workspace_dir / 'test_config.yaml' + + env = os.environ.copy() + env['PYTHONUNBUFFERED'] = '1' + + # Use python from venv explicitly + python_exe = project_root / 'venv' / 'bin' / 'python' + bridge_log = workspace_dir / 'bridge_output.log' + + with open(bridge_log, 'w') as log_file: + process = subprocess.Popen( + [str(python_exe), 'main.py', '--config', str(config_path)], + cwd=project_root, + env=env, + stdout=log_file, + stderr=subprocess.STDOUT, + text=True + ) + + # Warten bis Bridge gestartet ist + time.sleep(3) + + # Prüfen ob Prozess läuft + if process.poll() is not None: + with open(bridge_log, 'r') as f: + output = f.read() + pytest.fail(f"Bridge konnte nicht gestartet werden:\n{output}") + + # Debug: Bridge Output ausgeben + with open(bridge_log, 'r') as f: + output = f.read() + if output: + print(f"\n=== Bridge Output ===\n{output}\n===\n") + + yield process + + # Bridge sauber beenden + process.send_signal(signal.SIGTERM) + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + +@pytest.fixture +def mqtt_sender(mqtt_config): + """MQTT Client zum Senden von Test-Messages""" + + client_id = f"pytest-sender-{int(time.time())}" + client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5) + client.username_pw_set(mqtt_config['username'], mqtt_config['password']) + + # TLS/SSL + client.tls_set(cert_reqs=ssl.CERT_NONE) + client.tls_insecure_set(True) + + client.connect(mqtt_config['host'], mqtt_config['port'], keepalive=60) + client.loop_start() + time.sleep(1) + + yield client + + client.loop_stop() + client.disconnect() + + +def send_shelly_message(mqtt_sender, topic: str, power_w: float): + """Sendet eine Shelly PM Mini G3 Status Message""" + message = { + "id": 0, + "voltage": 230.0, + "current": round(power_w / 230.0, 3), + "apower": power_w, + "freq": 50.0, + "aenergy": { + "total": 12345.6, + "by_minute": [0.0, 0.0, 0.0], + "minute_ts": int(time.time()) + }, + "temperature": { + "tC": 35.2, + "tF": 95.4 + } + } + + mqtt_sender.publish(topic, json.dumps(message), qos=1) + time.sleep(0.1) # Kurze Pause zwischen Messages + + +def read_sessions(sessions_file: Path): + """Liest sessions.json""" + if not sessions_file.exists(): + return [] + + content = sessions_file.read_text().strip() + if not content or content == "[]": + return [] + + return json.loads(content) + + +def wait_for_session_count(sessions_file: Path, expected_count: int, timeout: int = 10): + """Wartet bis die erwartete Anzahl Sessions vorhanden ist""" + start_time = time.time() + + while time.time() - start_time < timeout: + sessions = read_sessions(sessions_file) + if len(sessions) >= expected_count: + return sessions + time.sleep(0.5) + + sessions = read_sessions(sessions_file) + pytest.fail( + f"Timeout: Erwartete {expected_count} Sessions, gefunden: {len(sessions)}\n" + f"Sessions: {json.dumps(sessions, indent=2)}" + ) + + +class TestMQTTIntegration: + """Integration Tests mit echtem MQTT""" + + def test_bridge_is_running(self, bridge_process): + """Bridge läuft erfolgreich""" + assert bridge_process.poll() is None, "Bridge Prozess ist abgestürzt" + + def test_session_start_standby(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Session Start → STANDBY""" + topic = f"{mqtt_config['topic_prefix']}/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + # Anzahl Sessions vor dem Test + initial_count = len(read_sessions(sessions_file)) + + # Power ansteigen lassen - wie reale Maschine + send_shelly_message(mqtt_sender, topic, 0) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 30) # STARTING + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) # Nach 3s → STANDBY + + # Warten auf Session-Start (3s debounce + Verarbeitung) + time.sleep(3) + + # Session prüfen + sessions = wait_for_session_count(sessions_file, initial_count + 1, timeout=5) + + # Neueste Session + latest = sessions[-1] + assert latest['machine_id'] == 'pytest-machine-01' + assert latest['status'] == 'running' + assert latest['start_power_w'] >= 20 + + def test_session_end_with_stop_debounce(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Session Ende mit stop_debounce (15s unter 20W)""" + topic = f"{mqtt_config['topic_prefix']}/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + initial_count = len(read_sessions(sessions_file)) + + # Session starten + send_shelly_message(mqtt_sender, topic, 0) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(2) # Session sollte jetzt laufen + + # STANDBY für 3 Sekunden + for _ in range(3): + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + + # Power runterfahren → STOPPING + send_shelly_message(mqtt_sender, topic, 15) + + # STOPPING für 16 Sekunden (> 15s stop_debounce) + for i in range(16): + send_shelly_message(mqtt_sender, topic, 10 - i * 0.5) + time.sleep(1) + + # Warten auf Session-Ende + time.sleep(2) + + # Prüfen ob Session beendet wurde + sessions = read_sessions(sessions_file) + + # Finde die Session die gerade beendet wurde + completed_sessions = [s for s in sessions if s.get('status') == 'completed' and s.get('end_reason') == 'power_drop'] + + assert len(completed_sessions) > 0, f"Keine completed Session mit power_drop gefunden. Sessions: {json.dumps(sessions[-3:], indent=2)}" + + latest = completed_sessions[-1] + assert latest['end_reason'] == 'power_drop' + assert latest['total_duration_s'] is not None + assert latest['standby_duration_s'] is not None + + def test_standby_to_working_transition(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """STANDBY → WORKING Transition""" + topic = f"{mqtt_config['topic_prefix']}/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + # Session im STANDBY starten + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(2) + + # STANDBY halten + for _ in range(3): + send_shelly_message(mqtt_sender, topic, 60) + time.sleep(1) + + # → WORKING + send_shelly_message(mqtt_sender, topic, 120) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 150) + + # WORKING halten + for _ in range(5): + send_shelly_message(mqtt_sender, topic, 150) + time.sleep(1) + + # Session beenden + send_shelly_message(mqtt_sender, topic, 10) + for _ in range(16): + send_shelly_message(mqtt_sender, topic, 5) + time.sleep(1) + + time.sleep(2) + + # Prüfen + sessions = read_sessions(sessions_file) + completed = [s for s in sessions if s.get('status') == 'completed'] + + assert len(completed) > 0 + latest = completed[-1] + + # Sollte sowohl STANDBY als auch WORKING Zeit haben + assert latest.get('standby_duration_s', 0) > 0, "Keine STANDBY Zeit" + assert latest.get('working_duration_s', 0) > 0, "Keine WORKING Zeit" + + def test_timeout_detection(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Timeout Detection (20s keine Messages)""" + topic = f"{mqtt_config['topic_prefix']}/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + initial_count = len(read_sessions(sessions_file)) + + # Session starten + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(2) + + # Letzte Message + send_shelly_message(mqtt_sender, topic, 50) + + # 25 Sekunden warten (> 20s timeout) + print("\n⏱️ Warte 25 Sekunden für Timeout Detection...") + time.sleep(25) + + # Prüfen ob Session mit timeout beendet wurde + sessions = read_sessions(sessions_file) + timeout_sessions = [s for s in sessions if s.get('end_reason') == 'timeout'] + + assert len(timeout_sessions) > 0, f"Keine timeout Session gefunden. Sessions: {json.dumps(sessions[-3:], indent=2)}" + + +if __name__ == '__main__': + pytest.main([__file__, '-v', '-s']) diff --git a/open_workshop_mqtt/python_prototype/tests/tools/__init__.py b/open_workshop_mqtt/python_prototype/tests/tools/__init__.py new file mode 100644 index 0000000..ccc6070 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/tests/tools/__init__.py @@ -0,0 +1,3 @@ +""" +Test Tools - Hilfsprogramme für manuelle Tests und Debugging +""" diff --git a/open_workshop_mqtt/python_prototype/tests/tools/shelly_simulator.py b/open_workshop_mqtt/python_prototype/tests/tools/shelly_simulator.py new file mode 100755 index 0000000..d5ec1b2 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/tests/tools/shelly_simulator.py @@ -0,0 +1,367 @@ +#!/usr/bin/env python3 +""" +Test MQTT Client - Simuliert Shelly PM Mini G3 Verhalten + +Sendet MQTT Messages auf das Topic 'shaperorigin/status/pm1:0' +mit verschiedenen Test-Szenarien für die Session Detection. + +Usage: + python test_shelly_simulator.py --scenario standby + python test_shelly_simulator.py --scenario working + python test_shelly_simulator.py --scenario session_end + python test_shelly_simulator.py --scenario full_session +""" + +import argparse +import json +import os +import sys +import time +import ssl +from datetime import datetime +from pathlib import Path +from paho.mqtt import client as mqtt_client +import yaml + + +class ShellySimulator: + """Simuliert Shelly PM Mini G3 MQTT Messages""" + + def __init__(self, broker_host: str, broker_port: int, username: str, password: str, topic_prefix: str = "testshelly"): + self.broker_host = broker_host + self.broker_port = broker_port + self.username = username + self.password = password + self.topic_prefix = topic_prefix + self.topic = f"{topic_prefix}/status/pm1:0" + self.client = None + + def connect(self): + """Verbindung zum MQTT Broker herstellen""" + client_id = f"shelly-simulator-{int(time.time())}" + self.client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5) + self.client.username_pw_set(self.username, self.password) + + # TLS/SSL + self.client.tls_set(cert_reqs=ssl.CERT_NONE) + self.client.tls_insecure_set(True) + + print(f"Verbinde zu MQTT Broker {self.broker_host}:{self.broker_port}...") + self.client.connect(self.broker_host, self.broker_port, keepalive=60) + self.client.loop_start() + time.sleep(1) + print("✓ Verbunden") + + def disconnect(self): + """Verbindung trennen""" + if self.client: + self.client.loop_stop() + self.client.disconnect() + print("✓ Verbindung getrennt") + + def send_power_message(self, power_w: float, interval_s: int = 1): + """ + Sendet eine Shelly Status Message im echten Shelly PM Mini G3 Format + + Args: + power_w: Leistung in Watt + interval_s: Pause nach dem Senden in Sekunden + """ + # Echtes Shelly PM Mini G3 Format (ohne result wrapper) + message = { + "id": 0, + "voltage": 230.0, + "current": round(power_w / 230.0, 3), + "apower": power_w, + "freq": 50.0, + "aenergy": { + "total": round(12345.6 + (power_w * interval_s / 3600), 1), + "by_minute": [0.0, 0.0, 0.0], + "minute_ts": int(time.time()) + }, + "temperature": { + "tC": 35.2, + "tF": 95.4 + } + } + + payload = json.dumps(message) + self.client.publish(self.topic, payload, qos=1) + + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"[{timestamp}] 📤 Gesendet: {power_w:.1f}W") + + if interval_s > 0: + time.sleep(interval_s) + + def scenario_standby(self): + """Szenario: Maschine im STANDBY (20-100W)""" + print("\n=== SZENARIO: STANDBY (20-100W) ===") + print("Simuliert: Maschine an, Spindel aus\n") + + # Anfahren + print("Phase 1: Anfahren (0 → 50W)") + self.send_power_message(0, 1) + self.send_power_message(10, 1) + self.send_power_message(30, 1) # > 20W → STARTING + self.send_power_message(45, 1) + self.send_power_message(50, 2) # Nach 3s debounce → STANDBY + + # STANDBY halten + print("\nPhase 2: STANDBY halten (50W für 20s)") + for i in range(20): + self.send_power_message(50 + (i % 5) * 2, 1) # 50-58W variieren + + print("\n✓ Szenario abgeschlossen") + + def scenario_working(self): + """Szenario: Maschine WORKING (>=100W)""" + print("\n=== SZENARIO: WORKING (>=100W) ===") + print("Simuliert: Spindel läuft\n") + + # Anfahren direkt zu WORKING + print("Phase 1: Anfahren (0 → 150W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) # > 20W → STARTING + self.send_power_message(80, 1) + self.send_power_message(120, 1) + self.send_power_message(150, 2) # Nach 3s debounce → WORKING + + # WORKING halten + print("\nPhase 2: WORKING halten (150W für 20s)") + for i in range(20): + self.send_power_message(150 + (i % 10) * 5, 1) # 150-195W variieren + + print("\n✓ Szenario abgeschlossen") + + def scenario_session_end(self): + """Szenario: Session sauber beenden mit stop_debounce""" + print("\n=== SZENARIO: SESSION END (mit stop_debounce 15s) ===") + print("Simuliert: Maschine einschalten → arbeiten → ausschalten\n") + + # Start: STANDBY + print("Phase 1: Session Start → STANDBY (50W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) # STARTING + self.send_power_message(50, 1) + self.send_power_message(50, 1) + self.send_power_message(50, 2) # Nach 3s → STANDBY + + # STANDBY 10s + print("\nPhase 2: STANDBY halten (50W für 10s)") + for _ in range(10): + self.send_power_message(50, 1) + + # Runterfahren + print("\nPhase 3: Herunterfahren (50W → 0W)") + self.send_power_message(40, 1) + self.send_power_message(25, 1) + self.send_power_message(15, 1) # < 20W → STOPPING + + # STOPPING für 15 Sekunden (stop_debounce) + print("\nPhase 4: STOPPING (< 20W für 15s = stop_debounce)") + for i in range(16): + self.send_power_message(10 - i * 0.6, 1) # 10W → ~0W über 15s + print(f" STOPPING seit {i+1}s / 15s") + + print("\n✓ Session sollte jetzt beendet sein (nach stop_debounce)") + print(" Erwartung: Session END mit end_reason='power_drop'") + + def scenario_full_session(self): + """Szenario: Komplette Session mit STANDBY→WORKING→STANDBY→END""" + print("\n=== SZENARIO: FULL SESSION ===") + print("Simuliert: Start → STANDBY → WORKING → STANDBY → END\n") + + # Start → STANDBY + print("Phase 1: Start → STANDBY (50W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) + self.send_power_message(50, 1) + self.send_power_message(50, 2) # → STANDBY + + # STANDBY 5s + print("\nPhase 2: STANDBY (50W für 5s)") + for _ in range(5): + self.send_power_message(55, 1) + + # STANDBY → WORKING + print("\nPhase 3: STANDBY → WORKING (50W → 150W)") + self.send_power_message(70, 1) + self.send_power_message(90, 1) + self.send_power_message(110, 1) # >= 100W → WORKING + self.send_power_message(130, 1) + self.send_power_message(150, 1) + + # WORKING 10s + print("\nPhase 4: WORKING (150W für 10s)") + for i in range(10): + self.send_power_message(150 + (i % 5) * 10, 1) + + # WORKING → STANDBY + print("\nPhase 5: WORKING → STANDBY (150W → 60W)") + self.send_power_message(120, 1) + self.send_power_message(90, 1) # < 100W → STANDBY + self.send_power_message(70, 1) + self.send_power_message(60, 1) + + # STANDBY 5s + print("\nPhase 6: STANDBY (60W für 5s)") + for _ in range(5): + self.send_power_message(60, 1) + + # Session END + print("\nPhase 7: Session END (60W → 0W mit stop_debounce)") + self.send_power_message(40, 1) + self.send_power_message(20, 1) + self.send_power_message(10, 1) # < 20W → STOPPING + + print("\nPhase 8: STOPPING (15s debounce)") + for i in range(16): + self.send_power_message(5 - i * 0.3, 1) + print(f" STOPPING seit {i+1}s / 15s") + + print("\n✓ Full Session abgeschlossen") + print(" Erwartung:") + print(" - Session START") + print(" - STANDBY → WORKING → STANDBY Transitionen") + print(" - Session END nach stop_debounce") + print(" - standby_duration_s und working_duration_s getrackt") + + def scenario_timeout(self): + """Szenario: Timeout Detection (keine Messages für 20s)""" + print("\n=== SZENARIO: TIMEOUT (message_timeout_s = 20s) ===") + print("Simuliert: Maschine läuft → plötzlicher Stromausfall (keine Messages mehr)\n") + + # Start → STANDBY + print("Phase 1: Session Start → STANDBY (50W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) + self.send_power_message(50, 1) + self.send_power_message(50, 2) # → STANDBY + + # STANDBY 10s + print("\nPhase 2: STANDBY (50W für 10s)") + for _ in range(10): + self.send_power_message(50, 1) + + # Keine Messages mehr (Stromausfall simulieren) + print("\n⚡ STROMAUSFALL: Keine Messages mehr für 25 Sekunden") + print(" (message_timeout_s = 20s)") + for i in range(25): + time.sleep(1) + print(f" Keine Messages seit {i+1}s / 20s", end="\r") + + print("\n\n✓ Timeout Szenario abgeschlossen") + print(" Erwartung: Session END nach 20s mit end_reason='timeout'") + + +def main(): + parser = argparse.ArgumentParser(description="Shelly PM Mini G3 MQTT Simulator") + parser.add_argument( + "--scenario", + choices=["standby", "working", "session_end", "full_session", "timeout"], + required=True, + help="Test-Szenario" + ) + parser.add_argument("--host", default=None, help="MQTT Broker Host (default: from env/config.yaml)") + parser.add_argument("--port", type=int, default=None, help="MQTT Broker Port (default: from env/config.yaml)") + parser.add_argument("--username", default=None, help="MQTT Username (default: from env/config.yaml)") + parser.add_argument("--password", default=None, help="MQTT Password (default: from env/config.yaml)") + parser.add_argument("--topic-prefix", default="testshelly", help="MQTT Topic Prefix (default: testshelly)") + + args = parser.parse_args() + + # Load credentials from environment or config.yaml if not provided via CLI + mqtt_config = {} + + if not all([args.host, args.port, args.username, args.password]): + # Try environment variables first + mqtt_config = { + 'host': os.getenv('MQTT_HOST'), + 'port': int(os.getenv('MQTT_PORT', 0)) if os.getenv('MQTT_PORT') else None, + 'username': os.getenv('MQTT_USERNAME'), + 'password': os.getenv('MQTT_PASSWORD') + } + + # If not all available, try config.yaml + if not all(mqtt_config.values()): + config_path = Path(__file__).parent.parent.parent / "config.yaml" + if config_path.exists(): + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + mqtt_section = config.get('mqtt', {}) + mqtt_config = { + 'host': mqtt_section.get('broker_host'), + 'port': mqtt_section.get('broker_port'), + 'username': mqtt_section.get('username'), + 'password': mqtt_section.get('password') + } + except Exception as e: + print(f"⚠️ Warnung: Fehler beim Lesen von config.yaml: {e}") + + # Use CLI args or fallback to loaded config + host = args.host or mqtt_config.get('host') + port = args.port or mqtt_config.get('port') + username = args.username or mqtt_config.get('username') + password = args.password or mqtt_config.get('password') + + # Validate that we have all required credentials + if not all([host, port, username, password]): + print("❌ Fehler: MQTT Credentials fehlen!") + print("") + print("Bitte setze die Credentials über:") + print(" 1) Kommandozeilen-Parameter:") + print(" --host --port --username --password ") + print("") + print(" 2) Umgebungsvariablen:") + print(" export MQTT_HOST=mqtt.majufilo.eu") + print(" export MQTT_PORT=8883") + print(" export MQTT_USERNAME=mosquitto") + print(" export MQTT_PASSWORD=xxx") + print("") + print(" 3) config.yaml im Projektroot") + print("") + sys.exit(1) + + print("=" * 60) + print("SHELLY PM MINI G3 SIMULATOR") + print("=" * 60) + + simulator = ShellySimulator( + broker_host=host, + broker_port=port, + username=username, + password=password, + topic_prefix=args.topic_prefix + ) + + try: + simulator.connect() + + # Szenario ausführen + if args.scenario == "standby": + simulator.scenario_standby() + elif args.scenario == "working": + simulator.scenario_working() + elif args.scenario == "session_end": + simulator.scenario_session_end() + elif args.scenario == "full_session": + simulator.scenario_full_session() + elif args.scenario == "timeout": + simulator.scenario_timeout() + + print("\n" + "=" * 60) + print("Test abgeschlossen. Prüfe sessions.json für Ergebnisse.") + print("=" * 60) + + except KeyboardInterrupt: + print("\n\n⚠️ Abgebrochen") + except Exception as e: + print(f"\n❌ Fehler: {e}") + finally: + simulator.disconnect() + + +if __name__ == "__main__": + main() diff --git a/open_workshop_mqtt/python_prototype/tests/unit/__init__.py b/open_workshop_mqtt/python_prototype/tests/unit/__init__.py new file mode 100644 index 0000000..8ff464d --- /dev/null +++ b/open_workshop_mqtt/python_prototype/tests/unit/__init__.py @@ -0,0 +1,3 @@ +""" +Unit Tests - Testen einzelne Komponenten isoliert ohne externe Dependencies +""" diff --git a/open_workshop_mqtt/python_prototype/tests/unit/test_session_detector.py b/open_workshop_mqtt/python_prototype/tests/unit/test_session_detector.py new file mode 100644 index 0000000..af7ef5b --- /dev/null +++ b/open_workshop_mqtt/python_prototype/tests/unit/test_session_detector.py @@ -0,0 +1,396 @@ +#!/usr/bin/env python3 +""" +Unit Tests für SessionDetector + +Testet die Dual-Threshold Session Detection ohne MQTT Broker. +Direktes Testen der Logik mit simulierten Events. + +Usage: + pytest test_session_detector.py -v + pytest test_session_detector.py::test_standby_to_working -v +""" + +import pytest +from datetime import datetime, timezone, timedelta +from session_detector import SessionDetector, SessionState + + +@pytest.fixture +def detector(): + """SessionDetector mit Test-Konfiguration""" + config = [ + { + 'machine_id': 'test-machine-01', + 'machine_name': 'Test Machine', + 'standby_threshold_w': 20, + 'working_threshold_w': 100, + 'start_debounce_s': 3, + 'stop_debounce_s': 15, + 'message_timeout_s': 20, + } + ] + return SessionDetector(device_config=config) + + +def create_event(power_w: float, machine_id: str = 'test-machine-01', timestamp: datetime = None): + """Helper: Erstellt ein normalisiertes Event""" + if timestamp is None: + timestamp = datetime.now(timezone.utc) + + return { + 'event_id': 'test-event', + 'event_type': 'power_measurement', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': 'Test Machine' + }, + 'metrics': { + 'power_w': power_w + } + } + + +class TestSessionStart: + """Tests für Session-Start mit start_debounce""" + + def test_idle_to_starting(self, detector): + """Power >= 20W → STARTING""" + event = create_event(power_w=30) + result = detector.process_event(event) + + assert result is None # Noch kein Session-Start + assert detector.get_machine_state('test-machine-01') == 'starting' + + def test_starting_to_standby(self, detector): + """Nach 3s debounce mit 20-100W → STANDBY""" + start_time = datetime.now(timezone.utc) + + # T+0: Power 30W → STARTING + event1 = create_event(power_w=30, timestamp=start_time) + result1 = detector.process_event(event1) + assert result1 is None + + # T+1: Power 50W → noch STARTING + event2 = create_event(power_w=50, timestamp=start_time + timedelta(seconds=1)) + result2 = detector.process_event(event2) + assert result2 is None + + # T+3: Power 50W → STANDBY (debounce passed) + event3 = create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)) + result3 = detector.process_event(event3) + + assert result3 is not None + assert result3['event_type'] == 'session_start' + assert result3['session_data']['start_time'] is not None + assert detector.get_machine_state('test-machine-01') == 'standby' + + def test_starting_to_working(self, detector): + """Nach 3s debounce mit >=100W → WORKING""" + start_time = datetime.now(timezone.utc) + + # T+0: Power 30W → STARTING + event1 = create_event(power_w=30, timestamp=start_time) + detector.process_event(event1) + + # T+1: Power 120W → noch STARTING + event2 = create_event(power_w=120, timestamp=start_time + timedelta(seconds=1)) + detector.process_event(event2) + + # T+3: Power 150W → WORKING (debounce passed) + event3 = create_event(power_w=150, timestamp=start_time + timedelta(seconds=3)) + result3 = detector.process_event(event3) + + assert result3 is not None + assert result3['event_type'] == 'session_start' + assert detector.get_machine_state('test-machine-01') == 'working' + + def test_false_start(self, detector): + """Power fällt vor debounce → zurück zu IDLE""" + start_time = datetime.now(timezone.utc) + + # T+0: Power 30W → STARTING + event1 = create_event(power_w=30, timestamp=start_time) + detector.process_event(event1) + + # T+1: Power 10W → zurück zu IDLE (false start) + event2 = create_event(power_w=10, timestamp=start_time + timedelta(seconds=1)) + result2 = detector.process_event(event2) + + assert result2 is None + assert detector.get_machine_state('test-machine-01') == 'idle' + + +class TestStateTransitions: + """Tests für Zustandsübergänge während Session""" + + def test_standby_to_working(self, detector): + """STANDBY → WORKING bei Power >= 100W""" + start_time = datetime.now(timezone.utc) + + # Session starten im STANDBY + detector.process_event(create_event(power_w=30, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + assert detector.get_machine_state('test-machine-01') == 'standby' + + # Power auf 150W → WORKING + event = create_event(power_w=150, timestamp=start_time + timedelta(seconds=10)) + result = detector.process_event(event) + + assert result is None # Keine session_end/start, nur Transition + assert detector.get_machine_state('test-machine-01') == 'working' + + def test_working_to_standby(self, detector): + """WORKING → STANDBY bei Power < 100W""" + start_time = datetime.now(timezone.utc) + + # Session starten im WORKING + detector.process_event(create_event(power_w=150, timestamp=start_time)) + detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3))) + assert detector.get_machine_state('test-machine-01') == 'working' + + # Power auf 60W → STANDBY + event = create_event(power_w=60, timestamp=start_time + timedelta(seconds=10)) + result = detector.process_event(event) + + assert result is None + assert detector.get_machine_state('test-machine-01') == 'standby' + + +class TestSessionEnd: + """Tests für Session-Ende mit stop_debounce""" + + def test_session_end_from_standby(self, detector): + """STANDBY → STOPPING → IDLE nach 15s < 20W""" + start_time = datetime.now(timezone.utc) + + # Session starten + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # STANDBY für 10s + for i in range(10): + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i))) + + # Power fällt unter 20W → STOPPING + t_stopping = start_time + timedelta(seconds=13) + event_stopping = create_event(power_w=10, timestamp=t_stopping) + result_stopping = detector.process_event(event_stopping) + + assert result_stopping is None + assert detector.get_machine_state('test-machine-01') == 'stopping' + + # STOPPING für 14s → noch keine Session-End + for i in range(14): + result = detector.process_event( + create_event(power_w=10, timestamp=t_stopping + timedelta(seconds=i + 1)) + ) + assert result is None # Noch kein Session-Ende + + # T+15: stop_debounce passed → SESSION END + event_end = create_event(power_w=5, timestamp=t_stopping + timedelta(seconds=15)) + result_end = detector.process_event(event_end) + + assert result_end is not None + assert result_end['event_type'] == 'session_end' + assert result_end['session_data']['end_reason'] == 'power_drop' + assert detector.get_machine_state('test-machine-01') == 'idle' + + def test_stopping_canceled_power_back_up(self, detector): + """STOPPING abgebrochen wenn Power wieder >= 20W""" + start_time = datetime.now(timezone.utc) + + # Session starten + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # Power fällt → STOPPING + t_stopping = start_time + timedelta(seconds=10) + detector.process_event(create_event(power_w=10, timestamp=t_stopping)) + assert detector.get_machine_state('test-machine-01') == 'stopping' + + # Nach 5s: Power wieder hoch → zurück zu STANDBY + event_back = create_event(power_w=60, timestamp=t_stopping + timedelta(seconds=5)) + result = detector.process_event(event_back) + + assert result is None # Kein Session-Ende + assert detector.get_machine_state('test-machine-01') == 'standby' + + +class TestDurationTracking: + """Tests für Zeiterfassung in STANDBY und WORKING""" + + def test_standby_duration_tracked(self, detector): + """standby_duration_s wird korrekt akkumuliert""" + start_time = datetime.now(timezone.utc) + + # Session Start → STANDBY + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # STANDBY für 10 Sekunden + for i in range(10): + detector.process_event( + create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i)) + ) + + # Session beenden + t_end = start_time + timedelta(seconds=20) + detector.process_event(create_event(power_w=10, timestamp=t_end)) + + # STOPPING für genau 15 Sekunden (stop_debounce) + for i in range(14): + result = detector.process_event( + create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1)) + ) + assert result is None # Noch kein Session-Ende + + # T+15: stop_debounce passed → SESSION END + result = detector.process_event( + create_event(power_w=0, timestamp=t_end + timedelta(seconds=15)) + ) + + assert result is not None + assert result['session_data']['standby_duration_s'] >= 9 # ~10s STANDBY + assert result['session_data']['working_duration_s'] == 0 # Kein WORKING + + def test_working_duration_tracked(self, detector): + """working_duration_s wird korrekt akkumuliert""" + start_time = datetime.now(timezone.utc) + + # Session Start → WORKING + detector.process_event(create_event(power_w=150, timestamp=start_time)) + detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3))) + + # WORKING für 10 Sekunden + for i in range(10): + detector.process_event( + create_event(power_w=150, timestamp=start_time + timedelta(seconds=3 + i)) + ) + + # Session beenden + t_end = start_time + timedelta(seconds=20) + detector.process_event(create_event(power_w=10, timestamp=t_end)) + + # STOPPING für genau 15 Sekunden + for i in range(14): + result = detector.process_event( + create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1)) + ) + assert result is None + + # T+15: stop_debounce passed + result = detector.process_event( + create_event(power_w=0, timestamp=t_end + timedelta(seconds=15)) + ) + + assert result is not None + assert result['session_data']['working_duration_s'] >= 9 # ~10s WORKING + assert result['session_data']['standby_duration_s'] == 0 # Kein STANDBY + + def test_mixed_standby_working_duration(self, detector): + """STANDBY und WORKING Zeiten separat getrackt""" + start_time = datetime.now(timezone.utc) + + # Start → STANDBY + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # STANDBY 5s + for i in range(5): + detector.process_event( + create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i)) + ) + + # → WORKING + t_working = start_time + timedelta(seconds=8) + detector.process_event(create_event(power_w=150, timestamp=t_working)) + + # WORKING 5s + for i in range(5): + detector.process_event( + create_event(power_w=150, timestamp=t_working + timedelta(seconds=i + 1)) + ) + + # → STANDBY + t_standby2 = t_working + timedelta(seconds=6) + detector.process_event(create_event(power_w=60, timestamp=t_standby2)) + + # STANDBY 5s + for i in range(5): + detector.process_event( + create_event(power_w=60, timestamp=t_standby2 + timedelta(seconds=i + 1)) + ) + + # Session beenden + t_end = t_standby2 + timedelta(seconds=6) + detector.process_event(create_event(power_w=10, timestamp=t_end)) + + # STOPPING für genau 15 Sekunden + for i in range(14): + result = detector.process_event( + create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1)) + ) + assert result is None + + # T+15: stop_debounce passed + result = detector.process_event( + create_event(power_w=0, timestamp=t_end + timedelta(seconds=15)) + ) + + assert result is not None + # ~10s STANDBY (5s + 5s) + assert result['session_data']['standby_duration_s'] >= 9 + assert result['session_data']['standby_duration_s'] <= 12 + # ~5s WORKING + assert result['session_data']['working_duration_s'] >= 4 + assert result['session_data']['working_duration_s'] <= 7 + + +class TestTimeoutDetection: + """Tests für Timeout-basierte Session-Beendigung""" + + def test_timeout_from_standby(self, detector): + """Timeout nach 20s ohne Messages im STANDBY""" + start_time = datetime.now(timezone.utc) + + # Session starten + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # Letzte Message + last_msg = start_time + timedelta(seconds=10) + detector.process_event(create_event(power_w=50, timestamp=last_msg)) + + # 21 Sekunden später → TIMEOUT + timeout_event = create_event(power_w=50, timestamp=last_msg + timedelta(seconds=21)) + result = detector.process_event(timeout_event) + + assert result is not None + assert result['event_type'] == 'session_end' + assert result['session_data']['end_reason'] == 'timeout' + assert result['power_w'] == 0.0 # Bei Timeout wird 0W angenommen + + def test_timeout_from_working(self, detector): + """Timeout nach 20s ohne Messages im WORKING""" + start_time = datetime.now(timezone.utc) + + # Session starten im WORKING + detector.process_event(create_event(power_w=150, timestamp=start_time)) + detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3))) + + # Letzte Message + last_msg = start_time + timedelta(seconds=10) + detector.process_event(create_event(power_w=150, timestamp=last_msg)) + + # 21 Sekunden später → TIMEOUT + timeout_event = create_event(power_w=150, timestamp=last_msg + timedelta(seconds=21)) + result = detector.process_event(timeout_event) + + assert result is not None + assert result['event_type'] == 'session_end' + assert result['session_data']['end_reason'] == 'timeout' + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])