From a7897ecc9bb4d367ee07bf32db820d3ac15be83d Mon Sep 17 00:00:00 2001 From: "matthias.lotz" Date: Tue, 10 Feb 2026 20:13:58 +0100 Subject: [PATCH] added python_proto_type --- README.md | 2 +- TODO.md | 2 +- python_proto_type/.gitignore | 31 + python_proto_type/README.md | 133 +++++ python_proto_type/config.yaml.example | 70 +++ python_proto_type/event_normalizer.py | 192 ++++++ python_proto_type/event_storage.py | 232 ++++++++ python_proto_type/main.py | 286 +++++++++ python_proto_type/mqtt_client.py | 198 +++++++ python_proto_type/requirements.txt | 17 + python_proto_type/session_detector.py | 514 ++++++++++++++++ python_proto_type/setup.sh | 73 +++ python_proto_type/shelly_parser.py | 201 +++++++ python_proto_type/tests/__init__.py | 3 + .../tests/integration/__init__.py | 3 + .../integration/test_mqtt_integration.py | 550 ++++++++++++++++++ python_proto_type/tests/tools/__init__.py | 3 + .../tests/tools/shelly_simulator.py | 375 ++++++++++++ python_proto_type/tests/unit/__init__.py | 3 + .../tests/unit/test_session_detector.py | 396 +++++++++++++ 20 files changed, 3282 insertions(+), 2 deletions(-) create mode 100644 python_proto_type/.gitignore create mode 100644 python_proto_type/README.md create mode 100644 python_proto_type/config.yaml.example create mode 100644 python_proto_type/event_normalizer.py create mode 100644 python_proto_type/event_storage.py create mode 100644 python_proto_type/main.py create mode 100644 python_proto_type/mqtt_client.py create mode 100644 python_proto_type/requirements.txt create mode 100644 python_proto_type/session_detector.py create mode 100755 python_proto_type/setup.sh create mode 100644 python_proto_type/shelly_parser.py create mode 100644 python_proto_type/tests/__init__.py create mode 100644 python_proto_type/tests/integration/__init__.py create mode 100644 python_proto_type/tests/integration/test_mqtt_integration.py create mode 100644 python_proto_type/tests/tools/__init__.py create mode 100755 python_proto_type/tests/tools/shelly_simulator.py create mode 100644 python_proto_type/tests/unit/__init__.py create mode 100644 python_proto_type/tests/unit/test_session_detector.py diff --git a/README.md b/README.md index 87cf9d5..f4fc31b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# IoT MQTT Bridge for Odoo +# IoT MQTT Bridge for Odoo ../odoo **Separater Docker Container für MQTT-IoT-Device-Integration** diff --git a/TODO.md b/TODO.md index 40c6ac5..f434fd5 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,7 @@ ## Status-Übersicht -### ✅ Phase 1: Python Prototyp (ABGESCHLOSSEN) +### ✅ Phase 1: Python Prototyp (ABGESCHLOSSEN) -> ./python_prototype - [x] M0: Projekt Setup & MQTT Verbindung - [x] M1: Shelly PM Mini G3 Integration - [x] M2: Event-Normalisierung & Unified Schema diff --git a/python_proto_type/.gitignore b/python_proto_type/.gitignore new file mode 100644 index 0000000..6e8b16f --- /dev/null +++ b/python_proto_type/.gitignore @@ -0,0 +1,31 @@ +# Config with secrets +config.yaml + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +build/ +dist/ +*.egg-info/ + +# Data & Logs +data/ +logs/ +*.log + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/python_proto_type/README.md b/python_proto_type/README.md new file mode 100644 index 0000000..9a0e82f --- /dev/null +++ b/python_proto_type/README.md @@ -0,0 +1,133 @@ +# Open Workshop IoT Bridge - Python Prototype + +MQTT-basierte IoT Bridge für Odoo 18 Community zur Erfassung von Maschinenlaufzeiten. + +## Projektstruktur + +``` +python_prototype/ +├── main.py # Haupteinstiegspunkt +├── config.yaml # Produktionskonfiguration (nicht in Git) +├── config.yaml.example # Beispielkonfiguration +│ +├── Core Components (Produktionscode) +├── mqtt_client.py # MQTT Client mit TLS +├── shelly_parser.py # Parser für Shelly PM Mini G3 +├── event_normalizer.py # Event Schema v1 +├── session_detector.py # Session Detection (Dual-Threshold) +├── event_storage.py # JSON Storage +│ +├── data/ # Laufzeitdaten +│ ├── events.jsonl # Event-Log +│ └── sessions.json # Session-Daten +│ +├── tests/ # Tests +│ ├── unit/ # Unit Tests (schnell, isoliert) +│ │ └── test_session_detector.py +│ ├── integration/ # Integration Tests (mit MQTT) +│ │ └── test_mqtt_integration.py +│ └── tools/ # Test-Hilfsprogramme +│ └── shelly_simulator.py +│ +└── venv/ # Virtual Environment +``` + +## Installation + +```bash +python3 -m venv venv +source venv/bin/activate +pip install -r requirements.txt + +cp config.yaml.example config.yaml +# config.yaml anpassen +``` + +## Tests + +```bash +# Unit Tests (schnell, ~0.05s) +pytest tests/unit/ -v + +# Integration Tests (mit MQTT, ~30-60s) +# Benötigt MQTT Konfiguration: +# Option 1: Nutzt existierende config.yaml +pytest tests/integration/ -v -s + +# Option 2: Mit Umgebungsvariablen +export MQTT_HOST=mqtt.majufilo.eu +export MQTT_PORT=8883 +export MQTT_USERNAME=mosquitto +export MQTT_PASSWORD=dein_passwort +pytest tests/integration/ -v -s + +# Alle Tests +pytest tests/ -v +``` + +**Wichtig:** Integration Tests lesen MQTT-Zugangsdaten aus: +1. Umgebungsvariablen (`MQTT_HOST`, `MQTT_PASSWORD`, etc.) ODER +2. Existierender `config.yaml` im Projektroot + +**Niemals Passwörter im Source Code committen!** + +## Betrieb + +```bash +# Bridge starten +python main.py + +# Mit alternativer Config +python main.py --config custom_config.yaml +``` + +## Manuelle Tests + +```bash +# Shelly Simulator für Tests +python tests/tools/shelly_simulator.py --scenario session_end +python tests/tools/shelly_simulator.py --scenario full_session +python tests/tools/shelly_simulator.py --scenario timeout + +python3 tests/tools/shelly_simulator.py --broker localhost --port 1883 --no-tls --username "" --password "" --scenario full_session +``` + +## Session Detection States + +- **IDLE**: Power < 20W +- **STARTING**: Power >= 20W, 3s debounce +- **STANDBY**: 20-100W (Maschine an, Spindel aus) +- **WORKING**: >= 100W (Spindel läuft) +- **STOPPING**: < 20W, 15s debounce +- **Timeout**: 20s keine Messages → Session-Ende + +## Odoo Integration + +**Deployment-Strategie:** Direkte Integration in Odoo (kein separater Container) + +**Begründung:** +- Werkstatt-Setup: maximal 10 Maschinen +- Performance: `check_timeouts()` alle 10s = ~10ms CPU-Zeit pro Durchlauf +- CPU-Last: < 0.1% bei 10 Maschinen → vernachlässigbar +- Vorteile: Direkter DB-Zugriff, ACID-Transaktionen, keine API-Overhead +- Einfachheit: Ein Container, keine zusätzliche Infrastruktur + +**Implementation in Odoo:** +- MQTT Client als Odoo Thread oder Cron-Job +- `check_timeouts()` als scheduled action alle 10 Sekunden +- Direkte Verwendung von `workshop.machine`, `workshop.session` Models + +**Alternative (nur bei >50 Maschinen nötig):** Separater Microservice-Container + +## Roadmap + +- [x] M0-M3: MQTT + Parser + Session Detection +- [x] Unit + Integration Tests +- [x] Timeout Detection mit check_timeouts() +- [x] M4: Multi-Device Support (2+ Maschinen parallel) +- [x] M5: Reconnect + Error Handling + - MQTT Auto-Reconnect mit Exponential Backoff (1s → 60s) + - State Recovery: Laufende Sessions werden nach Neustart wiederhergestellt + - Robustes Error Handling in allen Parsern + - Alle 21 Tests bestanden ✅ +- [ ] M6-M10: Odoo Integration diff --git a/python_proto_type/config.yaml.example b/python_proto_type/config.yaml.example new file mode 100644 index 0000000..a6cd58c --- /dev/null +++ b/python_proto_type/config.yaml.example @@ -0,0 +1,70 @@ +# MQTT Broker Configuration +mqtt: + host: "localhost" # MQTT Broker IP/Hostname + port: 1883 # Standard MQTT Port (1883 unencrypted, 8883 encrypted) + username: "" # Optional: MQTT Username + password: "" # Optional: MQTT Password + client_id: "ows_iot_bridge_prototype" + keepalive: 60 + + # Topics to subscribe + topics: + - "shellies/+/status" # Shelly Status Updates + - "shellypmminig3/events" # Shelly Events + - "shellies/+/online" # Shelly Online Status + # Add more topics as needed + +# Device Configuration +# Multi-Device Support: Jedes Shelly PM Mini G3 Gerät = 1 Maschine +# Unterscheide Geräte durch verschiedene topic_prefix in MQTT Topics +devices: + - shelly_id: "shellypmminig3-48f6eeb73a1c" + machine_name: "Shaper Origin" + machine_id: "shaper-origin-01" + device_type: "shelly_pm_mini_g3" + + # Dual Power Thresholds (in Watts) + # IDLE: Power < standby_threshold_w + # STANDBY: standby_threshold_w <= Power < working_threshold_w (Maschine an, Spindel aus) + # WORKING: Power >= working_threshold_w (Spindel läuft) + standby_threshold_w: 20 # Start tracking session + working_threshold_w: 100 # Active work detected + + # Debounce times (in seconds) + start_debounce_s: 3 # Power >= standby_threshold for X seconds → session_start + stop_debounce_s: 15 # Power < standby_threshold for Y seconds → session_end + + # Timeout detection (in seconds) + message_timeout_s: 20 # No message for X seconds → session_end (timeout) + + enabled: true + + # Zweite Maschine - anderes Shelly-Gerät, andere Thresholds + # Wichtig: Jedes Shelly muss eigenen topic_prefix haben! + - shelly_id: "shellypmminig3-48f6eeb73a2d" + machine_name: "CNC Fräse" + machine_id: "cnc-mill-01" + device_type: "shelly_pm_mini_g3" + standby_threshold_w: 30 # Höherer Standby-Wert für CNC + working_threshold_w: 150 # Spindel braucht mehr Power + start_debounce_s: 3 + stop_debounce_s: 15 + message_timeout_s: 20 + enabled: true + +# Logging Configuration +logging: + level: "INFO" # DEBUG, INFO, WARNING, ERROR + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + file: "logs/ows_iot_bridge.log" # Optional: log to file + console: true + +# Output Configuration +output: + # Where to store events and sessions + events_file: "data/events.json" + sessions_file: "data/sessions.json" + + # Console output + print_events: true + print_sessions: true diff --git a/python_proto_type/event_normalizer.py b/python_proto_type/event_normalizer.py new file mode 100644 index 0000000..a99cfbd --- /dev/null +++ b/python_proto_type/event_normalizer.py @@ -0,0 +1,192 @@ +""" +Event Normalizer for Open Workshop IoT Bridge + +Converts device-specific data formats into a unified Event Schema v1 +for consistent processing across different IoT device types. +""" + +import logging +import uuid +from datetime import datetime, timezone +from typing import Dict, Optional + + +class EventNormalizer: + """ + Normalizes device-specific events into unified Event Schema v1 + + Event Schema v1: + { + "event_id": "uuid4", + "event_type": "power_measurement", + "timestamp": "2026-01-22T18:30:45.123456Z", + "machine": { + "machine_id": "shaper-origin-01", + "machine_name": "Shaper Origin" + }, + "device": { + "device_id": "48f6eeb73a1c", + "device_type": "shelly_pm_mini_g3", + "topic_prefix": "shaperorigin" + }, + "metrics": { + "power_w": 52.9, + "voltage_v": 234.8, + "current_a": 0.267, + "frequency_hz": 49.9 + }, + "raw_data": { ... } # Optional: Original device data + } + """ + + def __init__(self, device_config: list = None): + """ + Initialize normalizer with device configuration + + Args: + device_config: List of device configurations from config.yaml + """ + self.logger = logging.getLogger(__name__) + self.device_config = device_config or [] + + # Build topic_prefix to machine mapping + self.prefix_machine_map = {} + for device in self.device_config: + topic_prefix = device.get('topic_prefix') + if topic_prefix: + self.prefix_machine_map[topic_prefix] = { + 'machine_id': device.get('machine_id'), + 'machine_name': device.get('machine_name'), + 'device_type': device.get('device_type', 'unknown') + } + + def normalize_shelly_event(self, shelly_data: Dict) -> Optional[Dict]: + """ + Convert Shelly PM Mini G3 data to Event Schema v1 + + Args: + shelly_data: Parsed Shelly data from ShellyParser + + Returns: + Normalized event dict or None if data incomplete + """ + try: + # Extract device ID and find topic prefix + device_id = shelly_data.get('device_id', 'unknown') + topic_prefix = self._find_topic_prefix(device_id, shelly_data) + + if not topic_prefix: + self.logger.warning(f"Could not determine topic_prefix for device {device_id}") + return None + + # Get machine info from config + machine_info = self.prefix_machine_map.get(topic_prefix) + if not machine_info: + self.logger.warning(f"No machine config found for topic_prefix: {topic_prefix}") + return None + + # Build normalized event + event = { + "event_id": str(uuid.uuid4()), + "event_type": self._determine_event_type(shelly_data), + "timestamp": self._normalize_timestamp(shelly_data.get('timestamp')), + "machine": { + "machine_id": machine_info['machine_id'], + "machine_name": machine_info['machine_name'] + }, + "device": { + "device_id": device_id, + "device_type": machine_info['device_type'], + "topic_prefix": topic_prefix + }, + "metrics": self._extract_metrics(shelly_data), + "raw_data": shelly_data # Keep original for debugging + } + + return event + + except Exception as e: + self.logger.error(f"Failed to normalize Shelly event: {e}", exc_info=True) + return None + + def _find_topic_prefix(self, device_id: str, shelly_data: Dict) -> Optional[str]: + """ + Find topic_prefix for device + + Device ID can be: + - topic_prefix itself (e.g. 'shaperorigin' from status messages) + - actual device ID (e.g. '48f6eeb73a1c' from RPC events) + """ + # Check if device_id is already a known topic_prefix + if device_id in self.prefix_machine_map: + return device_id + + # Otherwise, we need to infer it (currently only one device configured) + # For multiple devices, we'd need more sophisticated matching + if len(self.prefix_machine_map) == 1: + return list(self.prefix_machine_map.keys())[0] + + # TODO: For multi-device setups, implement device_id to topic_prefix mapping + self.logger.warning(f"Cannot map device_id {device_id} to topic_prefix") + return None + + def _determine_event_type(self, shelly_data: Dict) -> str: + """Determine event type from Shelly data""" + msg_type = shelly_data.get('message_type', 'unknown') + + if msg_type in ['status', 'event']: + return 'power_measurement' + + return 'unknown' + + def _normalize_timestamp(self, timestamp_str: Optional[str]) -> str: + """ + Normalize timestamp to ISO 8601 UTC format + + Args: + timestamp_str: ISO timestamp string or None + + Returns: + ISO 8601 UTC timestamp string + """ + if timestamp_str: + try: + # Parse and ensure UTC + dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + return dt.astimezone(timezone.utc).isoformat().replace('+00:00', 'Z') + except Exception as e: + self.logger.warning(f"Failed to parse timestamp {timestamp_str}: {e}") + + # Fallback: current time in UTC + return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') + + def _extract_metrics(self, shelly_data: Dict) -> Dict: + """ + Extract metrics from Shelly data + + Returns dict with standardized metric names + """ + metrics = {} + + # Power (always present in our use case) + # Shelly uses 'apower' (active power) + if 'apower' in shelly_data: + metrics['power_w'] = shelly_data['apower'] + + # Voltage (only in status messages) + if 'voltage' in shelly_data and shelly_data['voltage'] is not None: + metrics['voltage_v'] = shelly_data['voltage'] + + # Current (in status and some events) + if 'current' in shelly_data and shelly_data['current'] is not None: + metrics['current_a'] = shelly_data['current'] + + # Frequency (only in status messages) + if 'frequency' in shelly_data and shelly_data['frequency'] is not None: + metrics['frequency_hz'] = shelly_data['frequency'] + + # Energy counters (if available) + if 'total_energy' in shelly_data and shelly_data['total_energy'] is not None: + metrics['energy_total_wh'] = shelly_data['total_energy'] + + return metrics diff --git a/python_proto_type/event_storage.py b/python_proto_type/event_storage.py new file mode 100644 index 0000000..16e21fc --- /dev/null +++ b/python_proto_type/event_storage.py @@ -0,0 +1,232 @@ +""" +Event Storage Module - Persists events and sessions to JSON files +For later migration to Odoo database models +""" + +import json +import logging +from pathlib import Path +from typing import Dict, Optional, List +from datetime import datetime + + +class EventStorage: + """ + Stores events and sessions to JSON files + + Format designed for easy migration to Odoo models: + - events.jsonl: JSON Lines format (one event per line) → open_workshop.power_event + - sessions.json: JSON array of sessions → open_workshop.session + """ + + def __init__(self, events_file: str = "data/events.jsonl", + sessions_file: str = "data/sessions.json"): + self.logger = logging.getLogger(__name__) + self.events_file = Path(events_file) + self.sessions_file = Path(sessions_file) + + # Ensure data directory exists + self.events_file.parent.mkdir(parents=True, exist_ok=True) + + # Initialize sessions file if it doesn't exist + if not self.sessions_file.exists(): + self._write_sessions([]) + + def store_event(self, event: Dict) -> bool: + """ + Append event to JSONL file (one JSON object per line) + + Format for Odoo migration: + { + "event_id": "uuid", + "event_type": "power_measurement", + "timestamp": "ISO 8601", + "machine": {"machine_id": "...", "machine_name": "..."}, + "device": {...}, + "metrics": {"power_w": 45.7, "voltage_v": 230.2, ...} + } + """ + try: + with open(self.events_file, 'a', encoding='utf-8') as f: + # Write one JSON object per line (JSONL format) + json.dump(event, f, ensure_ascii=False) + f.write('\n') + + self.logger.debug(f"Event {event.get('event_id', 'N/A')} stored") + return True + + except Exception as e: + self.logger.error(f"Failed to store event: {e}") + return False + + def store_session_event(self, session_event: Dict) -> bool: + """ + Store session_start or session_end event + + Updates sessions.json with structured session data + + Format for Odoo migration (open_workshop.session): + { + "session_id": "uuid", + "machine_id": "shaper-origin-01", + "machine_name": "Shaper Origin", + "start_time": "2026-01-22T18:36:20.993Z", + "end_time": "2026-01-22T18:38:01.993Z", // null if running + "total_duration_s": 101, // null if running + "standby_duration_s": 80, // null if running (machine on, not working) + "working_duration_s": 21, // null if running (active work) + "start_power_w": 45.7, + "end_power_w": 0.0, // null if running + "end_reason": "power_drop", // or "timeout", null if running + """ + try: + sessions = self._read_sessions() + + session_id = session_event.get('session_id') + event_type = session_event.get('event_type') + machine = session_event.get('machine', {}) + session_data = session_event.get('session_data', {}) + + if event_type == 'session_start': + # Add new session + sessions.append({ + "session_id": session_id, + "machine_id": machine.get('machine_id'), + "machine_name": machine.get('machine_name'), + "start_time": session_data.get('start_time'), + "end_time": None, + "total_duration_s": None, + "standby_duration_s": None, + "working_duration_s": None, + "start_power_w": session_event.get('power_w'), + "end_power_w": None, + "end_reason": None, + "status": "running" + }) + + elif event_type == 'session_end': + # Update existing session + for session in sessions: + if session['session_id'] == session_id: + session.update({ + "end_time": session_data.get('end_time'), + "total_duration_s": session_data.get('total_duration_s'), + "standby_duration_s": session_data.get('standby_duration_s'), + "working_duration_s": session_data.get('working_duration_s'), + "end_power_w": session_event.get('power_w'), + "end_reason": session_data.get('end_reason'), + "status": "completed" + }) + break + + self._write_sessions(sessions) + self.logger.debug(f"Session {event_type} {session_id} stored") + return True + + except Exception as e: + self.logger.error(f"Failed to store session event: {e}", exc_info=True) + return False + + def load_sessions(self) -> List[Dict]: + """ + Load all sessions from sessions.json + + Returns: + List of session dicts (running and completed) + """ + try: + return self._read_sessions() + except Exception as e: + self.logger.error(f"Failed to load sessions: {e}") + return [] + + def get_running_sessions(self) -> List[Dict]: + """ + Get all currently running sessions (end_time is None) + + Returns: + List of running session dicts + """ + try: + sessions = self._read_sessions() + return [s for s in sessions if s.get('end_time') is None] + except Exception as e: + self.logger.error(f"Failed to get running sessions: {e}") + return [] + + def _handle_session_end(self, event: Dict) -> bool: + """Handle session_end event""" + sessions = self._read_sessions() + session_id = event['session_id'] + + # Find and update session + for session in sessions: + if session['session_id'] == session_id: + session_data = event.get('session_data', {}) + + session['end_time'] = session_data.get('end_time') + session['total_duration_s'] = session_data.get('total_duration_s') + session['standby_duration_s'] = session_data.get('standby_duration_s') + session['working_duration_s'] = session_data.get('working_duration_s') + session['end_power_w'] = event.get('power_w') + session['end_reason'] = session_data.get('end_reason', 'normal') + session['status'] = 'completed' + + self._write_sessions(sessions) + + total_min = session['total_duration_s'] / 60 if session['total_duration_s'] else 0 + standby_min = session['standby_duration_s'] / 60 if session['standby_duration_s'] else 0 + working_min = session['working_duration_s'] / 60 if session['working_duration_s'] else 0 + + self.logger.info( + f"Session {session_id[:8]}... completed ({session['end_reason']}) - " + f"Total: {total_min:.1f}min, Standby: {standby_min:.1f}min, Working: {working_min:.1f}min" + ) + return True + + self.logger.error(f"Session {session_id} not found for update") + return False + + def _read_sessions(self) -> list: + """Read all sessions from JSON file""" + try: + if self.sessions_file.exists(): + with open(self.sessions_file, 'r', encoding='utf-8') as f: + return json.load(f) + return [] + except Exception as e: + self.logger.error(f"Failed to read sessions: {e}") + return [] + + def _write_sessions(self, sessions: list) -> bool: + """Write sessions to JSON file""" + try: + with open(self.sessions_file, 'w', encoding='utf-8') as f: + json.dump(sessions, f, ensure_ascii=False, indent=2) + return True + except Exception as e: + self.logger.error(f"Failed to write sessions: {e}") + return False + + def get_active_sessions(self) -> list: + """Get all running sessions""" + sessions = self._read_sessions() + return [s for s in sessions if s['status'] == 'running'] + + def get_session_by_id(self, session_id: str) -> Optional[Dict]: + """Get session by ID""" + sessions = self._read_sessions() + for session in sessions: + if session['session_id'] == session_id: + return session + return None + + def get_sessions_by_machine(self, machine_id: str, limit: int = 10) -> list: + """Get recent sessions for a machine""" + sessions = self._read_sessions() + machine_sessions = [s for s in sessions if s['machine_id'] == machine_id] + + # Sort by start_time descending + machine_sessions.sort(key=lambda x: x['start_time'], reverse=True) + + return machine_sessions[:limit] diff --git a/python_proto_type/main.py b/python_proto_type/main.py new file mode 100644 index 0000000..02a412f --- /dev/null +++ b/python_proto_type/main.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 +""" +Open Workshop IoT Bridge - Python Prototype +Main entry point for MQTT to Odoo bridge + +Phase 1: Standalone prototype (without Odoo) +""" + +import sys +import logging +import yaml +import time +import signal +import json +import argparse +from pathlib import Path +from typing import Dict + +from mqtt_client import MQTTClient +from shelly_parser import ShellyParser +from event_normalizer import EventNormalizer +from session_detector import SessionDetector +from event_storage import EventStorage + + +class IoTBridge: + """Main IoT Bridge Application""" + + def __init__(self, config_path: str = 'config.yaml'): + """ + Initialize IoT Bridge + + Args: + config_path: Path to config.yaml file + """ + # Load configuration + self.config = self._load_config(config_path) + + # Setup logging + self._setup_logging() + + self.logger = logging.getLogger(__name__) + self.logger.info("=== Open Workshop IoT Bridge Starting ===") + + # Initialize Shelly Parser with device config + self.shelly_parser = ShellyParser(device_config=self.config.get('devices', [])) + + # Initialize Event Normalizer + self.event_normalizer = EventNormalizer(device_config=self.config.get('devices', [])) + + # Initialize Session Detector + self.session_detector = SessionDetector(device_config=self.config.get('devices', [])) + + # Initialize Event Storage + output_config = self.config.get('output', {}) + self.event_storage = EventStorage( + events_file=output_config.get('events_file', 'data/events.jsonl'), + sessions_file=output_config.get('sessions_file', 'data/sessions.json') + ) + + # Initialize MQTT Client + self.mqtt_client = MQTTClient( + config=self.config['mqtt'], + message_callback=self.on_mqtt_message + ) + + self.running = False + + # Setup signal handlers for graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + def _load_config(self, config_path: str) -> dict: + """Load configuration from YAML file""" + config_file = Path(config_path) + + if not config_file.exists(): + print(f"Error: Config file not found: {config_path}") + print(f"Please copy config.yaml.example to config.yaml and adjust settings") + sys.exit(1) + + with open(config_file, 'r') as f: + config = yaml.safe_load(f) + + return config + + def _setup_logging(self): + """Setup logging configuration""" + log_config = self.config.get('logging', {}) + + # Create logs directory if logging to file + if log_config.get('file'): + log_file = Path(log_config['file']) + log_file.parent.mkdir(parents=True, exist_ok=True) + + # Configure logging + log_level = getattr(logging, log_config.get('level', 'INFO').upper()) + log_format = log_config.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + handlers = [] + + # Console handler + if log_config.get('console', True): + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter(log_format)) + handlers.append(console_handler) + + # File handler + if log_config.get('file'): + file_handler = logging.FileHandler(log_config['file']) + file_handler.setFormatter(logging.Formatter(log_format)) + handlers.append(file_handler) + + logging.basicConfig( + level=log_level, + format=log_format, + handlers=handlers + ) + + def _signal_handler(self, signum, frame): + """Handle shutdown signals""" + self.logger.info(f"Received signal {signum}, shutting down...") + self.stop() + + def on_mqtt_message(self, topic: str, payload): + """ + Callback for incoming MQTT messages + + Args: + topic: MQTT topic + payload: Message payload (parsed JSON or raw string) + """ + # Log RAW topic for analysis + if 'shellypmminig3' in topic and 'debug' not in topic: + self.logger.info(f"RAW Topic: {topic}") + + # Only process JSON payloads + if not isinstance(payload, dict): + return + + # Parse Shelly message + parsed_data = self.shelly_parser.parse_message(topic, payload) + + if parsed_data: + # Normalize to Event Schema v1 + normalized_event = self.event_normalizer.normalize_shelly_event(parsed_data) + + if normalized_event: + # Store event (for later Odoo migration) + self.event_storage.store_event(normalized_event) + + # Log normalized event (condensed) + metrics = normalized_event.get('metrics', {}) + power_w = metrics.get('power_w', 'N/A') + self.logger.debug( + f"Event: {normalized_event['machine']['machine_name']} - " + f"Power: {power_w}W" + ) + + # Process through Session Detector + session_event = self.session_detector.process_event(normalized_event) + + if session_event: + # Session state change occurred! + self._log_session_event(session_event) + + # Store session event (for Odoo migration) + self.event_storage.store_session_event(session_event) + + def _log_session_event(self, session_event: Dict): + """Log session start/end events""" + self.logger.info("\n" + "=" * 70) + + if session_event['event_type'] == 'session_start': + self.logger.info("🚀 SESSION START") + else: + self.logger.info("🏁 SESSION END") + + self.logger.info("=" * 70) + self.logger.info(f"Session ID: {session_event['session_id']}") + self.logger.info(f"Machine: {session_event['machine']['machine_name']} ({session_event['machine']['machine_id']})") + self.logger.info(f"Timestamp: {session_event['timestamp']}") + self.logger.info(f"Power: {session_event['power_w']:.1f}W") + + session_data = session_event.get('session_data', {}) + self.logger.info(f"Start Time: {session_data.get('start_time')}") + + if 'end_time' in session_data: + self.logger.info(f"End Time: {session_data['end_time']}") + duration_s = session_data.get('duration_s', 0) + duration_min = duration_s / 60 + self.logger.info(f"Duration: {duration_s}s ({duration_min:.2f} min)") + + self.logger.info("=" * 70 + "\n") + + def _restore_running_sessions(self): + """Restore running sessions after bridge restart""" + self.logger.info("Checking for running sessions to restore...") + + running_sessions = self.event_storage.get_running_sessions() + + if running_sessions: + self.logger.info(f"Found {len(running_sessions)} running session(s)") + self.session_detector.restore_state(running_sessions) + else: + self.logger.info("No running sessions to restore") + + def start(self): + """Start the IoT Bridge""" + self.logger.info("Starting IoT Bridge...") + + # Restore running sessions after restart + self._restore_running_sessions() + + # Create data directory + data_dir = Path('data') + data_dir.mkdir(exist_ok=True) + + # Connect to MQTT + if not self.mqtt_client.connect(): + self.logger.error("Failed to connect to MQTT Broker") + return False + + # Start MQTT loop + self.mqtt_client.start() + + # Wait for connection + if not self.mqtt_client.wait_for_connection(timeout=10): + self.logger.error("MQTT connection timeout") + return False + + self.logger.info("IoT Bridge started successfully") + self.logger.info("Listening for MQTT messages... (Press Ctrl+C to stop)") + + self.running = True + + # Main loop + try: + while self.running: + time.sleep(1) + + # Check for session timeouts periodically + timeout_events = self.session_detector.check_timeouts() + for session_event in timeout_events: + self._log_session_event(session_event) + self.event_storage.store_session_event(session_event) + + except KeyboardInterrupt: + self.logger.info("Interrupted by user") + + return True + + def stop(self): + """Stop the IoT Bridge""" + self.running = False + self.logger.info("Stopping IoT Bridge...") + self.mqtt_client.stop() + self.logger.info("IoT Bridge stopped") + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser(description='Open Workshop IoT Bridge') + parser.add_argument('--config', default='config.yaml', help='Path to config file (default: config.yaml)') + args = parser.parse_args() + + # Check if config file exists + config_file = Path(args.config) + + if not config_file.exists(): + print("\n" + "="*60) + print(f"Configuration file not found: {config_file}") + print("="*60) + print("\nPlease create config.yaml from the example:") + print(" cp config.yaml.example config.yaml") + print("\nThen edit config.yaml with your MQTT broker settings.") + print("="*60 + "\n") + sys.exit(1) + + # Start bridge + bridge = IoTBridge(str(config_file)) + bridge.start() + + +if __name__ == '__main__': + main() diff --git a/python_proto_type/mqtt_client.py b/python_proto_type/mqtt_client.py new file mode 100644 index 0000000..1105d3c --- /dev/null +++ b/python_proto_type/mqtt_client.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +""" +MQTT Client für Open Workshop IoT Bridge +Verbindet sich mit MQTT Broker und empfängt Device Events +""" + +import logging +import time +import json +import ssl +from typing import Callable, Optional +import paho.mqtt.client as mqtt + + +class MQTTClient: + """MQTT Client Wrapper für Device Event Empfang""" + + def __init__(self, config: dict, message_callback: Optional[Callable] = None): + """ + Initialize MQTT Client + + Args: + config: MQTT configuration dict from config.yaml + message_callback: Callback function for incoming messages + """ + self.config = config + self.message_callback = message_callback + self.logger = logging.getLogger(__name__) + + # Create MQTT Client + self.client = mqtt.Client( + client_id=config.get('client_id', 'ows_iot_bridge'), + protocol=mqtt.MQTTv5 + ) + + # Set callbacks + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + self.client.on_message = self._on_message + + # Set username/password if provided + if config.get('username') and config.get('password'): + self.client.username_pw_set( + config['username'], + config['password'] + ) + + # Enable TLS/SSL if port is 8883 + if config.get('port') == 8883: + self.client.tls_set(cert_reqs=ssl.CERT_NONE) + self.client.tls_insecure_set(True) + self.logger.info("TLS/SSL enabled for port 8883") + + self.connected = False + self.topics = config.get('topics', []) + self.reconnect_delay = 1 # Start with 1 second + self.max_reconnect_delay = 60 # Max 60 seconds + self.reconnecting = False + + def _on_connect(self, client, userdata, flags, rc, properties=None): + """Callback when connected to MQTT broker""" + if rc == 0: + self.connected = True + self.reconnecting = False + self.reconnect_delay = 1 # Reset delay on successful connect + self.logger.info(f"Connected to MQTT Broker at {self.config['host']}:{self.config['port']}") + + # Subscribe to all configured topics + for topic in self.topics: + self.client.subscribe(topic) + self.logger.info(f"Subscribed to topic: {topic}") + else: + self.logger.error(f"Failed to connect to MQTT Broker, return code {rc}") + self.connected = False + self._schedule_reconnect() + + def _on_disconnect(self, client, userdata, rc, properties=None): + """Callback when disconnected from MQTT broker""" + self.connected = False + if rc != 0: + self.logger.warning(f"Unexpected disconnect from MQTT Broker (rc={rc})") + self._schedule_reconnect() + else: + self.logger.info("Disconnected from MQTT Broker") + + def _schedule_reconnect(self): + """Schedule automatic reconnection with exponential backoff""" + if self.reconnecting: + return # Already reconnecting + + self.reconnecting = True + self.logger.info(f"Attempting reconnect in {self.reconnect_delay}s...") + time.sleep(self.reconnect_delay) + + try: + self.client.reconnect() + self.logger.info("Reconnection initiated") + except Exception as e: + self.logger.error(f"Reconnection failed: {e}") + # Exponential backoff + self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) + self.reconnecting = False + + def _on_message(self, client, userdata, msg): + """Callback when message received""" + try: + topic = msg.topic + payload = msg.payload.decode('utf-8') + + # Log all shellypmminig3 topics (exclude debug) + if 'shellypmminig3' in topic and 'debug' not in topic: + self.logger.info(f"📡 TOPIC: {topic}") + + self.logger.debug(f"Message received on topic '{topic}'") + + # Try to parse as JSON + try: + payload_json = json.loads(payload) + except json.JSONDecodeError: + # Debug logs are expected to be non-JSON, only warn for non-debug topics + if 'debug' not in topic: + self.logger.warning(f"Message on '{topic}' is not valid JSON: {payload[:100]}") + payload_json = None + + # Call user callback if provided + if self.message_callback: + self.message_callback(topic, payload_json or payload) + else: + # Default: just log + self.logger.info(f"Topic: {topic}") + if payload_json: + self.logger.info(f"Payload: {json.dumps(payload_json, indent=2)}") + else: + self.logger.info(f"Payload: {payload}") + + except Exception as e: + self.logger.error(f"Error processing message: {e}", exc_info=True) + + def connect(self): + """Connect to MQTT broker""" + try: + self.logger.info(f"Connecting to MQTT Broker {self.config['host']}:{self.config['port']}...") + self.client.connect( + self.config['host'], + self.config['port'], + self.config.get('keepalive', 60) + ) + return True + except Exception as e: + self.logger.error(f"Failed to connect to MQTT Broker: {e}") + return False + + def start(self): + """Start the MQTT client loop (non-blocking)""" + self.client.loop_start() + + def stop(self): + """Stop the MQTT client loop""" + self.client.loop_stop() + self.client.disconnect() + self.logger.info("MQTT Client stopped") + + def publish(self, topic: str, payload: dict): + """ + Publish message to MQTT topic + + Args: + topic: MQTT topic + payload: Message payload (will be JSON encoded) + """ + try: + payload_json = json.dumps(payload) + result = self.client.publish(topic, payload_json) + if result.rc == mqtt.MQTT_ERR_SUCCESS: + self.logger.debug(f"Published to {topic}") + return True + else: + self.logger.error(f"Failed to publish to {topic}, rc={result.rc}") + return False + except Exception as e: + self.logger.error(f"Error publishing to {topic}: {e}") + return False + + def wait_for_connection(self, timeout: int = 10) -> bool: + """ + Wait for connection to be established + + Args: + timeout: Maximum seconds to wait + + Returns: + True if connected, False if timeout + """ + start_time = time.time() + while not self.connected and (time.time() - start_time) < timeout: + time.sleep(0.1) + + return self.connected diff --git a/python_proto_type/requirements.txt b/python_proto_type/requirements.txt new file mode 100644 index 0000000..63fd606 --- /dev/null +++ b/python_proto_type/requirements.txt @@ -0,0 +1,17 @@ +# MQTT Client +paho-mqtt>=2.0.0 + +# Configuration +pyyaml>=6.0 + +# Logging & Utilities +python-dateutil>=2.8.2 + +# For UUID generation +uuid>=1.30 + +# Optional: für spätere JSON Schema Validation +jsonschema>=4.17.0 + +# Optional: für besseres Logging +colorlog>=6.7.0 diff --git a/python_proto_type/session_detector.py b/python_proto_type/session_detector.py new file mode 100644 index 0000000..c8bb2a2 --- /dev/null +++ b/python_proto_type/session_detector.py @@ -0,0 +1,514 @@ +""" +Session Detection Engine for Open Workshop IoT Bridge + +Detects machine run sessions with dual power thresholds: +- STANDBY: Machine on, not working (e.g. 20-100W) +- WORKING: Active work (e.g. >= 100W) + +Includes timeout detection for when machine powers off (no MQTT messages). +""" + +import logging +import uuid +from datetime import datetime, timezone, timedelta +from typing import Dict, Optional, List +from enum import Enum + + +class SessionState(Enum): + """Session detection states""" + IDLE = "idle" # Machine off (power < standby_threshold) + STARTING = "starting" # Power above standby threshold, waiting for debounce + STANDBY = "standby" # Machine on, not working (standby <= power < working) + WORKING = "working" # Active work (power >= working threshold) + STOPPING = "stopping" # Power below standby threshold, waiting for debounce + + +class SessionDetector: + """ + Detects machine run sessions with dual thresholds and timeout detection + + State Machine: + IDLE -> STARTING -> STANDBY/WORKING -> STOPPING -> IDLE + + - IDLE: Power < standby_threshold + - STARTING: Power >= standby_threshold for < start_debounce_s + - STANDBY: Power >= standby_threshold < working_threshold (confirmed) + - WORKING: Power >= working_threshold (confirmed) + - STOPPING: Power < standby_threshold for < stop_debounce_s + - Back to IDLE: Power < standby_threshold for >= stop_debounce_s OR timeout + + Timeout: No MQTT message for > message_timeout_s → session_end (machine powered off) + """ + + def __init__(self, device_config: list = None): + """ + Initialize session detector + + Args: + device_config: List of device configurations from config.yaml + """ + self.logger = logging.getLogger(__name__) + self.device_config = device_config or [] + + # Build machine_id to config mapping + self.machine_config = {} + for device in self.device_config: + machine_id = device.get('machine_id') + if machine_id: + self.machine_config[machine_id] = { + 'standby_threshold_w': device.get('standby_threshold_w', 20), + 'working_threshold_w': device.get('working_threshold_w', 100), + 'start_debounce_s': device.get('start_debounce_s', 3), + 'stop_debounce_s': device.get('stop_debounce_s', 15), + 'message_timeout_s': device.get('message_timeout_s', 60), + 'machine_name': device.get('machine_name', 'Unknown'), + } + + # State tracking per machine + self.machine_states = {} # machine_id -> state info + + def restore_state(self, running_sessions: List[Dict]) -> None: + """ + Restore state from running sessions after bridge restart + + Args: + running_sessions: List of session dicts with end_time=None from EventStorage + """ + if not running_sessions: + self.logger.info("No running sessions to restore") + return + + self.logger.info(f"Restoring state for {len(running_sessions)} running sessions") + + for session in running_sessions: + machine_id = session.get('machine_id') + session_id = session.get('session_id') + + if not machine_id or not session_id: + self.logger.warning(f"Invalid session data, skipping: {session}") + continue + + # Get machine config + config = self.machine_config.get(machine_id) + if not config: + self.logger.warning(f"No config for machine {machine_id}, cannot restore") + continue + + # Parse start time + try: + start_time = datetime.fromisoformat(session['start_time'].replace('Z', '+00:00')) + except Exception as e: + self.logger.error(f"Invalid start_time in session {session_id}: {e}") + continue + + # Restore machine state + self.machine_states[machine_id] = { + 'state': SessionState.STANDBY, # Start in STANDBY, will update on next message + 'state_since': start_time, + 'current_session_id': session_id, + 'session_start_time': start_time, + 'last_power': session.get('start_power_w'), + 'last_message_time': start_time, + 'standby_duration_s': session.get('standby_duration_s', 0), + 'working_duration_s': session.get('working_duration_s', 0), + 'last_state_change': start_time, + } + + self.logger.info( + f"✓ Restored session {session_id} for {session.get('machine_name', machine_id)}" + ) + + def check_timeouts(self) -> List[Dict]: + """ + Check all machines for message timeouts and end sessions if needed. + Should be called periodically (e.g. every second). + + Returns: + List of session_end events for timed-out sessions + """ + timeout_events = [] + current_time = datetime.now(timezone.utc) + + for machine_id, state_info in self.machine_states.items(): + # Only check machines in active session states + if state_info['state'] not in [SessionState.STANDBY, SessionState.WORKING]: + continue + + # Skip if no last message time + if not state_info['last_message_time']: + continue + + # Get machine config + config = self.machine_config.get(machine_id) + if not config: + continue + + # Check if timeout exceeded + time_since_last_message = (current_time - state_info['last_message_time']).total_seconds() + if time_since_last_message > config['message_timeout_s']: + machine_name = config.get('machine_name', 'Unknown') + self.logger.warning( + f"⏱️ {machine_name}: Message timeout " + f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END" + ) + + # End session with timeout + timeout_event = self._end_session_timeout(machine_id, machine_name, current_time, config, state_info) + if timeout_event: + timeout_events.append(timeout_event) + + return timeout_events + + def process_event(self, event: Dict) -> Optional[Dict]: + """ + Process a normalized event and detect session changes + + Args: + event: Normalized event from EventNormalizer + + Returns: + Session event dict if state change occurred, None otherwise + + Session Event Format: + { + "session_id": "uuid4", + "event_type": "session_start" | "session_end", + "timestamp": "ISO 8601 UTC", + "machine": { "machine_id": "...", "machine_name": "..." }, + "power_w": 123.4, + "session_data": { + "start_time": "ISO 8601 UTC", + "end_time": "ISO 8601 UTC", # only for session_end + "duration_s": 123 # only for session_end + } + } + """ + try: + # Extract machine info + machine = event.get('machine', {}) + machine_id = machine.get('machine_id') + + if not machine_id: + self.logger.warning("Event missing machine_id, skipping") + return None + + # Get machine config + config = self.machine_config.get(machine_id) + if not config: + self.logger.warning(f"No config found for machine {machine_id}") + return None + + # Extract power measurement + metrics = event.get('metrics', {}) + power_w = metrics.get('power_w') + + if power_w is None: + self.logger.debug(f"Event missing power_w, skipping") + return None + + # Initialize machine state if needed + if machine_id not in self.machine_states: + self.machine_states[machine_id] = { + 'state': SessionState.IDLE, + 'state_since': datetime.now(timezone.utc), + 'current_session_id': None, + 'session_start_time': None, + 'last_power': None, + 'last_message_time': None, + 'standby_duration_s': 0, + 'working_duration_s': 0, + 'last_state_change': datetime.now(timezone.utc), + } + + state_info = self.machine_states[machine_id] + timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00')) + machine_name = machine.get('machine_name', 'Unknown') + + # Check for timeout (no message received) + if state_info['last_message_time']: + time_since_last_message = (timestamp - state_info['last_message_time']).total_seconds() + if time_since_last_message > config['message_timeout_s']: + if state_info['state'] in [SessionState.STANDBY, SessionState.WORKING]: + self.logger.warning( + f"{machine_name}: Message timeout " + f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END" + ) + return self._end_session_timeout(machine_id, machine_name, timestamp, config, state_info) + + # Update last message time + state_info['last_message_time'] = timestamp + + # Update last power + state_info['last_power'] = power_w + + # Process state machine + return self._process_state_machine( + machine_id=machine_id, + machine_name=machine.get('machine_name'), + power_w=power_w, + timestamp=timestamp, + config=config, + state_info=state_info + ) + + except Exception as e: + self.logger.error(f"Error processing event: {e}", exc_info=True) + return None + + def _process_state_machine( + self, + machine_id: str, + machine_name: str, + power_w: float, + timestamp: datetime, + config: Dict, + state_info: Dict + ) -> Optional[Dict]: + """ + Process state machine logic with dual thresholds + + Returns session event if state change occurred + """ + current_state = state_info['state'] + standby_threshold = config['standby_threshold_w'] + working_threshold = config['working_threshold_w'] + start_debounce = config['start_debounce_s'] + stop_debounce = config['stop_debounce_s'] + + time_in_state = (timestamp - state_info['state_since']).total_seconds() + + # Update duration tracking for active states + if current_state == SessionState.STANDBY: + time_since_last_update = (timestamp - state_info['last_state_change']).total_seconds() + state_info['standby_duration_s'] += time_since_last_update + state_info['last_state_change'] = timestamp + elif current_state == SessionState.WORKING: + time_since_last_update = (timestamp - state_info['last_state_change']).total_seconds() + state_info['working_duration_s'] += time_since_last_update + state_info['last_state_change'] = timestamp + + # State machine transitions + if current_state == SessionState.IDLE: + if power_w >= standby_threshold: + # Transition to STARTING + self.logger.info(f"🟡 {machine_name}: Power {power_w:.1f}W >= {standby_threshold}W → STARTING") + state_info['state'] = SessionState.STARTING + state_info['state_since'] = timestamp + + elif current_state == SessionState.STARTING: + if power_w < standby_threshold: + # False start, back to IDLE + self.logger.info(f"⚪ {machine_name}: Power dropped before debounce → IDLE") + state_info['state'] = SessionState.IDLE + state_info['state_since'] = timestamp + + elif time_in_state >= start_debounce: + # Debounce passed, transition to STANDBY or WORKING + session_id = str(uuid.uuid4()) + state_info['current_session_id'] = session_id + state_info['session_start_time'] = timestamp + state_info['standby_duration_s'] = 0 + state_info['working_duration_s'] = 0 + state_info['last_state_change'] = timestamp + + if power_w >= working_threshold: + state_info['state'] = SessionState.WORKING + self.logger.info(f"🔵 {machine_name}: Session START → WORKING (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W") + else: + state_info['state'] = SessionState.STANDBY + self.logger.info(f"🟢 {machine_name}: Session START → STANDBY (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W") + + state_info['state_since'] = timestamp + + # Generate session_start event + return { + 'session_id': session_id, + 'event_type': 'session_start', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': machine_name + }, + 'power_w': power_w, + 'session_data': { + 'start_time': timestamp.isoformat().replace('+00:00', 'Z') + } + } + + elif current_state == SessionState.STANDBY: + if power_w < standby_threshold: + # Transition to STOPPING + self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {standby_threshold}W → STOPPING") + state_info['state'] = SessionState.STOPPING + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + elif power_w >= working_threshold: + # Transition to WORKING + self.logger.info(f"🔵 {machine_name}: Power {power_w:.1f}W >= {working_threshold}W → WORKING") + state_info['state'] = SessionState.WORKING + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + elif current_state == SessionState.WORKING: + if power_w < standby_threshold: + # Transition to STOPPING + self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {standby_threshold}W → STOPPING") + state_info['state'] = SessionState.STOPPING + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + elif power_w < working_threshold: + # Transition to STANDBY + self.logger.info(f"🟢 {machine_name}: Power {power_w:.1f}W < {working_threshold}W → STANDBY") + state_info['state'] = SessionState.STANDBY + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + elif current_state == SessionState.STOPPING: + if power_w >= standby_threshold: + # Power back up, cancel STOPPING + if power_w >= working_threshold: + self.logger.info(f"🔵 {machine_name}: Power back up → WORKING") + state_info['state'] = SessionState.WORKING + else: + self.logger.info(f"🟢 {machine_name}: Power back up → STANDBY") + state_info['state'] = SessionState.STANDBY + state_info['state_since'] = timestamp + state_info['last_state_change'] = timestamp + + else: + # Power still low, check if debounce passed + if time_in_state >= stop_debounce: + # Debounce passed, session ended + return self._end_session_normal(machine_id, machine_name, power_w, timestamp, state_info) + + return None + + def _end_session_normal( + self, + machine_id: str, + machine_name: str, + power_w: float, + timestamp: datetime, + state_info: Dict + ) -> Dict: + """End session normally (power drop)""" + session_id = state_info['current_session_id'] + start_time = state_info['session_start_time'] + duration_s = (timestamp - start_time).total_seconds() + + standby_duration = state_info.get('standby_duration_s') or 0 + working_duration = state_info.get('working_duration_s') or 0 + + self.logger.info( + f"🔴 {machine_name}: Session END (power drop) - " + f"Total: {duration_s:.0f}s, Standby: {standby_duration:.0f}s, Working: {working_duration:.0f}s" + ) + + # Generate session_end event + session_event = { + 'session_id': session_id, + 'event_type': 'session_end', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': machine_name + }, + 'power_w': power_w, + 'session_data': { + 'start_time': start_time.isoformat().replace('+00:00', 'Z'), + 'end_time': timestamp.isoformat().replace('+00:00', 'Z'), + 'total_duration_s': int(duration_s), + 'standby_duration_s': int(standby_duration), + 'working_duration_s': int(working_duration), + 'end_reason': 'power_drop' + } + } + + # Reset to IDLE + self._reset_session_state(state_info, timestamp) + + return session_event + + def _end_session_timeout( + self, + machine_id: str, + machine_name: str, + timestamp: datetime, + config: Dict, + state_info: Dict + ) -> Dict: + """End session due to timeout (no messages)""" + session_id = state_info['current_session_id'] + start_time = state_info['session_start_time'] + duration_s = (timestamp - start_time).total_seconds() + + standby_duration = state_info.get('standby_duration_s') or 0 + working_duration = state_info.get('working_duration_s') or 0 + + self.logger.warning( + f"⏱️ {machine_name}: Session END (TIMEOUT) - " + f"Total: {duration_s:.0f}s, Standby: {standby_duration:.0f}s, Working: {working_duration:.0f}s" + ) + + # Generate session_end event + session_event = { + 'session_id': session_id, + 'event_type': 'session_end', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': machine_name + }, + 'power_w': 0.0, # Assume power is 0 on timeout + 'session_data': { + 'start_time': start_time.isoformat().replace('+00:00', 'Z'), + 'end_time': timestamp.isoformat().replace('+00:00', 'Z'), + 'total_duration_s': int(duration_s), + 'standby_duration_s': int(standby_duration), + 'working_duration_s': int(working_duration), + 'end_reason': 'timeout' + } + } + + # Reset to IDLE + self._reset_session_state(state_info, timestamp) + + return session_event + + def _reset_session_state(self, state_info: Dict, timestamp: datetime): + """Reset session state to IDLE""" + state_info['state'] = SessionState.IDLE + state_info['state_since'] = timestamp + state_info['current_session_id'] = None + state_info['session_start_time'] = None + state_info['standby_duration_s'] = 0 + state_info['working_duration_s'] = 0 + state_info['last_state_change'] = timestamp + + def get_machine_state(self, machine_id: str) -> Optional[str]: + """Get current state of a machine""" + state_info = self.machine_states.get(machine_id) + if state_info: + return state_info['state'].value + return None + + def get_active_sessions(self) -> Dict: + """ + Get all currently active sessions + + Returns dict: machine_id -> session info + """ + active_sessions = {} + + for machine_id, state_info in self.machine_states.items(): + if state_info['state'] == SessionState.RUNNING: + active_sessions[machine_id] = { + 'session_id': state_info['current_session_id'], + 'start_time': state_info['session_start_time'].isoformat().replace('+00:00', 'Z'), + 'current_power': state_info['last_power'] + } + + return active_sessions diff --git a/python_proto_type/setup.sh b/python_proto_type/setup.sh new file mode 100755 index 0000000..a02a885 --- /dev/null +++ b/python_proto_type/setup.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# Setup script for Open Workshop IoT Bridge Python Prototype + +echo "===================================" +echo "Open Workshop IoT Bridge - Setup" +echo "===================================" +echo "" + +# Check if Python 3 is installed +if ! command -v python3 &> /dev/null; then + echo "Error: Python 3 is not installed" + exit 1 +fi + +echo "Python version: $(python3 --version)" +echo "" + +# Create virtual environment +echo "Creating virtual environment..." +python3 -m venv venv + +if [ $? -ne 0 ]; then + echo "Error: Failed to create virtual environment" + exit 1 +fi + +echo "✓ Virtual environment created" +echo "" + +# Activate virtual environment +echo "Activating virtual environment..." +source venv/bin/activate + +# Upgrade pip +echo "Upgrading pip..." +pip install --upgrade pip + +# Install requirements +echo "" +echo "Installing dependencies from requirements.txt..." +pip install -r requirements.txt + +if [ $? -ne 0 ]; then + echo "Error: Failed to install requirements" + exit 1 +fi + +echo "" +echo "✓ Dependencies installed" +echo "" + +# Check if config.yaml exists +if [ ! -f "config.yaml" ]; then + echo "Creating config.yaml from template..." + cp config.yaml.example config.yaml + echo "✓ config.yaml created" + echo "" + echo "⚠️ Please edit config.yaml with your MQTT broker settings!" + echo "" +fi + +# Create data and logs directories +mkdir -p data logs + +echo "===================================" +echo "Setup complete!" +echo "===================================" +echo "" +echo "Next steps:" +echo " 1. Edit config.yaml with your MQTT broker settings" +echo " 2. Activate venv: source venv/bin/activate" +echo " 3. Run the bridge: python main.py" +echo "" diff --git a/python_proto_type/shelly_parser.py b/python_proto_type/shelly_parser.py new file mode 100644 index 0000000..bc8b0f9 --- /dev/null +++ b/python_proto_type/shelly_parser.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +""" +Shelly PM Mini G3 Parser +Parst MQTT Messages vom Shelly PM Mini G3 +""" + +import logging +from typing import Dict, Optional +from datetime import datetime + + +class ShellyParser: + """Parser für Shelly PM Mini G3 MQTT Messages""" + + def __init__(self, device_config: list = None): + """ + Initialize parser + + Args: + device_config: List of device configurations from config.yaml + """ + self.logger = logging.getLogger(__name__) + self.device_config = device_config or [] + + # Build topic-prefix to machine mapping + self.prefix_machine_map = {} + for device in self.device_config: + topic_prefix = device.get('topic_prefix') + if topic_prefix: + self.prefix_machine_map[topic_prefix] = { + 'machine_id': device.get('machine_id'), + 'machine_name': device.get('machine_name'), + 'device_id': None # Will be extracted from messages + } + + def parse_message(self, topic: str, payload: dict) -> Optional[Dict]: + """ + Parse Shelly MQTT message + + Args: + topic: MQTT topic + payload: Message payload (already parsed as JSON) + + Returns: + Parsed data dict or None if not a relevant message + """ + try: + # Ignore debug logs + if 'debug/log' in topic: + return None + + # Parse different message types + if '/status/pm1:0' in topic: + return self._parse_status_message(topic, payload) + elif '/events/rpc' in topic: + return self._parse_rpc_event(topic, payload) + elif '/telemetry' in topic: + return self._parse_telemetry(topic, payload) + + return None + + except Exception as e: + self.logger.error(f"Error parsing message from {topic}: {e}", exc_info=True) + return None + + def _parse_status_message(self, topic: str, payload: dict) -> Dict: + """ + Parse full status message + Topic: shaperorigin/status/pm1:0 + + Payload format (Shelly PM Mini G3): + { + "id": 0, + "voltage": 230.0, + "current": 0.217, + "apower": 50.0, + "freq": 50.0, + "aenergy": {"total": 12345.6, "by_minute": [...], "minute_ts": 1234567890}, + "temperature": {"tC": 35.2, "tF": 95.4} + } + """ + try: + # Extract device ID from topic prefix + device_id = self._extract_device_id_from_topic(topic) + + data = { + 'message_type': 'status', + 'device_id': device_id, + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'voltage': payload.get('voltage'), + 'current': payload.get('current'), + 'apower': payload.get('apower'), # Active Power in Watts + 'frequency': payload.get('freq'), + 'total_energy': payload.get('aenergy', {}).get('total'), + } + + self.logger.debug(f"Parsed status message: apower={data['apower']}W") + return data + + except Exception as e: + self.logger.error(f"Error parsing status message: {e}", exc_info=True) + return None + + def _parse_rpc_event(self, topic: str, payload: dict) -> Optional[Dict]: + """ + Parse RPC NotifyStatus event + Topic: shellypmminig3/events/rpc + """ + try: + if payload.get('method') != 'NotifyStatus': + return None + + device_id = payload.get('src', '').replace('shellypmminig3-', '') + params = payload.get('params', {}) + pm_data = params.get('pm1:0', {}) + + # Get timestamp from params or use current time + ts = params.get('ts') + if ts: + timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' + else: + timestamp = datetime.utcnow().isoformat() + 'Z' + + data = { + 'message_type': 'event', + 'device_id': device_id, + 'timestamp': timestamp, + 'apower': pm_data.get('apower'), + 'current': pm_data.get('current'), + 'voltage': pm_data.get('voltage'), + } + + # Only return if we have actual data + if data['apower'] is not None or data['current'] is not None: + self.logger.debug(f"Parsed RPC event: {pm_data}") + return data + + return None + + except Exception as e: + self.logger.error(f"Error parsing RPC event: {e}", exc_info=True) + return None + + def _parse_telemetry(self, topic: str, payload: dict) -> Dict: + """ + Parse telemetry message + Topic: shelly/pmmini/shellypmminig3-48f6eeb73a1c/telemetry + """ + # Extract device ID from topic + parts = topic.split('/') + device_id = parts[2] if len(parts) > 2 else 'unknown' + device_id = device_id.replace('shellypmminig3-', '') + + # Get timestamp + ts = payload.get('ts') + if ts: + timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' + else: + timestamp = datetime.utcnow().isoformat() + 'Z' + + data = { + 'message_type': 'telemetry', + 'device_id': device_id, + 'timestamp': timestamp, + 'voltage': payload.get('voltage_v'), + 'current': payload.get('current_a'), + 'apower': payload.get('power_w'), # Active Power in Watts + 'frequency': payload.get('freq_hz'), + 'total_energy': payload.get('energy_wh'), + } + + self.logger.debug(f"Parsed telemetry: apower={data['apower']}W") + return data + + def _extract_device_id_from_topic(self, topic: str) -> str: + """Extract device ID (topic prefix) from topic string""" + # Topic format: shaperorigin/status/pm1:0 + # Extract the prefix (shaperorigin) + parts = topic.split('/') + if len(parts) > 0: + topic_prefix = parts[0] + # Check if this prefix is in our config + if topic_prefix in self.prefix_machine_map: + # Use the actual device_id if we've learned it, otherwise use prefix + device_info = self.prefix_machine_map[topic_prefix] + return device_info.get('device_id') or topic_prefix + return topic_prefix + return 'unknown' + + def get_power_value(self, parsed_data: Dict) -> Optional[float]: + """ + Extract power value from parsed data + + Returns: + Power in Watts or None + """ + return parsed_data.get('apower') + + def get_device_id(self, parsed_data: Dict) -> str: + """Get device ID from parsed data""" + return parsed_data.get('device_id', 'unknown') diff --git a/python_proto_type/tests/__init__.py b/python_proto_type/tests/__init__.py new file mode 100644 index 0000000..73e40f0 --- /dev/null +++ b/python_proto_type/tests/__init__.py @@ -0,0 +1,3 @@ +""" +Tests für Open Workshop IoT Bridge +""" diff --git a/python_proto_type/tests/integration/__init__.py b/python_proto_type/tests/integration/__init__.py new file mode 100644 index 0000000..343db61 --- /dev/null +++ b/python_proto_type/tests/integration/__init__.py @@ -0,0 +1,3 @@ +""" +Integration Tests - Testen mit echten externen Services (MQTT Broker, Storage) +""" diff --git a/python_proto_type/tests/integration/test_mqtt_integration.py b/python_proto_type/tests/integration/test_mqtt_integration.py new file mode 100644 index 0000000..ce61021 --- /dev/null +++ b/python_proto_type/tests/integration/test_mqtt_integration.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python3 +""" +Integration Tests mit echtem MQTT Broker + +Testet die komplette Pipeline: +MQTT → Parser → Normalizer → SessionDetector → Storage + +Usage: + pytest test_mqtt_integration.py -v -s +""" + +import pytest +import subprocess +import time +import json +import signal +import os +from pathlib import Path +from datetime import datetime +import paho.mqtt.client as mqtt_client +import ssl + + +@pytest.fixture(scope="session") +def mqtt_config(): + """MQTT Broker Konfiguration aus Umgebungsvariablen oder config.yaml""" + + # Option 1: Aus Umgebungsvariablen + if os.getenv('MQTT_HOST'): + return { + 'host': os.getenv('MQTT_HOST'), + 'port': int(os.getenv('MQTT_PORT', '8883')), + 'username': os.getenv('MQTT_USERNAME'), + 'password': os.getenv('MQTT_PASSWORD'), + 'topic_prefix': os.getenv('MQTT_TEST_TOPIC_PREFIX', 'pytest-test') + } + + # Option 2: Aus config.yaml lesen + config_path = Path(__file__).parent.parent.parent / 'config.yaml' + if config_path.exists(): + import yaml + with open(config_path) as f: + config = yaml.safe_load(f) + + mqtt_conf = config.get('mqtt', {}) + return { + 'host': mqtt_conf.get('host'), + 'port': mqtt_conf.get('port', 8883), + 'username': mqtt_conf.get('username'), + 'password': mqtt_conf.get('password'), + 'topic_prefix': 'pytest-test' # Immer separates Topic für Tests + } + + pytest.skip("Keine MQTT Konfiguration gefunden. Setze Umgebungsvariablen oder erstelle config.yaml") + + +@pytest.fixture(scope="session") +def workspace_dir(): + """Workspace-Verzeichnis""" + return Path(__file__).parent + + +@pytest.fixture(scope="session") +def test_config_file(workspace_dir, mqtt_config): + """Erstellt temporäre test_config.yaml""" + # Relative Pfade zu den Test Storage Files (relativ zum Projektroot) + config_content = f""" +mqtt: + host: "{mqtt_config['host']}" + port: {mqtt_config['port']} + username: "{mqtt_config['username']}" + password: "{mqtt_config['password']}" + client_id: "ows_iot_bridge_pytest" + keepalive: 60 + topics: + - "{mqtt_config['topic_prefix']}-m1/#" + - "{mqtt_config['topic_prefix']}-m2/#" +devices: + - topic_prefix: "{mqtt_config['topic_prefix']}-m1" + machine_name: "PyTest Machine" + machine_id: "pytest-machine-01" + device_type: "shelly_pm_mini_g3" + standby_threshold_w: 20 + working_threshold_w: 100 + start_debounce_s: 3 + stop_debounce_s: 15 + message_timeout_s: 20 + enabled: true + - topic_prefix: "{mqtt_config['topic_prefix']}-m2" + machine_name: "PyTest CNC" + machine_id: "pytest-machine-02" + device_type: "shelly_pm_mini_g3" + standby_threshold_w: 30 + working_threshold_w: 150 + start_debounce_s: 3 + stop_debounce_s: 15 + message_timeout_s: 20 + enabled: true + +logging: + level: "INFO" + console: true + +output: + events_file: "data/test_events.jsonl" + sessions_file: "data/test_sessions.json" +""" + + config_path = workspace_dir / "test_config.yaml" + config_path.write_text(config_content) + + yield config_path + + # Cleanup + if config_path.exists(): + config_path.unlink() + + +@pytest.fixture(scope="session") +def test_storage_files(workspace_dir): + """Gibt Pfade zu Test-Storage-Dateien zurück (werden von Bridge erstellt)""" + # Test-Dateien im Projektroot data/ Verzeichnis + project_root = workspace_dir.parent.parent + data_dir = project_root / "data" + + events_file = data_dir / "test_events.jsonl" + sessions_file = data_dir / "test_sessions.json" + + # NICHT löschen - zum Debugging beibehalten! + + yield { + 'events': events_file, + 'sessions': sessions_file + } + + # Cleanup nach allen Tests (optional auskommentiert) + # if events_file.exists(): + # events_file.unlink() + # if sessions_file.exists(): + # sessions_file.unlink() + + +@pytest.fixture(scope="module") +def bridge_process(workspace_dir, test_config_file, test_storage_files): + """Startet die IoT Bridge als Subprocess""" + + # Bridge starten mit test_config.yaml + # cwd muss das Projektroot sein, nicht tests/integration/ + project_root = workspace_dir.parent.parent + config_path = workspace_dir / 'test_config.yaml' + + env = os.environ.copy() + env['PYTHONUNBUFFERED'] = '1' + + # Use python from venv explicitly + python_exe = project_root / 'venv' / 'bin' / 'python' + bridge_log = workspace_dir / 'bridge_output.log' + + with open(bridge_log, 'w') as log_file: + process = subprocess.Popen( + [str(python_exe), 'main.py', '--config', str(config_path)], + cwd=project_root, + env=env, + stdout=log_file, + stderr=subprocess.STDOUT, + text=True + ) + + # Warten bis Bridge gestartet ist + time.sleep(3) + + # Prüfen ob Prozess läuft + if process.poll() is not None: + with open(bridge_log, 'r') as f: + output = f.read() + pytest.fail(f"Bridge konnte nicht gestartet werden:\n{output}") + + # Debug: Bridge Output ausgeben + with open(bridge_log, 'r') as f: + output = f.read() + if output: + print(f"\n=== Bridge Output ===\n{output}\n===\n") + + yield process + + # Bridge sauber beenden + process.send_signal(signal.SIGTERM) + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + + +@pytest.fixture +def mqtt_sender(mqtt_config): + """MQTT Client zum Senden von Test-Messages""" + + client_id = f"pytest-sender-{int(time.time())}" + client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5) + client.username_pw_set(mqtt_config['username'], mqtt_config['password']) + + # TLS/SSL + client.tls_set(cert_reqs=ssl.CERT_NONE) + client.tls_insecure_set(True) + + client.connect(mqtt_config['host'], mqtt_config['port'], keepalive=60) + client.loop_start() + time.sleep(1) + + yield client + + client.loop_stop() + client.disconnect() + + +def send_shelly_message(mqtt_sender, topic: str, power_w: float): + """Sendet eine Shelly PM Mini G3 Status Message""" + message = { + "id": 0, + "voltage": 230.0, + "current": round(power_w / 230.0, 3), + "apower": power_w, + "freq": 50.0, + "aenergy": { + "total": 12345.6, + "by_minute": [0.0, 0.0, 0.0], + "minute_ts": int(time.time()) + }, + "temperature": { + "tC": 35.2, + "tF": 95.4 + } + } + + mqtt_sender.publish(topic, json.dumps(message), qos=1) + time.sleep(0.1) # Kurze Pause zwischen Messages + + +def read_sessions(sessions_file: Path): + """Liest sessions.json""" + if not sessions_file.exists(): + return [] + + content = sessions_file.read_text().strip() + if not content or content == "[]": + return [] + + return json.loads(content) + + +def wait_for_session_count(sessions_file: Path, expected_count: int, timeout: int = 10): + """Wartet bis die erwartete Anzahl Sessions vorhanden ist""" + start_time = time.time() + + while time.time() - start_time < timeout: + sessions = read_sessions(sessions_file) + if len(sessions) >= expected_count: + return sessions + time.sleep(0.5) + + sessions = read_sessions(sessions_file) + pytest.fail( + f"Timeout: Erwartete {expected_count} Sessions, gefunden: {len(sessions)}\n" + f"Sessions: {json.dumps(sessions, indent=2)}" + ) + + +class TestMQTTIntegration: + """Integration Tests mit echtem MQTT""" + + def test_bridge_is_running(self, bridge_process): + """Bridge läuft erfolgreich""" + assert bridge_process.poll() is None, "Bridge Prozess ist abgestürzt" + + def test_session_start_standby(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Session Start → STANDBY""" + topic = f"{mqtt_config['topic_prefix']}-m1/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + # Anzahl Sessions vor dem Test + initial_count = len(read_sessions(sessions_file)) + + # Power ansteigen lassen - wie reale Maschine + send_shelly_message(mqtt_sender, topic, 0) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 30) # STARTING + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) # Nach 3s → STANDBY + + # Warten auf Session-Start (3s debounce + Verarbeitung) + time.sleep(3) + + # Session prüfen + sessions = wait_for_session_count(sessions_file, initial_count + 1, timeout=5) + + # Neueste Session + latest = sessions[-1] + assert latest['machine_id'] == 'pytest-machine-01' + assert latest['status'] == 'running' + assert latest['start_power_w'] >= 20 + + def test_session_end_with_stop_debounce(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Session Ende mit stop_debounce (15s unter 20W)""" + topic = f"{mqtt_config['topic_prefix']}-m1/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + initial_count = len(read_sessions(sessions_file)) + + # Session starten + send_shelly_message(mqtt_sender, topic, 0) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(2) # Session sollte jetzt laufen + + # STANDBY für 3 Sekunden + for _ in range(3): + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + + # Power runterfahren → STOPPING + send_shelly_message(mqtt_sender, topic, 15) + + # STOPPING für 16 Sekunden (> 15s stop_debounce) + for i in range(16): + send_shelly_message(mqtt_sender, topic, 10 - i * 0.5) + time.sleep(1) + + # Warten auf Session-Ende + time.sleep(2) + + # Prüfen ob Session beendet wurde + sessions = read_sessions(sessions_file) + + # Finde die Session die gerade beendet wurde + completed_sessions = [s for s in sessions if s.get('status') == 'completed' and s.get('end_reason') == 'power_drop'] + + assert len(completed_sessions) > 0, f"Keine completed Session mit power_drop gefunden. Sessions: {json.dumps(sessions[-3:], indent=2)}" + + latest = completed_sessions[-1] + assert latest['end_reason'] == 'power_drop' + assert latest['total_duration_s'] is not None + assert latest['standby_duration_s'] is not None + + def test_standby_to_working_transition(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """STANDBY → WORKING Transition""" + topic = f"{mqtt_config['topic_prefix']}-m1/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + # Session im STANDBY starten + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(2) + + # STANDBY halten + for _ in range(3): + send_shelly_message(mqtt_sender, topic, 60) + time.sleep(1) + + # → WORKING + send_shelly_message(mqtt_sender, topic, 120) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 150) + + # WORKING halten + for _ in range(5): + send_shelly_message(mqtt_sender, topic, 150) + time.sleep(1) + + # Session beenden + send_shelly_message(mqtt_sender, topic, 10) + for _ in range(16): + send_shelly_message(mqtt_sender, topic, 5) + time.sleep(1) + + time.sleep(2) + + # Prüfen + sessions = read_sessions(sessions_file) + completed = [s for s in sessions if s.get('status') == 'completed'] + + assert len(completed) > 0 + latest = completed[-1] + + # Sollte sowohl STANDBY als auch WORKING Zeit haben + assert latest.get('standby_duration_s', 0) > 0, "Keine STANDBY Zeit" + assert latest.get('working_duration_s', 0) > 0, "Keine WORKING Zeit" + + def test_timeout_detection(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Timeout Detection (20s keine Messages)""" + topic = f"{mqtt_config['topic_prefix']}-m1/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + initial_count = len(read_sessions(sessions_file)) + + # Session starten + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(1) + send_shelly_message(mqtt_sender, topic, 50) + time.sleep(2) + + # Letzte Message + send_shelly_message(mqtt_sender, topic, 50) + + # 25 Sekunden warten (> 20s timeout) + print("\n⏱️ Warte 25 Sekunden für Timeout Detection...") + time.sleep(25) + + # Prüfen ob Session mit timeout beendet wurde + sessions = read_sessions(sessions_file) + timeout_sessions = [s for s in sessions if s.get('end_reason') == 'timeout'] + + assert len(timeout_sessions) > 0, f"Keine timeout Session gefunden. Sessions: {json.dumps(sessions[-3:], indent=2)}" + + def test_multi_device_parallel_sessions(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Zwei Maschinen starten parallel Sessions""" + topic_machine1 = f"{mqtt_config['topic_prefix']}-m1/status/pm1:0" + topic_machine2 = f"{mqtt_config['topic_prefix']}-m2/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + initial_count = len(read_sessions(sessions_file)) + + print("\n🔄 Starte parallele Sessions auf 2 Maschinen...") + + # Maschine 1: Session starten (20W threshold) + send_shelly_message(mqtt_sender, topic_machine1, 50) + time.sleep(0.5) + + # Maschine 2: Session starten (30W threshold) + send_shelly_message(mqtt_sender, topic_machine2, 80) + time.sleep(0.5) + + # Beide Maschinen halten + for _ in range(4): + send_shelly_message(mqtt_sender, topic_machine1, 60) + send_shelly_message(mqtt_sender, topic_machine2, 100) + time.sleep(1) + + # Warten auf Sessions + time.sleep(3) + + # Beide Maschinen sollten jetzt laufende Sessions haben + sessions = read_sessions(sessions_file) + new_sessions = [s for s in sessions if len(sessions) > initial_count][-2:] # Letzte 2 neue Sessions + running_sessions = [s for s in new_sessions if s.get('status') == 'running'] + + assert len(running_sessions) == 2, f"Erwartet exakt 2 neue laufende Sessions, gefunden: {len(running_sessions)}" + + # Prüfen dass beide machine_ids vorhanden sind + machine_ids = [s['machine_id'] for s in running_sessions] + assert 'pytest-machine-01' in machine_ids, "Maschine 1 Session fehlt" + assert 'pytest-machine-02' in machine_ids, "Maschine 2 Session fehlt" + + print(f"✅ 2 parallele Sessions erfolgreich: {machine_ids}") + + def test_multi_device_independent_timeouts(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Maschine 1 Timeout, Maschine 2 läuft weiter""" + topic_machine1 = f"{mqtt_config['topic_prefix']}-m1/status/pm1:0" + topic_machine2 = f"{mqtt_config['topic_prefix']}-m2/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + print("\n⏱️ Teste unabhängige Timeouts...") + + # Beide Maschinen starten + send_shelly_message(mqtt_sender, topic_machine1, 50) + send_shelly_message(mqtt_sender, topic_machine2, 80) + time.sleep(1) + + for _ in range(3): + send_shelly_message(mqtt_sender, topic_machine1, 50) + send_shelly_message(mqtt_sender, topic_machine2, 80) + time.sleep(1) + + time.sleep(2) + + # Maschine 1: Letzte Message, dann Timeout + send_shelly_message(mqtt_sender, topic_machine1, 50) + + # Maschine 2: Läuft weiter + for _ in range(26): # 26 Sekunden + send_shelly_message(mqtt_sender, topic_machine2, 80) + time.sleep(1) + + # Prüfen + sessions = read_sessions(sessions_file) + + # Maschine 1 sollte timeout haben + m1_sessions = [s for s in sessions if s['machine_id'] == 'pytest-machine-01'] + timeout_session = [s for s in m1_sessions if s.get('end_reason') == 'timeout'] + assert len(timeout_session) > 0, "Maschine 1 sollte Timeout-Session haben" + + # Maschine 2 sollte noch laufen + m2_sessions = [s for s in sessions if s['machine_id'] == 'pytest-machine-02'] + running_session = [s for s in m2_sessions if s.get('status') == 'running'] + assert len(running_session) > 0, "Maschine 2 sollte noch laufen" + + print("✅ Unabhängige Timeouts funktionieren") + + def test_multi_device_different_thresholds(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files): + """Unterschiedliche Power-Thresholds pro Maschine""" + topic_machine1 = f"{mqtt_config['topic_prefix']}-m1/status/pm1:0" + topic_machine2 = f"{mqtt_config['topic_prefix']}-m2/status/pm1:0" + sessions_file = test_storage_files['sessions'] + + # Cleanup: Alte Sessions löschen für sauberen Test + if sessions_file.exists(): + sessions_file.unlink() + + print("\n🔧 Teste unterschiedliche Thresholds...") + + # Maschine 1: 25W (über 20W threshold) → sollte starten + for _ in range(4): + send_shelly_message(mqtt_sender, topic_machine1, 25) + time.sleep(1) + + time.sleep(3) + + # Maschine 2: 25W (unter 30W threshold) → sollte NICHT starten + for _ in range(4): + send_shelly_message(mqtt_sender, topic_machine2, 25) + time.sleep(1) + + time.sleep(3) + + sessions = read_sessions(sessions_file) + + # Maschine 1 sollte Session haben + m1_sessions = [s for s in sessions if s['machine_id'] == 'pytest-machine-01' and s.get('status') == 'running'] + assert len(m1_sessions) > 0, "Maschine 1 sollte bei 25W Session haben (threshold 20W)" + + # Maschine 2 sollte KEINE Session haben + m2_sessions = [s for s in sessions if s['machine_id'] == 'pytest-machine-02' and s.get('status') == 'running'] + assert len(m2_sessions) == 0, "Maschine 2 sollte bei 25W KEINE Session haben (threshold 30W)" + + print("✅ Unterschiedliche Thresholds funktionieren") + + +if __name__ == '__main__': + pytest.main([__file__, '-v', '-s']) diff --git a/python_proto_type/tests/tools/__init__.py b/python_proto_type/tests/tools/__init__.py new file mode 100644 index 0000000..ccc6070 --- /dev/null +++ b/python_proto_type/tests/tools/__init__.py @@ -0,0 +1,3 @@ +""" +Test Tools - Hilfsprogramme für manuelle Tests und Debugging +""" diff --git a/python_proto_type/tests/tools/shelly_simulator.py b/python_proto_type/tests/tools/shelly_simulator.py new file mode 100755 index 0000000..50fdf15 --- /dev/null +++ b/python_proto_type/tests/tools/shelly_simulator.py @@ -0,0 +1,375 @@ +#!/usr/bin/env python3 +""" +Test MQTT Client - Simuliert Shelly PM Mini G3 Verhalten + +Sendet MQTT Messages auf das Topic 'shaperorigin/status/pm1:0' +mit verschiedenen Test-Szenarien für die Session Detection. + +Usage: + python test_shelly_simulator.py --scenario standby + python test_shelly_simulator.py --scenario working + python test_shelly_simulator.py --scenario session_end + python test_shelly_simulator.py --scenario full_session +""" + +import argparse +import json +import os +import sys +import time +import ssl +from datetime import datetime +from pathlib import Path +from paho.mqtt import client as mqtt_client +import yaml + + +class ShellySimulator: + """Simuliert Shelly PM Mini G3 MQTT Messages""" + + def __init__(self, broker_host: str, broker_port: int, username: str, password: str, topic_prefix: str = "testshelly"): + self.broker_host = broker_host + self.broker_port = broker_port + self.username = username + self.password = password + self.topic_prefix = topic_prefix + self.topic = f"{topic_prefix}/status/pm1:0" + self.client = None + + def connect(self): + """Verbindung zum MQTT Broker herstellen""" + client_id = f"shelly-simulator-{int(time.time())}" + self.client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5) + self.client.username_pw_set(self.username, self.password) + + # TLS/SSL + self.client.tls_set(cert_reqs=ssl.CERT_NONE) + self.client.tls_insecure_set(True) + + print(f"Verbinde zu MQTT Broker {self.broker_host}:{self.broker_port}...") + self.client.connect(self.broker_host, self.broker_port, keepalive=60) + self.client.loop_start() + time.sleep(1) + print("✓ Verbunden") + + def disconnect(self): + """Verbindung trennen""" + if self.client: + self.client.loop_stop() + self.client.disconnect() + print("✓ Verbindung getrennt") + + def send_power_message(self, power_w: float, interval_s: int = 1): + """ + Sendet eine Shelly Status Message im echten Shelly PM Mini G3 Format + + Args: + power_w: Leistung in Watt + interval_s: Pause nach dem Senden in Sekunden + """ + # Echtes Shelly PM Mini G3 Format (ohne result wrapper) + message = { + "id": 0, + "voltage": 230.0, + "current": round(power_w / 230.0, 3), + "apower": power_w, + "freq": 50.0, + "aenergy": { + "total": round(12345.6 + (power_w * interval_s / 3600), 1), + "by_minute": [0.0, 0.0, 0.0], + "minute_ts": int(time.time()) + }, + "temperature": { + "tC": 35.2, + "tF": 95.4 + } + } + + payload = json.dumps(message) + self.client.publish(self.topic, payload, qos=1) + + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"[{timestamp}] 📤 Gesendet: {power_w:.1f}W") + + if interval_s > 0: + time.sleep(interval_s) + + def scenario_standby(self): + """Szenario: Maschine im STANDBY (20-100W)""" + print("\n=== SZENARIO: STANDBY (20-100W) ===") + print("Simuliert: Maschine an, Spindel aus\n") + + # Anfahren + print("Phase 1: Anfahren (0 → 50W)") + self.send_power_message(0, 1) + self.send_power_message(10, 1) + self.send_power_message(30, 1) # > 20W → STARTING + self.send_power_message(45, 1) + self.send_power_message(50, 2) # Nach 3s debounce → STANDBY + + # STANDBY halten + print("\nPhase 2: STANDBY halten (50W für 20s)") + for i in range(20): + self.send_power_message(50 + (i % 5) * 2, 1) # 50-58W variieren + + print("\n✓ Szenario abgeschlossen") + + def scenario_working(self): + """Szenario: Maschine WORKING (>=100W)""" + print("\n=== SZENARIO: WORKING (>=100W) ===") + print("Simuliert: Spindel läuft\n") + + # Anfahren direkt zu WORKING + print("Phase 1: Anfahren (0 → 150W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) # > 20W → STARTING + self.send_power_message(80, 1) + self.send_power_message(120, 1) + self.send_power_message(150, 2) # Nach 3s debounce → WORKING + + # WORKING halten + print("\nPhase 2: WORKING halten (150W für 20s)") + for i in range(20): + self.send_power_message(150 + (i % 10) * 5, 1) # 150-195W variieren + + print("\n✓ Szenario abgeschlossen") + + def scenario_session_end(self): + """Szenario: Session sauber beenden mit stop_debounce""" + print("\n=== SZENARIO: SESSION END (mit stop_debounce 15s) ===") + print("Simuliert: Maschine einschalten → arbeiten → ausschalten\n") + + # Start: STANDBY + print("Phase 1: Session Start → STANDBY (50W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) # STARTING + self.send_power_message(50, 1) + self.send_power_message(50, 1) + self.send_power_message(50, 2) # Nach 3s → STANDBY + + # STANDBY 10s + print("\nPhase 2: STANDBY halten (50W für 10s)") + for _ in range(10): + self.send_power_message(50, 1) + + # Runterfahren + print("\nPhase 3: Herunterfahren (50W → 0W)") + self.send_power_message(40, 1) + self.send_power_message(25, 1) + self.send_power_message(15, 1) # < 20W → STOPPING + + # STOPPING für 15 Sekunden (stop_debounce) + print("\nPhase 4: STOPPING (< 20W für 15s = stop_debounce)") + for i in range(16): + self.send_power_message(10 - i * 0.6, 1) # 10W → ~0W über 15s + print(f" STOPPING seit {i+1}s / 15s") + + print("\n✓ Session sollte jetzt beendet sein (nach stop_debounce)") + print(" Erwartung: Session END mit end_reason='power_drop'") + + def scenario_full_session(self): + """Szenario: Komplette Session mit STANDBY→WORKING→STANDBY→END""" + print("\n=== SZENARIO: FULL SESSION ===") + print("Simuliert: Start → STANDBY → WORKING → STANDBY → END\n") + + # Start → STANDBY + print("Phase 1: Start → STANDBY (50W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) + self.send_power_message(50, 1) + self.send_power_message(50, 2) # → STANDBY + + # STANDBY 5s + print("\nPhase 2: STANDBY (50W für 5s)") + for _ in range(5): + self.send_power_message(55, 1) + + # STANDBY → WORKING + print("\nPhase 3: STANDBY → WORKING (50W → 150W)") + self.send_power_message(70, 1) + self.send_power_message(90, 1) + self.send_power_message(110, 1) # >= 100W → WORKING + self.send_power_message(130, 1) + self.send_power_message(150, 1) + + # WORKING 10s + print("\nPhase 4: WORKING (150W für 10s)") + for i in range(10): + self.send_power_message(150 + (i % 5) * 10, 1) + + # WORKING → STANDBY + print("\nPhase 5: WORKING → STANDBY (150W → 60W)") + self.send_power_message(120, 1) + self.send_power_message(90, 1) # < 100W → STANDBY + self.send_power_message(70, 1) + self.send_power_message(60, 1) + + # STANDBY 5s + print("\nPhase 6: STANDBY (60W für 5s)") + for _ in range(5): + self.send_power_message(60, 1) + + # Session END + print("\nPhase 7: Session END (60W → 0W mit stop_debounce)") + self.send_power_message(40, 1) + self.send_power_message(20, 1) + self.send_power_message(10, 1) # < 20W → STOPPING + + print("\nPhase 8: STOPPING (15s debounce)") + for i in range(16): + self.send_power_message(5 - i * 0.3, 1) + print(f" STOPPING seit {i+1}s / 15s") + + print("\n✓ Full Session abgeschlossen") + print(" Erwartung:") + print(" - Session START") + print(" - STANDBY → WORKING → STANDBY Transitionen") + print(" - Session END nach stop_debounce") + print(" - standby_duration_s und working_duration_s getrackt") + + def scenario_timeout(self): + """Szenario: Timeout Detection (keine Messages für 20s)""" + print("\n=== SZENARIO: TIMEOUT (message_timeout_s = 20s) ===") + print("Simuliert: Maschine läuft → plötzlicher Stromausfall (keine Messages mehr)\n") + + # Start → STANDBY + print("Phase 1: Session Start → STANDBY (50W)") + self.send_power_message(0, 1) + self.send_power_message(30, 1) + self.send_power_message(50, 1) + self.send_power_message(50, 2) # → STANDBY + + # STANDBY 10s + print("\nPhase 2: STANDBY (50W für 10s)") + for _ in range(10): + self.send_power_message(50, 1) + + # Keine Messages mehr (Stromausfall simulieren) + print("\n⚡ STROMAUSFALL: Keine Messages mehr für 25 Sekunden") + print(" (message_timeout_s = 20s)") + for i in range(25): + time.sleep(1) + print(f" Keine Messages seit {i+1}s / 20s", end="\r") + + print("\n\n✓ Timeout Szenario abgeschlossen") + print(" Erwartung: Session END nach 20s mit end_reason='timeout'") + + +def main(): + parser = argparse.ArgumentParser(description="Shelly PM Mini G3 MQTT Simulator") + parser.add_argument( + "--scenario", + choices=["standby", "working", "session_end", "full_session", "timeout", "loop"], + required=True, + help="Test-Szenario" + ) + parser.add_argument("--host", default=None, help="MQTT Broker Host (default: from env/config.yaml)") + parser.add_argument("--port", type=int, default=None, help="MQTT Broker Port (default: from env/config.yaml)") + parser.add_argument("--username", default=None, help="MQTT Username (default: from env/config.yaml)") + parser.add_argument("--password", default=None, help="MQTT Password (default: from env/config.yaml)") + parser.add_argument("--topic-prefix", default="testshelly", help="MQTT Topic Prefix (default: testshelly)") + + args = parser.parse_args() + + # Load credentials from environment or config.yaml if not provided via CLI + mqtt_config = {} + + if not all([args.host, args.port, args.username, args.password]): + # Try environment variables first + mqtt_config = { + 'host': os.getenv('MQTT_HOST'), + 'port': int(os.getenv('MQTT_PORT', 0)) if os.getenv('MQTT_PORT') else None, + 'username': os.getenv('MQTT_USERNAME'), + 'password': os.getenv('MQTT_PASSWORD') + } + + # If not all available, try config.yaml + if not all(mqtt_config.values()): + config_path = Path(__file__).parent.parent.parent / "config.yaml" + print(f"Lade MQTT Credentials aus {config_path}...") + if config_path.exists(): + try: + with open(config_path, 'r') as f: + config = yaml.safe_load(f) + mqtt_section = config.get('mqtt', {}) + mqtt_config = { + 'host': mqtt_section.get('host'), + 'port': mqtt_section.get('port'), + 'username': mqtt_section.get('username'), + 'password': mqtt_section.get('password') + } + except Exception as e: + print(f"⚠️ Warnung: Fehler beim Lesen von config.yaml: {e}") + + # Use CLI args or fallback to loaded config + host = args.host or mqtt_config.get('host') + port = args.port or mqtt_config.get('port') + username = args.username or mqtt_config.get('username') + password = args.password or mqtt_config.get('password') + + # Validate that we have all required credentials + if not all([host, port, username, password]): + print("❌ Fehler: MQTT Credentials fehlen!") + print("") + print("Bitte setze die Credentials über:") + print(" 1) Kommandozeilen-Parameter:") + print(" --host --port --username --password ") + print("") + print(" 2) Umgebungsvariablen:") + print(" export MQTT_HOST=mqtt.majufilo.eu") + print(" export MQTT_PORT=8883") + print(" export MQTT_USERNAME=mosquitto") + print(" export MQTT_PASSWORD=xxx") + print("") + print(" 3) config.yaml im Projektroot") + print("") + sys.exit(1) + + print("=" * 60) + print("SHELLY PM MINI G3 SIMULATOR") + print("=" * 60) + + simulator = ShellySimulator( + broker_host=host, + broker_port=port, + username=username, + password=password, + topic_prefix=args.topic_prefix + ) + + try: + simulator.connect() + + # Szenario ausführen + if args.scenario == "standby": + simulator.scenario_standby() + elif args.scenario == "working": + simulator.scenario_working() + elif args.scenario == "session_end": + simulator.scenario_session_end() + elif args.scenario == "full_session": + simulator.scenario_full_session() + elif args.scenario == "timeout": + simulator.scenario_timeout() + elif args.scenario == "loop": + print("Starte Endlosschleife (STRG+C zum Beenden)...") + while True: + simulator.scenario_full_session() + else: + print(f"❌ Unbekanntes Szenario: {args.scenario}") + exit(1) + + print("\n" + "=" * 60) + print("Test abgeschlossen. Prüfe sessions.json für Ergebnisse.") + print("=" * 60) + + except KeyboardInterrupt: + print("\n\n⚠️ Abgebrochen") + except Exception as e: + print(f"\n❌ Fehler: {e}") + finally: + simulator.disconnect() + + +if __name__ == "__main__": + main() diff --git a/python_proto_type/tests/unit/__init__.py b/python_proto_type/tests/unit/__init__.py new file mode 100644 index 0000000..8ff464d --- /dev/null +++ b/python_proto_type/tests/unit/__init__.py @@ -0,0 +1,3 @@ +""" +Unit Tests - Testen einzelne Komponenten isoliert ohne externe Dependencies +""" diff --git a/python_proto_type/tests/unit/test_session_detector.py b/python_proto_type/tests/unit/test_session_detector.py new file mode 100644 index 0000000..af7ef5b --- /dev/null +++ b/python_proto_type/tests/unit/test_session_detector.py @@ -0,0 +1,396 @@ +#!/usr/bin/env python3 +""" +Unit Tests für SessionDetector + +Testet die Dual-Threshold Session Detection ohne MQTT Broker. +Direktes Testen der Logik mit simulierten Events. + +Usage: + pytest test_session_detector.py -v + pytest test_session_detector.py::test_standby_to_working -v +""" + +import pytest +from datetime import datetime, timezone, timedelta +from session_detector import SessionDetector, SessionState + + +@pytest.fixture +def detector(): + """SessionDetector mit Test-Konfiguration""" + config = [ + { + 'machine_id': 'test-machine-01', + 'machine_name': 'Test Machine', + 'standby_threshold_w': 20, + 'working_threshold_w': 100, + 'start_debounce_s': 3, + 'stop_debounce_s': 15, + 'message_timeout_s': 20, + } + ] + return SessionDetector(device_config=config) + + +def create_event(power_w: float, machine_id: str = 'test-machine-01', timestamp: datetime = None): + """Helper: Erstellt ein normalisiertes Event""" + if timestamp is None: + timestamp = datetime.now(timezone.utc) + + return { + 'event_id': 'test-event', + 'event_type': 'power_measurement', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': 'Test Machine' + }, + 'metrics': { + 'power_w': power_w + } + } + + +class TestSessionStart: + """Tests für Session-Start mit start_debounce""" + + def test_idle_to_starting(self, detector): + """Power >= 20W → STARTING""" + event = create_event(power_w=30) + result = detector.process_event(event) + + assert result is None # Noch kein Session-Start + assert detector.get_machine_state('test-machine-01') == 'starting' + + def test_starting_to_standby(self, detector): + """Nach 3s debounce mit 20-100W → STANDBY""" + start_time = datetime.now(timezone.utc) + + # T+0: Power 30W → STARTING + event1 = create_event(power_w=30, timestamp=start_time) + result1 = detector.process_event(event1) + assert result1 is None + + # T+1: Power 50W → noch STARTING + event2 = create_event(power_w=50, timestamp=start_time + timedelta(seconds=1)) + result2 = detector.process_event(event2) + assert result2 is None + + # T+3: Power 50W → STANDBY (debounce passed) + event3 = create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)) + result3 = detector.process_event(event3) + + assert result3 is not None + assert result3['event_type'] == 'session_start' + assert result3['session_data']['start_time'] is not None + assert detector.get_machine_state('test-machine-01') == 'standby' + + def test_starting_to_working(self, detector): + """Nach 3s debounce mit >=100W → WORKING""" + start_time = datetime.now(timezone.utc) + + # T+0: Power 30W → STARTING + event1 = create_event(power_w=30, timestamp=start_time) + detector.process_event(event1) + + # T+1: Power 120W → noch STARTING + event2 = create_event(power_w=120, timestamp=start_time + timedelta(seconds=1)) + detector.process_event(event2) + + # T+3: Power 150W → WORKING (debounce passed) + event3 = create_event(power_w=150, timestamp=start_time + timedelta(seconds=3)) + result3 = detector.process_event(event3) + + assert result3 is not None + assert result3['event_type'] == 'session_start' + assert detector.get_machine_state('test-machine-01') == 'working' + + def test_false_start(self, detector): + """Power fällt vor debounce → zurück zu IDLE""" + start_time = datetime.now(timezone.utc) + + # T+0: Power 30W → STARTING + event1 = create_event(power_w=30, timestamp=start_time) + detector.process_event(event1) + + # T+1: Power 10W → zurück zu IDLE (false start) + event2 = create_event(power_w=10, timestamp=start_time + timedelta(seconds=1)) + result2 = detector.process_event(event2) + + assert result2 is None + assert detector.get_machine_state('test-machine-01') == 'idle' + + +class TestStateTransitions: + """Tests für Zustandsübergänge während Session""" + + def test_standby_to_working(self, detector): + """STANDBY → WORKING bei Power >= 100W""" + start_time = datetime.now(timezone.utc) + + # Session starten im STANDBY + detector.process_event(create_event(power_w=30, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + assert detector.get_machine_state('test-machine-01') == 'standby' + + # Power auf 150W → WORKING + event = create_event(power_w=150, timestamp=start_time + timedelta(seconds=10)) + result = detector.process_event(event) + + assert result is None # Keine session_end/start, nur Transition + assert detector.get_machine_state('test-machine-01') == 'working' + + def test_working_to_standby(self, detector): + """WORKING → STANDBY bei Power < 100W""" + start_time = datetime.now(timezone.utc) + + # Session starten im WORKING + detector.process_event(create_event(power_w=150, timestamp=start_time)) + detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3))) + assert detector.get_machine_state('test-machine-01') == 'working' + + # Power auf 60W → STANDBY + event = create_event(power_w=60, timestamp=start_time + timedelta(seconds=10)) + result = detector.process_event(event) + + assert result is None + assert detector.get_machine_state('test-machine-01') == 'standby' + + +class TestSessionEnd: + """Tests für Session-Ende mit stop_debounce""" + + def test_session_end_from_standby(self, detector): + """STANDBY → STOPPING → IDLE nach 15s < 20W""" + start_time = datetime.now(timezone.utc) + + # Session starten + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # STANDBY für 10s + for i in range(10): + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i))) + + # Power fällt unter 20W → STOPPING + t_stopping = start_time + timedelta(seconds=13) + event_stopping = create_event(power_w=10, timestamp=t_stopping) + result_stopping = detector.process_event(event_stopping) + + assert result_stopping is None + assert detector.get_machine_state('test-machine-01') == 'stopping' + + # STOPPING für 14s → noch keine Session-End + for i in range(14): + result = detector.process_event( + create_event(power_w=10, timestamp=t_stopping + timedelta(seconds=i + 1)) + ) + assert result is None # Noch kein Session-Ende + + # T+15: stop_debounce passed → SESSION END + event_end = create_event(power_w=5, timestamp=t_stopping + timedelta(seconds=15)) + result_end = detector.process_event(event_end) + + assert result_end is not None + assert result_end['event_type'] == 'session_end' + assert result_end['session_data']['end_reason'] == 'power_drop' + assert detector.get_machine_state('test-machine-01') == 'idle' + + def test_stopping_canceled_power_back_up(self, detector): + """STOPPING abgebrochen wenn Power wieder >= 20W""" + start_time = datetime.now(timezone.utc) + + # Session starten + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # Power fällt → STOPPING + t_stopping = start_time + timedelta(seconds=10) + detector.process_event(create_event(power_w=10, timestamp=t_stopping)) + assert detector.get_machine_state('test-machine-01') == 'stopping' + + # Nach 5s: Power wieder hoch → zurück zu STANDBY + event_back = create_event(power_w=60, timestamp=t_stopping + timedelta(seconds=5)) + result = detector.process_event(event_back) + + assert result is None # Kein Session-Ende + assert detector.get_machine_state('test-machine-01') == 'standby' + + +class TestDurationTracking: + """Tests für Zeiterfassung in STANDBY und WORKING""" + + def test_standby_duration_tracked(self, detector): + """standby_duration_s wird korrekt akkumuliert""" + start_time = datetime.now(timezone.utc) + + # Session Start → STANDBY + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # STANDBY für 10 Sekunden + for i in range(10): + detector.process_event( + create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i)) + ) + + # Session beenden + t_end = start_time + timedelta(seconds=20) + detector.process_event(create_event(power_w=10, timestamp=t_end)) + + # STOPPING für genau 15 Sekunden (stop_debounce) + for i in range(14): + result = detector.process_event( + create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1)) + ) + assert result is None # Noch kein Session-Ende + + # T+15: stop_debounce passed → SESSION END + result = detector.process_event( + create_event(power_w=0, timestamp=t_end + timedelta(seconds=15)) + ) + + assert result is not None + assert result['session_data']['standby_duration_s'] >= 9 # ~10s STANDBY + assert result['session_data']['working_duration_s'] == 0 # Kein WORKING + + def test_working_duration_tracked(self, detector): + """working_duration_s wird korrekt akkumuliert""" + start_time = datetime.now(timezone.utc) + + # Session Start → WORKING + detector.process_event(create_event(power_w=150, timestamp=start_time)) + detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3))) + + # WORKING für 10 Sekunden + for i in range(10): + detector.process_event( + create_event(power_w=150, timestamp=start_time + timedelta(seconds=3 + i)) + ) + + # Session beenden + t_end = start_time + timedelta(seconds=20) + detector.process_event(create_event(power_w=10, timestamp=t_end)) + + # STOPPING für genau 15 Sekunden + for i in range(14): + result = detector.process_event( + create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1)) + ) + assert result is None + + # T+15: stop_debounce passed + result = detector.process_event( + create_event(power_w=0, timestamp=t_end + timedelta(seconds=15)) + ) + + assert result is not None + assert result['session_data']['working_duration_s'] >= 9 # ~10s WORKING + assert result['session_data']['standby_duration_s'] == 0 # Kein STANDBY + + def test_mixed_standby_working_duration(self, detector): + """STANDBY und WORKING Zeiten separat getrackt""" + start_time = datetime.now(timezone.utc) + + # Start → STANDBY + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # STANDBY 5s + for i in range(5): + detector.process_event( + create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i)) + ) + + # → WORKING + t_working = start_time + timedelta(seconds=8) + detector.process_event(create_event(power_w=150, timestamp=t_working)) + + # WORKING 5s + for i in range(5): + detector.process_event( + create_event(power_w=150, timestamp=t_working + timedelta(seconds=i + 1)) + ) + + # → STANDBY + t_standby2 = t_working + timedelta(seconds=6) + detector.process_event(create_event(power_w=60, timestamp=t_standby2)) + + # STANDBY 5s + for i in range(5): + detector.process_event( + create_event(power_w=60, timestamp=t_standby2 + timedelta(seconds=i + 1)) + ) + + # Session beenden + t_end = t_standby2 + timedelta(seconds=6) + detector.process_event(create_event(power_w=10, timestamp=t_end)) + + # STOPPING für genau 15 Sekunden + for i in range(14): + result = detector.process_event( + create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1)) + ) + assert result is None + + # T+15: stop_debounce passed + result = detector.process_event( + create_event(power_w=0, timestamp=t_end + timedelta(seconds=15)) + ) + + assert result is not None + # ~10s STANDBY (5s + 5s) + assert result['session_data']['standby_duration_s'] >= 9 + assert result['session_data']['standby_duration_s'] <= 12 + # ~5s WORKING + assert result['session_data']['working_duration_s'] >= 4 + assert result['session_data']['working_duration_s'] <= 7 + + +class TestTimeoutDetection: + """Tests für Timeout-basierte Session-Beendigung""" + + def test_timeout_from_standby(self, detector): + """Timeout nach 20s ohne Messages im STANDBY""" + start_time = datetime.now(timezone.utc) + + # Session starten + detector.process_event(create_event(power_w=50, timestamp=start_time)) + detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))) + + # Letzte Message + last_msg = start_time + timedelta(seconds=10) + detector.process_event(create_event(power_w=50, timestamp=last_msg)) + + # 21 Sekunden später → TIMEOUT + timeout_event = create_event(power_w=50, timestamp=last_msg + timedelta(seconds=21)) + result = detector.process_event(timeout_event) + + assert result is not None + assert result['event_type'] == 'session_end' + assert result['session_data']['end_reason'] == 'timeout' + assert result['power_w'] == 0.0 # Bei Timeout wird 0W angenommen + + def test_timeout_from_working(self, detector): + """Timeout nach 20s ohne Messages im WORKING""" + start_time = datetime.now(timezone.utc) + + # Session starten im WORKING + detector.process_event(create_event(power_w=150, timestamp=start_time)) + detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3))) + + # Letzte Message + last_msg = start_time + timedelta(seconds=10) + detector.process_event(create_event(power_w=150, timestamp=last_msg)) + + # 21 Sekunden später → TIMEOUT + timeout_event = create_event(power_w=150, timestamp=last_msg + timedelta(seconds=21)) + result = detector.process_event(timeout_event) + + assert result is not None + assert result['event_type'] == 'session_end' + assert result['session_data']['end_reason'] == 'timeout' + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])