feat: Add timeout detection and comprehensive test suite

- Implement check_timeouts() for session timeout detection (20s)
- Fix code duplication bug in session_detector.py
- Add periodic timeout check in main loop (every 1s)
- Add 13 unit tests for session detection
- Add 5 integration tests with real MQTT broker
- Document Odoo integration strategy in README
- Remove hardcoded credentials, use env vars
- All 18 tests passing
This commit is contained in:
Matthias Lotz 2026-01-24 11:32:23 +01:00
parent 4c03959437
commit aeb8e5660b
13 changed files with 1610 additions and 283 deletions

View File

@ -1,225 +1,127 @@
# Open Workshop IoT Bridge - Python Prototype
MQTT-basierte IoT-Integration für Odoo 18 Community Edition
## Projektstatus
**Stand: 2026-01-22**
**Phase 1: Standalone Python Prototype** (aktuell)
- ✅ M0: Projekt Setup & MQTT Connection (abgeschlossen)
- Virtual Environment erstellt
- MQTT Client mit TLS/SSL Support
- Verbindung zu mqtt.majufilo.eu:8883 erfolgreich
- ✅ M1: Shelly PM Mini G3 Integration (abgeschlossen)
- Parser für Status Messages (shaperorigin/status/pm1:0)
- Datenextraktion: apower, voltage, current, frequency, total_energy
- Custom MQTT Topic Prefix Support (shaperorigin)
- Live-Monitoring funktioniert
- ✅ M2: Event-Normalisierung (abgeschlossen)
- Event Schema v1 implementiert
- UUID-basierte Event IDs
- ISO 8601 UTC Timestamps
- Machine/Device Mapping
- Metrics Normalisierung (power_w, voltage_v, current_a, frequency_hz)
- ✅ M3: Session Detection Engine (abgeschlossen)
- State Machine: IDLE → STARTING → RUNNING → STOPPING → IDLE
- Power-basierte Schwellenwerte (konfigurierbar pro Maschine)
- Debounce-Logik (Start/Stop getrennt konfigurierbar)
- Session Events: session_start, session_end mit Duration
- Persistente Speicherung (JSON)
- ⏳ M4: Multi-Device Support (vorbereitet, Config-ready)
- ⏳ M5: Monitoring & Robustheit
**Phase 2: Odoo Integration** (geplant)
- M6-M10: Odoo Module + Bridge + Tests
## Beschreibung
Dieser Python-Prototyp dient als Grundlage für die spätere Odoo-Integration. Er:
- Empfängt MQTT Events von IoT-Geräten (z.B. Shelly PM Mini G3)
- Normalisiert die Daten in ein einheitliches Event-Schema (Event Schema v1)
- Erkennt Maschinenlaufzeit-Sessions basierend auf Power-Schwellenwerten
- Speichert Events und Sessions persistent (JSONL/JSON)
- Unterstützt mehrere Geräte parallel (topic_prefix basiert)
Der Code ist so strukturiert, dass er später direkt in eine Odoo-Bridge übernommen werden kann.
Die JSON-Speicherung ist für Migration zu Odoo-Models vorbereitet:
- `data/events.jsonl``open_workshop.power_event`
- `data/sessions.json``open_workshop.session`
## Installation
### Voraussetzungen
- Python 3.8+
- MQTT Broker (z.B. Mosquitto)
- pip
### Setup
1. **Dependencies installieren**
```bash
pip install -r requirements.txt
```
2. **Konfiguration erstellen**
```bash
cp config.yaml.example config.yaml
```
3. **config.yaml anpassen**
Bearbeite `config.yaml` und setze:
- MQTT Broker Host/Port/Credentials
- Device-Konfiguration (Shelly IDs, Schwellenwerte)
- MQTT Topics
## Verwendung
### Starten
```bash
cd /pfad/zu/python_prototype
source venv/bin/activate
python main.py
```
Dieser Befehl:
- Verbindet sich mit dem MQTT Broker (TLS/SSL)
- Abonniert die konfigurierten Topics
- Empfängt Shelly PM Mini G3 Status Messages
- Normalisiert Events (Event Schema v1)
- Detektiert Session Start/End Events
- Speichert in `data/events.jsonl` und `data/sessions.json`
### Erwartete Ausgabe
```
2026-01-22 19:36:17 - __main__ - INFO - === Open Workshop IoT Bridge Starting ===
2026-01-22 19:36:17 - mqtt_client - INFO - TLS/SSL enabled for port 8883
2026-01-22 19:36:17 - mqtt_client - INFO - Connected to MQTT Broker at mqtt.majufilo.eu:8883
2026-01-22 19:36:17 - mqtt_client - INFO - Subscribed to topic: shaperorigin/#
2026-01-22 19:36:17 - __main__ - INFO - IoT Bridge started successfully
2026-01-22 19:36:17 - __main__ - INFO - Listening for MQTT messages... (Press Ctrl+C to stop)
2026-01-22 19:36:17 - session_detector - INFO - 🟡 Shaper Origin: Power 43.3W >= 30W → STARTING
2026-01-22 19:36:20 - session_detector - INFO - 🟢 Shaper Origin: Session START (debounce 3.0s)
2026-01-22 19:36:20 - __main__ - INFO - 🚀 SESSION START
2026-01-22 19:37:03 - session_detector - INFO - 🟠 Shaper Origin: Power 0.0W < 30W STOPPING
2026-01-22 19:37:18 - session_detector - INFO - 🔴 Shaper Origin: Session END (debounce 15.0s)
2026-01-22 19:37:18 - __main__ - INFO - 🏁 SESSION END - Duration: 58s (0.97 min)
```
## Konfiguration
### MQTT Broker
```yaml
mqtt:
host: "localhost"
port: 1883
username: ""
password: ""
topics:
- "shellies/+/status"
```
### Devices
```yaml
devices:
- topic_prefix: "shaperorigin" # Custom MQTT Prefix (im Shelly konfiguriert)
machine_name: "Shaper Origin"
machine_id: "shaper-origin-01"
device_type: "shelly_pm_mini_g3"
power_threshold: 30 # Watt (Schwellenwert für Session-Erkennung)
start_debounce_s: 3 # Verzögerung bis Session Start
stop_debounce_s: 15 # Verzögerung bis Session End
enabled: true
```
**Multi-Device Support:** Einfach weitere Geräte hinzufügen mit unterschiedlichen `topic_prefix`.
Jedes Gerät benötigt im Shelly einen eigenen Custom MQTT Prefix.
MQTT-basierte IoT Bridge für Odoo 18 Community zur Erfassung von Maschinenlaufzeiten.
## Projektstruktur
```
python_prototype/
├── main.py # Entry point & Orchestration
├── mqtt_client.py # MQTT Client wrapper (TLS/SSL)
├── shelly_parser.py # Shelly PM Mini G3 Message Parser
├── event_normalizer.py # Event Schema v1 Normalizer
├── session_detector.py # Session Detection State Machine
├── event_storage.py # Persistent Storage (JSONL/JSON)
├── config.yaml # Configuration (nicht im Git)
├── config.yaml.example # Config template
├── requirements.txt # Python dependencies
├── README.md # Diese Datei
├── data/ # Output directory
│ ├── events.jsonl # Events (JSON Lines)
│ └── sessions.json # Sessions (JSON Array)
└── logs/ # Log files
└── ows_iot_bridge.log
├── main.py # Haupteinstiegspunkt
├── config.yaml # Produktionskonfiguration (nicht in Git)
├── config.yaml.example # Beispielkonfiguration
├── Core Components (Produktionscode)
├── mqtt_client.py # MQTT Client mit TLS
├── shelly_parser.py # Parser für Shelly PM Mini G3
├── event_normalizer.py # Event Schema v1
├── session_detector.py # Session Detection (Dual-Threshold)
├── event_storage.py # JSON Storage
├── data/ # Laufzeitdaten
│ ├── events.jsonl # Event-Log
│ └── sessions.json # Session-Daten
├── tests/ # Tests
│ ├── unit/ # Unit Tests (schnell, isoliert)
│ │ └── test_session_detector.py
│ ├── integration/ # Integration Tests (mit MQTT)
│ │ └── test_mqtt_integration.py
│ └── tools/ # Test-Hilfsprogramme
│ └── shelly_simulator.py
└── venv/ # Virtual Environment
```
## Nächste Schritte
## Installation
### M4: Multi-Device Support (vorbereitet)
- Zweites Shelly-Device konfigurieren
- Parallele Überwachung mehrerer Maschinen testen
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
### M5: Monitoring & Robustheit
- MQTT Reconnect-Logik
- Error Handling & Recovery
- Systemd Service Setup
- Health Monitoring
### M6+: Odoo Integration
- Odoo Models: `open_workshop.machine`, `open_workshop.session`
- Migration: JSON → Odoo Database
- Views & Dashboards
- Live-Monitoring in Odoo
## Gespeicherte Daten
### Events (data/events.jsonl)
JSON Lines Format - ein Event pro Zeile:
```json
{"event_id":"uuid","event_type":"power_measurement","timestamp":"2026-01-22T18:45:09.985Z","machine":{"machine_id":"shaper-origin-01","machine_name":"Shaper Origin"},"metrics":{"power_w":43.8,"voltage_v":230.2,"current_a":0.19}}
cp config.yaml.example config.yaml
# config.yaml anpassen
```
### Sessions (data/sessions.json)
JSON Array mit Session-Objekten:
```json
[
{
"session_id": "uuid",
"machine_id": "shaper-origin-01",
"machine_name": "Shaper Origin",
"start_time": "2026-01-22T18:52:59.000Z",
"end_time": "2026-01-22T18:54:01.995Z",
"duration_s": 62,
"start_power_w": 37.1,
"end_power_w": 0.0,
"status": "completed"
}
]
## Tests
```bash
# Unit Tests (schnell, ~0.05s)
pytest tests/unit/ -v
# Integration Tests (mit MQTT, ~30-60s)
# Benötigt MQTT Konfiguration:
# Option 1: Nutzt existierende config.yaml
pytest tests/integration/ -v -s
# Option 2: Mit Umgebungsvariablen
export MQTT_HOST=mqtt.majufilo.eu
export MQTT_PORT=8883
export MQTT_USERNAME=mosquitto
export MQTT_PASSWORD=dein_passwort
pytest tests/integration/ -v -s
# Alle Tests
pytest tests/ -v
```
## Troubleshooting
**Wichtig:** Integration Tests lesen MQTT-Zugangsdaten aus:
1. Umgebungsvariablen (`MQTT_HOST`, `MQTT_PASSWORD`, etc.) ODER
2. Existierender `config.yaml` im Projektroot
### Connection refused
```
Error: Failed to connect to MQTT Broker: [Errno 111] Connection refused
```
→ Prüfe ob MQTT Broker läuft und erreichbar ist
**Niemals Passwörter im Source Code committen!**
### Permission denied
```
PermissionError: [Errno 13] Permission denied: 'logs/ows_iot_bridge.log'
```
→ Stelle sicher, dass das logs/ Verzeichnis beschreibbar ist
## Betrieb
### Invalid config
```
Error: Config file not found: config.yaml
```
→ Erstelle config.yaml von config.yaml.example
```bash
# Bridge starten
python main.py
## Support
# Mit alternativer Config
python main.py --config custom_config.yaml
```
Siehe Feature Request Dokument: `FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md`
## Manuelle Tests
```bash
# Shelly Simulator für Tests
python tests/tools/shelly_simulator.py --scenario session_end
python tests/tools/shelly_simulator.py --scenario full_session
python tests/tools/shelly_simulator.py --scenario timeout
```
## Session Detection States
- **IDLE**: Power < 20W
- **STARTING**: Power >= 20W, 3s debounce
- **STANDBY**: 20-100W (Maschine an, Spindel aus)
- **WORKING**: >= 100W (Spindel läuft)
- **STOPPING**: < 20W, 15s debounce
- **Timeout**: 20s keine Messages → Session-Ende
## Odoo Integration
**Deployment-Strategie:** Direkte Integration in Odoo (kein separater Container)
**Begründung:**
- Werkstatt-Setup: maximal 10 Maschinen
- Performance: `check_timeouts()` alle 10s = ~10ms CPU-Zeit pro Durchlauf
- CPU-Last: < 0.1% bei 10 Maschinen vernachlässigbar
- Vorteile: Direkter DB-Zugriff, ACID-Transaktionen, keine API-Overhead
- Einfachheit: Ein Container, keine zusätzliche Infrastruktur
**Implementation in Odoo:**
- MQTT Client als Odoo Thread oder Cron-Job
- `check_timeouts()` als scheduled action alle 10 Sekunden
- Direkte Verwendung von `workshop.machine`, `workshop.session` Models
**Alternative (nur bei >50 Maschinen nötig):** Separater Microservice-Container
## Roadmap
- [x] M0-M3: MQTT + Parser + Session Detection
- [x] Unit + Integration Tests
- [x] Timeout Detection mit check_timeouts()
- [ ] M4: Multi-Device Support
- [ ] M5: Reconnect + Error Handling
- [ ] M6-M10: Odoo Integration

View File

@ -21,12 +21,19 @@ devices:
machine_id: "shaper-origin-01"
device_type: "shelly_pm_mini_g3"
# Power threshold for run detection (in Watts)
power_threshold: 50
# Dual Power Thresholds (in Watts)
# IDLE: Power < standby_threshold_w
# STANDBY: standby_threshold_w <= Power < working_threshold_w (Maschine an, Spindel aus)
# WORKING: Power >= working_threshold_w (Spindel läuft)
standby_threshold_w: 20 # Start tracking session
working_threshold_w: 100 # Active work detected
# Debounce times (in seconds)
start_debounce_s: 3 # Power > threshold for X seconds → run_start
stop_debounce_s: 15 # Power < threshold for Y seconds → run_stop
start_debounce_s: 3 # Power >= standby_threshold for X seconds → session_start
stop_debounce_s: 15 # Power < standby_threshold for Y seconds → session_end
# Timeout detection (in seconds)
message_timeout_s: 20 # No message for X seconds → session_end (timeout)
enabled: true

View File

@ -72,9 +72,12 @@ class EventStorage:
"machine_name": "Shaper Origin",
"start_time": "2026-01-22T18:36:20.993Z",
"end_time": "2026-01-22T18:38:01.993Z", // null if running
"duration_s": 101, // null if running
"total_duration_s": 101, // null if running
"standby_duration_s": 80, // null if running (machine on, not working)
"working_duration_s": 21, // null if running (active work)
"start_power_w": 45.7,
"end_power_w": 0.0, // null if running
"end_reason": "power_drop", // or "timeout", null if running
"status": "completed" // or "running"
}
"""
@ -107,9 +110,12 @@ class EventStorage:
'machine_name': machine.get('machine_name'),
'start_time': session_data.get('start_time'),
'end_time': None,
'duration_s': None,
'total_duration_s': None,
'standby_duration_s': None,
'working_duration_s': None,
'start_power_w': event.get('power_w'),
'end_power_w': None,
'end_reason': None,
'status': 'running'
}
@ -130,14 +136,23 @@ class EventStorage:
session_data = event.get('session_data', {})
session['end_time'] = session_data.get('end_time')
session['duration_s'] = session_data.get('duration_s')
session['total_duration_s'] = session_data.get('total_duration_s')
session['standby_duration_s'] = session_data.get('standby_duration_s')
session['working_duration_s'] = session_data.get('working_duration_s')
session['end_power_w'] = event.get('power_w')
session['end_reason'] = session_data.get('end_reason', 'normal')
session['status'] = 'completed'
self._write_sessions(sessions)
duration_min = session['duration_s'] / 60
self.logger.info(f"Session {session_id[:8]}... completed ({duration_min:.1f} min)")
total_min = session['total_duration_s'] / 60 if session['total_duration_s'] else 0
standby_min = session['standby_duration_s'] / 60 if session['standby_duration_s'] else 0
working_min = session['working_duration_s'] / 60 if session['working_duration_s'] else 0
self.logger.info(
f"Session {session_id[:8]}... completed ({session['end_reason']}) - "
f"Total: {total_min:.1f}min, Standby: {standby_min:.1f}min, Working: {working_min:.1f}min"
)
return True
self.logger.error(f"Session {session_id} not found for update")

View File

@ -12,6 +12,7 @@ import yaml
import time
import signal
import json
import argparse
from pathlib import Path
from typing import Dict
@ -223,8 +224,11 @@ class IoTBridge:
while self.running:
time.sleep(1)
# TODO: Hier können später periodische Tasks ausgeführt werden
# z.B. Session-Timeout-Checks
# Check for session timeouts periodically
timeout_events = self.session_detector.check_timeouts()
for session_event in timeout_events:
self._log_session_event(session_event)
self.event_storage.store_session_event(session_event)
except KeyboardInterrupt:
self.logger.info("Interrupted by user")
@ -241,12 +245,16 @@ class IoTBridge:
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description='Open Workshop IoT Bridge')
parser.add_argument('--config', default='config.yaml', help='Path to config file (default: config.yaml)')
args = parser.parse_args()
# Check if config file exists
config_file = Path('config.yaml')
config_file = Path(args.config)
if not config_file.exists():
print("\n" + "="*60)
print("Configuration file not found!")
print(f"Configuration file not found: {config_file}")
print("="*60)
print("\nPlease create config.yaml from the example:")
print(" cp config.yaml.example config.yaml")
@ -255,7 +263,7 @@ def main():
sys.exit(1)
# Start bridge
bridge = IoTBridge('config.yaml')
bridge = IoTBridge(str(config_file))
bridge.start()

View File

@ -1,37 +1,44 @@
"""
Session Detection Engine for Open Workshop IoT Bridge
Detects machine run sessions based on power consumption thresholds
with debounce logic to avoid false starts/stops.
Detects machine run sessions with dual power thresholds:
- STANDBY: Machine on, not working (e.g. 20-100W)
- WORKING: Active work (e.g. >= 100W)
Includes timeout detection for when machine powers off (no MQTT messages).
"""
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from datetime import datetime, timezone, timedelta
from typing import Dict, Optional, List
from enum import Enum
class SessionState(Enum):
"""Session detection states"""
IDLE = "idle" # Machine off (power < threshold)
STARTING = "starting" # Power above threshold, waiting for debounce
RUNNING = "running" # Confirmed run session active
STOPPING = "stopping" # Power below threshold, waiting for debounce
IDLE = "idle" # Machine off (power < standby_threshold)
STARTING = "starting" # Power above standby threshold, waiting for debounce
STANDBY = "standby" # Machine on, not working (standby <= power < working)
WORKING = "working" # Active work (power >= working threshold)
STOPPING = "stopping" # Power below standby threshold, waiting for debounce
class SessionDetector:
"""
Detects machine run sessions based on power measurements
Detects machine run sessions with dual thresholds and timeout detection
State Machine:
IDLE -> STARTING -> RUNNING -> STOPPING -> IDLE
IDLE -> STARTING -> STANDBY/WORKING -> STOPPING -> IDLE
- IDLE: Power < threshold
- STARTING: Power >= threshold for < start_debounce_s
- RUNNING: Power >= threshold for >= start_debounce_s
- STOPPING: Power < threshold for < stop_debounce_s (while in RUNNING)
- Back to IDLE: Power < threshold for >= stop_debounce_s
- IDLE: Power < standby_threshold
- STARTING: Power >= standby_threshold for < start_debounce_s
- STANDBY: Power >= standby_threshold < working_threshold (confirmed)
- WORKING: Power >= working_threshold (confirmed)
- STOPPING: Power < standby_threshold for < stop_debounce_s
- Back to IDLE: Power < standby_threshold for >= stop_debounce_s OR timeout
Timeout: No MQTT message for > message_timeout_s session_end (machine powered off)
"""
def __init__(self, device_config: list = None):
@ -50,15 +57,58 @@ class SessionDetector:
machine_id = device.get('machine_id')
if machine_id:
self.machine_config[machine_id] = {
'power_threshold': device.get('power_threshold', 50),
'standby_threshold_w': device.get('standby_threshold_w', 20),
'working_threshold_w': device.get('working_threshold_w', 100),
'start_debounce_s': device.get('start_debounce_s', 3),
'stop_debounce_s': device.get('stop_debounce_s', 15),
'message_timeout_s': device.get('message_timeout_s', 60),
'machine_name': device.get('machine_name', 'Unknown'),
}
# State tracking per machine
self.machine_states = {} # machine_id -> state info
def check_timeouts(self) -> List[Dict]:
"""
Check all machines for message timeouts and end sessions if needed.
Should be called periodically (e.g. every second).
Returns:
List of session_end events for timed-out sessions
"""
timeout_events = []
current_time = datetime.now(timezone.utc)
for machine_id, state_info in self.machine_states.items():
# Only check machines in active session states
if state_info['state'] not in [SessionState.STANDBY, SessionState.WORKING]:
continue
# Skip if no last message time
if not state_info['last_message_time']:
continue
# Get machine config
config = self.machine_config.get(machine_id)
if not config:
continue
# Check if timeout exceeded
time_since_last_message = (current_time - state_info['last_message_time']).total_seconds()
if time_since_last_message > config['message_timeout_s']:
machine_name = config.get('machine_name', 'Unknown')
self.logger.warning(
f"⏱️ {machine_name}: Message timeout "
f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END"
)
# End session with timeout
timeout_event = self._end_session_timeout(machine_id, machine_name, current_time, config, state_info)
if timeout_event:
timeout_events.append(timeout_event)
return timeout_events
def process_event(self, event: Dict) -> Optional[Dict]:
"""
Process a normalized event and detect session changes
@ -113,10 +163,29 @@ class SessionDetector:
'current_session_id': None,
'session_start_time': None,
'last_power': None,
'last_message_time': None,
'standby_duration_s': 0,
'working_duration_s': 0,
'last_state_change': datetime.now(timezone.utc),
}
state_info = self.machine_states[machine_id]
timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
machine_name = machine.get('machine_name', 'Unknown')
# Check for timeout (no message received)
if state_info['last_message_time']:
time_since_last_message = (timestamp - state_info['last_message_time']).total_seconds()
if time_since_last_message > config['message_timeout_s']:
if state_info['state'] in [SessionState.STANDBY, SessionState.WORKING]:
self.logger.warning(
f"{machine_name}: Message timeout "
f"({time_since_last_message:.0f}s > {config['message_timeout_s']}s) → SESSION END"
)
return self._end_session_timeout(machine_id, machine_name, timestamp, config, state_info)
# Update last message time
state_info['last_message_time'] = timestamp
# Update last power
state_info['last_power'] = power_w
@ -141,41 +210,60 @@ class SessionDetector:
state_info: Dict
) -> Optional[Dict]:
"""
Process state machine logic
Process state machine logic with dual thresholds
Returns session event if state change occurred
"""
current_state = state_info['state']
threshold = config['power_threshold']
standby_threshold = config['standby_threshold_w']
working_threshold = config['working_threshold_w']
start_debounce = config['start_debounce_s']
stop_debounce = config['stop_debounce_s']
time_in_state = (timestamp - state_info['state_since']).total_seconds()
# Update duration tracking for active states
if current_state == SessionState.STANDBY:
time_since_last_update = (timestamp - state_info['last_state_change']).total_seconds()
state_info['standby_duration_s'] += time_since_last_update
state_info['last_state_change'] = timestamp
elif current_state == SessionState.WORKING:
time_since_last_update = (timestamp - state_info['last_state_change']).total_seconds()
state_info['working_duration_s'] += time_since_last_update
state_info['last_state_change'] = timestamp
# State machine transitions
if current_state == SessionState.IDLE:
if power_w >= threshold:
if power_w >= standby_threshold:
# Transition to STARTING
self.logger.info(f"🟡 {machine_name}: Power {power_w:.1f}W >= {threshold}W → STARTING")
self.logger.info(f"🟡 {machine_name}: Power {power_w:.1f}W >= {standby_threshold}W → STARTING")
state_info['state'] = SessionState.STARTING
state_info['state_since'] = timestamp
elif current_state == SessionState.STARTING:
if power_w < threshold:
if power_w < standby_threshold:
# False start, back to IDLE
self.logger.info(f"{machine_name}: Power dropped before debounce → IDLE")
state_info['state'] = SessionState.IDLE
state_info['state_since'] = timestamp
elif time_in_state >= start_debounce:
# Debounce passed, transition to RUNNING
# Debounce passed, transition to STANDBY or WORKING
session_id = str(uuid.uuid4())
state_info['state'] = SessionState.RUNNING
state_info['state_since'] = timestamp
state_info['current_session_id'] = session_id
state_info['session_start_time'] = timestamp
state_info['standby_duration_s'] = 0
state_info['working_duration_s'] = 0
state_info['last_state_change'] = timestamp
self.logger.info(f"🟢 {machine_name}: Session START (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W")
if power_w >= working_threshold:
state_info['state'] = SessionState.WORKING
self.logger.info(f"🔵 {machine_name}: Session START → WORKING (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W")
else:
state_info['state'] = SessionState.STANDBY
self.logger.info(f"🟢 {machine_name}: Session START → STANDBY (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W")
state_info['state_since'] = timestamp
# Generate session_start event
return {
@ -192,55 +280,158 @@ class SessionDetector:
}
}
elif current_state == SessionState.RUNNING:
if power_w < threshold:
elif current_state == SessionState.STANDBY:
if power_w < standby_threshold:
# Transition to STOPPING
self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {threshold}W → STOPPING")
self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {standby_threshold}W → STOPPING")
state_info['state'] = SessionState.STOPPING
state_info['state_since'] = timestamp
state_info['last_state_change'] = timestamp
elif power_w >= working_threshold:
# Transition to WORKING
self.logger.info(f"🔵 {machine_name}: Power {power_w:.1f}W >= {working_threshold}W → WORKING")
state_info['state'] = SessionState.WORKING
state_info['state_since'] = timestamp
state_info['last_state_change'] = timestamp
elif current_state == SessionState.WORKING:
if power_w < standby_threshold:
# Transition to STOPPING
self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {standby_threshold}W → STOPPING")
state_info['state'] = SessionState.STOPPING
state_info['state_since'] = timestamp
state_info['last_state_change'] = timestamp
elif power_w < working_threshold:
# Transition to STANDBY
self.logger.info(f"🟢 {machine_name}: Power {power_w:.1f}W < {working_threshold}W → STANDBY")
state_info['state'] = SessionState.STANDBY
state_info['state_since'] = timestamp
state_info['last_state_change'] = timestamp
elif current_state == SessionState.STOPPING:
if power_w >= threshold:
# Power back up, back to RUNNING
self.logger.info(f"🟢 {machine_name}: Power back up → RUNNING")
state_info['state'] = SessionState.RUNNING
if power_w >= standby_threshold:
# Power back up, cancel STOPPING
if power_w >= working_threshold:
self.logger.info(f"🔵 {machine_name}: Power back up → WORKING")
state_info['state'] = SessionState.WORKING
else:
self.logger.info(f"🟢 {machine_name}: Power back up → STANDBY")
state_info['state'] = SessionState.STANDBY
state_info['state_since'] = timestamp
state_info['last_state_change'] = timestamp
elif time_in_state >= stop_debounce:
# Debounce passed, session ended
session_id = state_info['current_session_id']
start_time = state_info['session_start_time']
duration_s = (timestamp - start_time).total_seconds()
self.logger.info(f"🔴 {machine_name}: Session END (debounce {time_in_state:.1f}s) - Duration: {duration_s:.1f}s")
# Generate session_end event
session_event = {
'session_id': session_id,
'event_type': 'session_end',
'timestamp': timestamp.isoformat().replace('+00:00', 'Z'),
'machine': {
'machine_id': machine_id,
'machine_name': machine_name
},
'power_w': power_w,
'session_data': {
'start_time': start_time.isoformat().replace('+00:00', 'Z'),
'end_time': timestamp.isoformat().replace('+00:00', 'Z'),
'duration_s': int(duration_s)
}
}
# Reset to IDLE
state_info['state'] = SessionState.IDLE
state_info['state_since'] = timestamp
state_info['current_session_id'] = None
state_info['session_start_time'] = None
return session_event
else:
# Power still low, check if debounce passed
if time_in_state >= stop_debounce:
# Debounce passed, session ended
return self._end_session_normal(machine_id, machine_name, power_w, timestamp, state_info)
return None
def _end_session_normal(
self,
machine_id: str,
machine_name: str,
power_w: float,
timestamp: datetime,
state_info: Dict
) -> Dict:
"""End session normally (power drop)"""
session_id = state_info['current_session_id']
start_time = state_info['session_start_time']
duration_s = (timestamp - start_time).total_seconds()
standby_duration = state_info['standby_duration_s']
working_duration = state_info['working_duration_s']
self.logger.info(
f"🔴 {machine_name}: Session END (power drop) - "
f"Total: {duration_s:.0f}s, Standby: {standby_duration:.0f}s, Working: {working_duration:.0f}s"
)
# Generate session_end event
session_event = {
'session_id': session_id,
'event_type': 'session_end',
'timestamp': timestamp.isoformat().replace('+00:00', 'Z'),
'machine': {
'machine_id': machine_id,
'machine_name': machine_name
},
'power_w': power_w,
'session_data': {
'start_time': start_time.isoformat().replace('+00:00', 'Z'),
'end_time': timestamp.isoformat().replace('+00:00', 'Z'),
'total_duration_s': int(duration_s),
'standby_duration_s': int(standby_duration),
'working_duration_s': int(working_duration),
'end_reason': 'power_drop'
}
}
# Reset to IDLE
self._reset_session_state(state_info, timestamp)
return session_event
def _end_session_timeout(
self,
machine_id: str,
machine_name: str,
timestamp: datetime,
config: Dict,
state_info: Dict
) -> Dict:
"""End session due to timeout (no messages)"""
session_id = state_info['current_session_id']
start_time = state_info['session_start_time']
duration_s = (timestamp - start_time).total_seconds()
standby_duration = state_info['standby_duration_s']
working_duration = state_info['working_duration_s']
self.logger.warning(
f"⏱️ {machine_name}: Session END (TIMEOUT) - "
f"Total: {duration_s:.0f}s, Standby: {standby_duration:.0f}s, Working: {working_duration:.0f}s"
)
# Generate session_end event
session_event = {
'session_id': session_id,
'event_type': 'session_end',
'timestamp': timestamp.isoformat().replace('+00:00', 'Z'),
'machine': {
'machine_id': machine_id,
'machine_name': machine_name
},
'power_w': 0.0, # Assume power is 0 on timeout
'session_data': {
'start_time': start_time.isoformat().replace('+00:00', 'Z'),
'end_time': timestamp.isoformat().replace('+00:00', 'Z'),
'total_duration_s': int(duration_s),
'standby_duration_s': int(standby_duration),
'working_duration_s': int(working_duration),
'end_reason': 'timeout'
}
}
# Reset to IDLE
self._reset_session_state(state_info, timestamp)
return session_event
def _reset_session_state(self, state_info: Dict, timestamp: datetime):
"""Reset session state to IDLE"""
state_info['state'] = SessionState.IDLE
state_info['state_since'] = timestamp
state_info['current_session_id'] = None
state_info['session_start_time'] = None
state_info['standby_duration_s'] = 0
state_info['working_duration_s'] = 0
state_info['last_state_change'] = timestamp
def get_machine_state(self, machine_id: str) -> Optional[str]:
"""Get current state of a machine"""
state_info = self.machine_states.get(machine_id)

View File

@ -62,6 +62,17 @@ class ShellyParser:
"""
Parse full status message
Topic: shaperorigin/status/pm1:0
Payload format (Shelly PM Mini G3):
{
"id": 0,
"voltage": 230.0,
"current": 0.217,
"apower": 50.0,
"freq": 50.0,
"aenergy": {"total": 12345.6, "by_minute": [...], "minute_ts": 1234567890},
"temperature": {"tC": 35.2, "tF": 95.4}
}
"""
# Extract device ID from topic prefix
device_id = self._extract_device_id_from_topic(topic)

View File

@ -0,0 +1,3 @@
"""
Tests für Open Workshop IoT Bridge
"""

View File

@ -0,0 +1,3 @@
"""
Integration Tests - Testen mit echten externen Services (MQTT Broker, Storage)
"""

View File

@ -0,0 +1,418 @@
#!/usr/bin/env python3
"""
Integration Tests mit echtem MQTT Broker
Testet die komplette Pipeline:
MQTT Parser Normalizer SessionDetector Storage
Usage:
pytest test_mqtt_integration.py -v -s
"""
import pytest
import subprocess
import time
import json
import signal
import os
from pathlib import Path
from datetime import datetime
import paho.mqtt.client as mqtt_client
import ssl
@pytest.fixture(scope="session")
def mqtt_config():
"""MQTT Broker Konfiguration aus Umgebungsvariablen oder config.yaml"""
# Option 1: Aus Umgebungsvariablen
if os.getenv('MQTT_HOST'):
return {
'host': os.getenv('MQTT_HOST'),
'port': int(os.getenv('MQTT_PORT', '8883')),
'username': os.getenv('MQTT_USERNAME'),
'password': os.getenv('MQTT_PASSWORD'),
'topic_prefix': os.getenv('MQTT_TEST_TOPIC_PREFIX', 'pytest-test')
}
# Option 2: Aus config.yaml lesen
config_path = Path(__file__).parent.parent.parent / 'config.yaml'
if config_path.exists():
import yaml
with open(config_path) as f:
config = yaml.safe_load(f)
mqtt_conf = config.get('mqtt', {})
return {
'host': mqtt_conf.get('host'),
'port': mqtt_conf.get('port', 8883),
'username': mqtt_conf.get('username'),
'password': mqtt_conf.get('password'),
'topic_prefix': 'pytest-test' # Immer separates Topic für Tests
}
pytest.skip("Keine MQTT Konfiguration gefunden. Setze Umgebungsvariablen oder erstelle config.yaml")
@pytest.fixture(scope="session")
def workspace_dir():
"""Workspace-Verzeichnis"""
return Path(__file__).parent
@pytest.fixture(scope="session")
def test_config_file(workspace_dir, mqtt_config):
"""Erstellt temporäre test_config.yaml"""
# Relative Pfade zu den Test Storage Files (relativ zum Projektroot)
config_content = f"""
mqtt:
host: "{mqtt_config['host']}"
port: {mqtt_config['port']}
username: "{mqtt_config['username']}"
password: "{mqtt_config['password']}"
client_id: "ows_iot_bridge_pytest"
keepalive: 60
topics:
- "{mqtt_config['topic_prefix']}/#"
devices:
- topic_prefix: "{mqtt_config['topic_prefix']}"
machine_name: "PyTest Machine"
machine_id: "pytest-machine-01"
device_type: "shelly_pm_mini_g3"
standby_threshold_w: 20
working_threshold_w: 100
start_debounce_s: 3
stop_debounce_s: 15
message_timeout_s: 20
enabled: true
logging:
level: "INFO"
console: true
output:
events_file: "data/test_events.jsonl"
sessions_file: "data/test_sessions.json"
"""
config_path = workspace_dir / "test_config.yaml"
config_path.write_text(config_content)
yield config_path
# Cleanup
if config_path.exists():
config_path.unlink()
@pytest.fixture(scope="session")
def test_storage_files(workspace_dir):
"""Gibt Pfade zu Test-Storage-Dateien zurück (werden von Bridge erstellt)"""
# Test-Dateien im Projektroot data/ Verzeichnis
project_root = workspace_dir.parent.parent
data_dir = project_root / "data"
events_file = data_dir / "test_events.jsonl"
sessions_file = data_dir / "test_sessions.json"
# NICHT löschen - zum Debugging beibehalten!
yield {
'events': events_file,
'sessions': sessions_file
}
# Cleanup nach allen Tests (optional auskommentiert)
# if events_file.exists():
# events_file.unlink()
# if sessions_file.exists():
# sessions_file.unlink()
@pytest.fixture(scope="module")
def bridge_process(workspace_dir, test_config_file, test_storage_files):
"""Startet die IoT Bridge als Subprocess"""
# Bridge starten mit test_config.yaml
# cwd muss das Projektroot sein, nicht tests/integration/
project_root = workspace_dir.parent.parent
config_path = workspace_dir / 'test_config.yaml'
env = os.environ.copy()
env['PYTHONUNBUFFERED'] = '1'
# Use python from venv explicitly
python_exe = project_root / 'venv' / 'bin' / 'python'
bridge_log = workspace_dir / 'bridge_output.log'
with open(bridge_log, 'w') as log_file:
process = subprocess.Popen(
[str(python_exe), 'main.py', '--config', str(config_path)],
cwd=project_root,
env=env,
stdout=log_file,
stderr=subprocess.STDOUT,
text=True
)
# Warten bis Bridge gestartet ist
time.sleep(3)
# Prüfen ob Prozess läuft
if process.poll() is not None:
with open(bridge_log, 'r') as f:
output = f.read()
pytest.fail(f"Bridge konnte nicht gestartet werden:\n{output}")
# Debug: Bridge Output ausgeben
with open(bridge_log, 'r') as f:
output = f.read()
if output:
print(f"\n=== Bridge Output ===\n{output}\n===\n")
yield process
# Bridge sauber beenden
process.send_signal(signal.SIGTERM)
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
@pytest.fixture
def mqtt_sender(mqtt_config):
"""MQTT Client zum Senden von Test-Messages"""
client_id = f"pytest-sender-{int(time.time())}"
client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5)
client.username_pw_set(mqtt_config['username'], mqtt_config['password'])
# TLS/SSL
client.tls_set(cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
client.connect(mqtt_config['host'], mqtt_config['port'], keepalive=60)
client.loop_start()
time.sleep(1)
yield client
client.loop_stop()
client.disconnect()
def send_shelly_message(mqtt_sender, topic: str, power_w: float):
"""Sendet eine Shelly PM Mini G3 Status Message"""
message = {
"id": 0,
"voltage": 230.0,
"current": round(power_w / 230.0, 3),
"apower": power_w,
"freq": 50.0,
"aenergy": {
"total": 12345.6,
"by_minute": [0.0, 0.0, 0.0],
"minute_ts": int(time.time())
},
"temperature": {
"tC": 35.2,
"tF": 95.4
}
}
mqtt_sender.publish(topic, json.dumps(message), qos=1)
time.sleep(0.1) # Kurze Pause zwischen Messages
def read_sessions(sessions_file: Path):
"""Liest sessions.json"""
if not sessions_file.exists():
return []
content = sessions_file.read_text().strip()
if not content or content == "[]":
return []
return json.loads(content)
def wait_for_session_count(sessions_file: Path, expected_count: int, timeout: int = 10):
"""Wartet bis die erwartete Anzahl Sessions vorhanden ist"""
start_time = time.time()
while time.time() - start_time < timeout:
sessions = read_sessions(sessions_file)
if len(sessions) >= expected_count:
return sessions
time.sleep(0.5)
sessions = read_sessions(sessions_file)
pytest.fail(
f"Timeout: Erwartete {expected_count} Sessions, gefunden: {len(sessions)}\n"
f"Sessions: {json.dumps(sessions, indent=2)}"
)
class TestMQTTIntegration:
"""Integration Tests mit echtem MQTT"""
def test_bridge_is_running(self, bridge_process):
"""Bridge läuft erfolgreich"""
assert bridge_process.poll() is None, "Bridge Prozess ist abgestürzt"
def test_session_start_standby(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files):
"""Session Start → STANDBY"""
topic = f"{mqtt_config['topic_prefix']}/status/pm1:0"
sessions_file = test_storage_files['sessions']
# Anzahl Sessions vor dem Test
initial_count = len(read_sessions(sessions_file))
# Power ansteigen lassen - wie reale Maschine
send_shelly_message(mqtt_sender, topic, 0)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 30) # STARTING
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50) # Nach 3s → STANDBY
# Warten auf Session-Start (3s debounce + Verarbeitung)
time.sleep(3)
# Session prüfen
sessions = wait_for_session_count(sessions_file, initial_count + 1, timeout=5)
# Neueste Session
latest = sessions[-1]
assert latest['machine_id'] == 'pytest-machine-01'
assert latest['status'] == 'running'
assert latest['start_power_w'] >= 20
def test_session_end_with_stop_debounce(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files):
"""Session Ende mit stop_debounce (15s unter 20W)"""
topic = f"{mqtt_config['topic_prefix']}/status/pm1:0"
sessions_file = test_storage_files['sessions']
initial_count = len(read_sessions(sessions_file))
# Session starten
send_shelly_message(mqtt_sender, topic, 0)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(2) # Session sollte jetzt laufen
# STANDBY für 3 Sekunden
for _ in range(3):
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
# Power runterfahren → STOPPING
send_shelly_message(mqtt_sender, topic, 15)
# STOPPING für 16 Sekunden (> 15s stop_debounce)
for i in range(16):
send_shelly_message(mqtt_sender, topic, 10 - i * 0.5)
time.sleep(1)
# Warten auf Session-Ende
time.sleep(2)
# Prüfen ob Session beendet wurde
sessions = read_sessions(sessions_file)
# Finde die Session die gerade beendet wurde
completed_sessions = [s for s in sessions if s.get('status') == 'completed' and s.get('end_reason') == 'power_drop']
assert len(completed_sessions) > 0, f"Keine completed Session mit power_drop gefunden. Sessions: {json.dumps(sessions[-3:], indent=2)}"
latest = completed_sessions[-1]
assert latest['end_reason'] == 'power_drop'
assert latest['total_duration_s'] is not None
assert latest['standby_duration_s'] is not None
def test_standby_to_working_transition(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files):
"""STANDBY → WORKING Transition"""
topic = f"{mqtt_config['topic_prefix']}/status/pm1:0"
sessions_file = test_storage_files['sessions']
# Session im STANDBY starten
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(2)
# STANDBY halten
for _ in range(3):
send_shelly_message(mqtt_sender, topic, 60)
time.sleep(1)
# → WORKING
send_shelly_message(mqtt_sender, topic, 120)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 150)
# WORKING halten
for _ in range(5):
send_shelly_message(mqtt_sender, topic, 150)
time.sleep(1)
# Session beenden
send_shelly_message(mqtt_sender, topic, 10)
for _ in range(16):
send_shelly_message(mqtt_sender, topic, 5)
time.sleep(1)
time.sleep(2)
# Prüfen
sessions = read_sessions(sessions_file)
completed = [s for s in sessions if s.get('status') == 'completed']
assert len(completed) > 0
latest = completed[-1]
# Sollte sowohl STANDBY als auch WORKING Zeit haben
assert latest.get('standby_duration_s', 0) > 0, "Keine STANDBY Zeit"
assert latest.get('working_duration_s', 0) > 0, "Keine WORKING Zeit"
def test_timeout_detection(self, bridge_process, mqtt_sender, mqtt_config, test_storage_files):
"""Timeout Detection (20s keine Messages)"""
topic = f"{mqtt_config['topic_prefix']}/status/pm1:0"
sessions_file = test_storage_files['sessions']
initial_count = len(read_sessions(sessions_file))
# Session starten
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(1)
send_shelly_message(mqtt_sender, topic, 50)
time.sleep(2)
# Letzte Message
send_shelly_message(mqtt_sender, topic, 50)
# 25 Sekunden warten (> 20s timeout)
print("\n⏱️ Warte 25 Sekunden für Timeout Detection...")
time.sleep(25)
# Prüfen ob Session mit timeout beendet wurde
sessions = read_sessions(sessions_file)
timeout_sessions = [s for s in sessions if s.get('end_reason') == 'timeout']
assert len(timeout_sessions) > 0, f"Keine timeout Session gefunden. Sessions: {json.dumps(sessions[-3:], indent=2)}"
if __name__ == '__main__':
pytest.main([__file__, '-v', '-s'])

View File

@ -0,0 +1,3 @@
"""
Test Tools - Hilfsprogramme für manuelle Tests und Debugging
"""

View File

@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""
Test MQTT Client - Simuliert Shelly PM Mini G3 Verhalten
Sendet MQTT Messages auf das Topic 'shaperorigin/status/pm1:0'
mit verschiedenen Test-Szenarien für die Session Detection.
Usage:
python test_shelly_simulator.py --scenario standby
python test_shelly_simulator.py --scenario working
python test_shelly_simulator.py --scenario session_end
python test_shelly_simulator.py --scenario full_session
"""
import argparse
import json
import os
import sys
import time
import ssl
from datetime import datetime
from pathlib import Path
from paho.mqtt import client as mqtt_client
import yaml
class ShellySimulator:
"""Simuliert Shelly PM Mini G3 MQTT Messages"""
def __init__(self, broker_host: str, broker_port: int, username: str, password: str, topic_prefix: str = "testshelly"):
self.broker_host = broker_host
self.broker_port = broker_port
self.username = username
self.password = password
self.topic_prefix = topic_prefix
self.topic = f"{topic_prefix}/status/pm1:0"
self.client = None
def connect(self):
"""Verbindung zum MQTT Broker herstellen"""
client_id = f"shelly-simulator-{int(time.time())}"
self.client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5)
self.client.username_pw_set(self.username, self.password)
# TLS/SSL
self.client.tls_set(cert_reqs=ssl.CERT_NONE)
self.client.tls_insecure_set(True)
print(f"Verbinde zu MQTT Broker {self.broker_host}:{self.broker_port}...")
self.client.connect(self.broker_host, self.broker_port, keepalive=60)
self.client.loop_start()
time.sleep(1)
print("✓ Verbunden")
def disconnect(self):
"""Verbindung trennen"""
if self.client:
self.client.loop_stop()
self.client.disconnect()
print("✓ Verbindung getrennt")
def send_power_message(self, power_w: float, interval_s: int = 1):
"""
Sendet eine Shelly Status Message im echten Shelly PM Mini G3 Format
Args:
power_w: Leistung in Watt
interval_s: Pause nach dem Senden in Sekunden
"""
# Echtes Shelly PM Mini G3 Format (ohne result wrapper)
message = {
"id": 0,
"voltage": 230.0,
"current": round(power_w / 230.0, 3),
"apower": power_w,
"freq": 50.0,
"aenergy": {
"total": round(12345.6 + (power_w * interval_s / 3600), 1),
"by_minute": [0.0, 0.0, 0.0],
"minute_ts": int(time.time())
},
"temperature": {
"tC": 35.2,
"tF": 95.4
}
}
payload = json.dumps(message)
self.client.publish(self.topic, payload, qos=1)
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] 📤 Gesendet: {power_w:.1f}W")
if interval_s > 0:
time.sleep(interval_s)
def scenario_standby(self):
"""Szenario: Maschine im STANDBY (20-100W)"""
print("\n=== SZENARIO: STANDBY (20-100W) ===")
print("Simuliert: Maschine an, Spindel aus\n")
# Anfahren
print("Phase 1: Anfahren (0 → 50W)")
self.send_power_message(0, 1)
self.send_power_message(10, 1)
self.send_power_message(30, 1) # > 20W → STARTING
self.send_power_message(45, 1)
self.send_power_message(50, 2) # Nach 3s debounce → STANDBY
# STANDBY halten
print("\nPhase 2: STANDBY halten (50W für 20s)")
for i in range(20):
self.send_power_message(50 + (i % 5) * 2, 1) # 50-58W variieren
print("\n✓ Szenario abgeschlossen")
def scenario_working(self):
"""Szenario: Maschine WORKING (>=100W)"""
print("\n=== SZENARIO: WORKING (>=100W) ===")
print("Simuliert: Spindel läuft\n")
# Anfahren direkt zu WORKING
print("Phase 1: Anfahren (0 → 150W)")
self.send_power_message(0, 1)
self.send_power_message(30, 1) # > 20W → STARTING
self.send_power_message(80, 1)
self.send_power_message(120, 1)
self.send_power_message(150, 2) # Nach 3s debounce → WORKING
# WORKING halten
print("\nPhase 2: WORKING halten (150W für 20s)")
for i in range(20):
self.send_power_message(150 + (i % 10) * 5, 1) # 150-195W variieren
print("\n✓ Szenario abgeschlossen")
def scenario_session_end(self):
"""Szenario: Session sauber beenden mit stop_debounce"""
print("\n=== SZENARIO: SESSION END (mit stop_debounce 15s) ===")
print("Simuliert: Maschine einschalten → arbeiten → ausschalten\n")
# Start: STANDBY
print("Phase 1: Session Start → STANDBY (50W)")
self.send_power_message(0, 1)
self.send_power_message(30, 1) # STARTING
self.send_power_message(50, 1)
self.send_power_message(50, 1)
self.send_power_message(50, 2) # Nach 3s → STANDBY
# STANDBY 10s
print("\nPhase 2: STANDBY halten (50W für 10s)")
for _ in range(10):
self.send_power_message(50, 1)
# Runterfahren
print("\nPhase 3: Herunterfahren (50W → 0W)")
self.send_power_message(40, 1)
self.send_power_message(25, 1)
self.send_power_message(15, 1) # < 20W → STOPPING
# STOPPING für 15 Sekunden (stop_debounce)
print("\nPhase 4: STOPPING (< 20W für 15s = stop_debounce)")
for i in range(16):
self.send_power_message(10 - i * 0.6, 1) # 10W → ~0W über 15s
print(f" STOPPING seit {i+1}s / 15s")
print("\n✓ Session sollte jetzt beendet sein (nach stop_debounce)")
print(" Erwartung: Session END mit end_reason='power_drop'")
def scenario_full_session(self):
"""Szenario: Komplette Session mit STANDBY→WORKING→STANDBY→END"""
print("\n=== SZENARIO: FULL SESSION ===")
print("Simuliert: Start → STANDBY → WORKING → STANDBY → END\n")
# Start → STANDBY
print("Phase 1: Start → STANDBY (50W)")
self.send_power_message(0, 1)
self.send_power_message(30, 1)
self.send_power_message(50, 1)
self.send_power_message(50, 2) # → STANDBY
# STANDBY 5s
print("\nPhase 2: STANDBY (50W für 5s)")
for _ in range(5):
self.send_power_message(55, 1)
# STANDBY → WORKING
print("\nPhase 3: STANDBY → WORKING (50W → 150W)")
self.send_power_message(70, 1)
self.send_power_message(90, 1)
self.send_power_message(110, 1) # >= 100W → WORKING
self.send_power_message(130, 1)
self.send_power_message(150, 1)
# WORKING 10s
print("\nPhase 4: WORKING (150W für 10s)")
for i in range(10):
self.send_power_message(150 + (i % 5) * 10, 1)
# WORKING → STANDBY
print("\nPhase 5: WORKING → STANDBY (150W → 60W)")
self.send_power_message(120, 1)
self.send_power_message(90, 1) # < 100W → STANDBY
self.send_power_message(70, 1)
self.send_power_message(60, 1)
# STANDBY 5s
print("\nPhase 6: STANDBY (60W für 5s)")
for _ in range(5):
self.send_power_message(60, 1)
# Session END
print("\nPhase 7: Session END (60W → 0W mit stop_debounce)")
self.send_power_message(40, 1)
self.send_power_message(20, 1)
self.send_power_message(10, 1) # < 20W → STOPPING
print("\nPhase 8: STOPPING (15s debounce)")
for i in range(16):
self.send_power_message(5 - i * 0.3, 1)
print(f" STOPPING seit {i+1}s / 15s")
print("\n✓ Full Session abgeschlossen")
print(" Erwartung:")
print(" - Session START")
print(" - STANDBY → WORKING → STANDBY Transitionen")
print(" - Session END nach stop_debounce")
print(" - standby_duration_s und working_duration_s getrackt")
def scenario_timeout(self):
"""Szenario: Timeout Detection (keine Messages für 20s)"""
print("\n=== SZENARIO: TIMEOUT (message_timeout_s = 20s) ===")
print("Simuliert: Maschine läuft → plötzlicher Stromausfall (keine Messages mehr)\n")
# Start → STANDBY
print("Phase 1: Session Start → STANDBY (50W)")
self.send_power_message(0, 1)
self.send_power_message(30, 1)
self.send_power_message(50, 1)
self.send_power_message(50, 2) # → STANDBY
# STANDBY 10s
print("\nPhase 2: STANDBY (50W für 10s)")
for _ in range(10):
self.send_power_message(50, 1)
# Keine Messages mehr (Stromausfall simulieren)
print("\n⚡ STROMAUSFALL: Keine Messages mehr für 25 Sekunden")
print(" (message_timeout_s = 20s)")
for i in range(25):
time.sleep(1)
print(f" Keine Messages seit {i+1}s / 20s", end="\r")
print("\n\n✓ Timeout Szenario abgeschlossen")
print(" Erwartung: Session END nach 20s mit end_reason='timeout'")
def main():
parser = argparse.ArgumentParser(description="Shelly PM Mini G3 MQTT Simulator")
parser.add_argument(
"--scenario",
choices=["standby", "working", "session_end", "full_session", "timeout"],
required=True,
help="Test-Szenario"
)
parser.add_argument("--host", default=None, help="MQTT Broker Host (default: from env/config.yaml)")
parser.add_argument("--port", type=int, default=None, help="MQTT Broker Port (default: from env/config.yaml)")
parser.add_argument("--username", default=None, help="MQTT Username (default: from env/config.yaml)")
parser.add_argument("--password", default=None, help="MQTT Password (default: from env/config.yaml)")
parser.add_argument("--topic-prefix", default="testshelly", help="MQTT Topic Prefix (default: testshelly)")
args = parser.parse_args()
# Load credentials from environment or config.yaml if not provided via CLI
mqtt_config = {}
if not all([args.host, args.port, args.username, args.password]):
# Try environment variables first
mqtt_config = {
'host': os.getenv('MQTT_HOST'),
'port': int(os.getenv('MQTT_PORT', 0)) if os.getenv('MQTT_PORT') else None,
'username': os.getenv('MQTT_USERNAME'),
'password': os.getenv('MQTT_PASSWORD')
}
# If not all available, try config.yaml
if not all(mqtt_config.values()):
config_path = Path(__file__).parent.parent.parent / "config.yaml"
if config_path.exists():
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
mqtt_section = config.get('mqtt', {})
mqtt_config = {
'host': mqtt_section.get('broker_host'),
'port': mqtt_section.get('broker_port'),
'username': mqtt_section.get('username'),
'password': mqtt_section.get('password')
}
except Exception as e:
print(f"⚠️ Warnung: Fehler beim Lesen von config.yaml: {e}")
# Use CLI args or fallback to loaded config
host = args.host or mqtt_config.get('host')
port = args.port or mqtt_config.get('port')
username = args.username or mqtt_config.get('username')
password = args.password or mqtt_config.get('password')
# Validate that we have all required credentials
if not all([host, port, username, password]):
print("❌ Fehler: MQTT Credentials fehlen!")
print("")
print("Bitte setze die Credentials über:")
print(" 1) Kommandozeilen-Parameter:")
print(" --host <host> --port <port> --username <user> --password <pass>")
print("")
print(" 2) Umgebungsvariablen:")
print(" export MQTT_HOST=mqtt.majufilo.eu")
print(" export MQTT_PORT=8883")
print(" export MQTT_USERNAME=mosquitto")
print(" export MQTT_PASSWORD=xxx")
print("")
print(" 3) config.yaml im Projektroot")
print("")
sys.exit(1)
print("=" * 60)
print("SHELLY PM MINI G3 SIMULATOR")
print("=" * 60)
simulator = ShellySimulator(
broker_host=host,
broker_port=port,
username=username,
password=password,
topic_prefix=args.topic_prefix
)
try:
simulator.connect()
# Szenario ausführen
if args.scenario == "standby":
simulator.scenario_standby()
elif args.scenario == "working":
simulator.scenario_working()
elif args.scenario == "session_end":
simulator.scenario_session_end()
elif args.scenario == "full_session":
simulator.scenario_full_session()
elif args.scenario == "timeout":
simulator.scenario_timeout()
print("\n" + "=" * 60)
print("Test abgeschlossen. Prüfe sessions.json für Ergebnisse.")
print("=" * 60)
except KeyboardInterrupt:
print("\n\n⚠️ Abgebrochen")
except Exception as e:
print(f"\n❌ Fehler: {e}")
finally:
simulator.disconnect()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,3 @@
"""
Unit Tests - Testen einzelne Komponenten isoliert ohne externe Dependencies
"""

View File

@ -0,0 +1,396 @@
#!/usr/bin/env python3
"""
Unit Tests für SessionDetector
Testet die Dual-Threshold Session Detection ohne MQTT Broker.
Direktes Testen der Logik mit simulierten Events.
Usage:
pytest test_session_detector.py -v
pytest test_session_detector.py::test_standby_to_working -v
"""
import pytest
from datetime import datetime, timezone, timedelta
from session_detector import SessionDetector, SessionState
@pytest.fixture
def detector():
"""SessionDetector mit Test-Konfiguration"""
config = [
{
'machine_id': 'test-machine-01',
'machine_name': 'Test Machine',
'standby_threshold_w': 20,
'working_threshold_w': 100,
'start_debounce_s': 3,
'stop_debounce_s': 15,
'message_timeout_s': 20,
}
]
return SessionDetector(device_config=config)
def create_event(power_w: float, machine_id: str = 'test-machine-01', timestamp: datetime = None):
"""Helper: Erstellt ein normalisiertes Event"""
if timestamp is None:
timestamp = datetime.now(timezone.utc)
return {
'event_id': 'test-event',
'event_type': 'power_measurement',
'timestamp': timestamp.isoformat().replace('+00:00', 'Z'),
'machine': {
'machine_id': machine_id,
'machine_name': 'Test Machine'
},
'metrics': {
'power_w': power_w
}
}
class TestSessionStart:
"""Tests für Session-Start mit start_debounce"""
def test_idle_to_starting(self, detector):
"""Power >= 20W → STARTING"""
event = create_event(power_w=30)
result = detector.process_event(event)
assert result is None # Noch kein Session-Start
assert detector.get_machine_state('test-machine-01') == 'starting'
def test_starting_to_standby(self, detector):
"""Nach 3s debounce mit 20-100W → STANDBY"""
start_time = datetime.now(timezone.utc)
# T+0: Power 30W → STARTING
event1 = create_event(power_w=30, timestamp=start_time)
result1 = detector.process_event(event1)
assert result1 is None
# T+1: Power 50W → noch STARTING
event2 = create_event(power_w=50, timestamp=start_time + timedelta(seconds=1))
result2 = detector.process_event(event2)
assert result2 is None
# T+3: Power 50W → STANDBY (debounce passed)
event3 = create_event(power_w=50, timestamp=start_time + timedelta(seconds=3))
result3 = detector.process_event(event3)
assert result3 is not None
assert result3['event_type'] == 'session_start'
assert result3['session_data']['start_time'] is not None
assert detector.get_machine_state('test-machine-01') == 'standby'
def test_starting_to_working(self, detector):
"""Nach 3s debounce mit >=100W → WORKING"""
start_time = datetime.now(timezone.utc)
# T+0: Power 30W → STARTING
event1 = create_event(power_w=30, timestamp=start_time)
detector.process_event(event1)
# T+1: Power 120W → noch STARTING
event2 = create_event(power_w=120, timestamp=start_time + timedelta(seconds=1))
detector.process_event(event2)
# T+3: Power 150W → WORKING (debounce passed)
event3 = create_event(power_w=150, timestamp=start_time + timedelta(seconds=3))
result3 = detector.process_event(event3)
assert result3 is not None
assert result3['event_type'] == 'session_start'
assert detector.get_machine_state('test-machine-01') == 'working'
def test_false_start(self, detector):
"""Power fällt vor debounce → zurück zu IDLE"""
start_time = datetime.now(timezone.utc)
# T+0: Power 30W → STARTING
event1 = create_event(power_w=30, timestamp=start_time)
detector.process_event(event1)
# T+1: Power 10W → zurück zu IDLE (false start)
event2 = create_event(power_w=10, timestamp=start_time + timedelta(seconds=1))
result2 = detector.process_event(event2)
assert result2 is None
assert detector.get_machine_state('test-machine-01') == 'idle'
class TestStateTransitions:
"""Tests für Zustandsübergänge während Session"""
def test_standby_to_working(self, detector):
"""STANDBY → WORKING bei Power >= 100W"""
start_time = datetime.now(timezone.utc)
# Session starten im STANDBY
detector.process_event(create_event(power_w=30, timestamp=start_time))
detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)))
assert detector.get_machine_state('test-machine-01') == 'standby'
# Power auf 150W → WORKING
event = create_event(power_w=150, timestamp=start_time + timedelta(seconds=10))
result = detector.process_event(event)
assert result is None # Keine session_end/start, nur Transition
assert detector.get_machine_state('test-machine-01') == 'working'
def test_working_to_standby(self, detector):
"""WORKING → STANDBY bei Power < 100W"""
start_time = datetime.now(timezone.utc)
# Session starten im WORKING
detector.process_event(create_event(power_w=150, timestamp=start_time))
detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3)))
assert detector.get_machine_state('test-machine-01') == 'working'
# Power auf 60W → STANDBY
event = create_event(power_w=60, timestamp=start_time + timedelta(seconds=10))
result = detector.process_event(event)
assert result is None
assert detector.get_machine_state('test-machine-01') == 'standby'
class TestSessionEnd:
"""Tests für Session-Ende mit stop_debounce"""
def test_session_end_from_standby(self, detector):
"""STANDBY → STOPPING → IDLE nach 15s < 20W"""
start_time = datetime.now(timezone.utc)
# Session starten
detector.process_event(create_event(power_w=50, timestamp=start_time))
detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)))
# STANDBY für 10s
for i in range(10):
detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i)))
# Power fällt unter 20W → STOPPING
t_stopping = start_time + timedelta(seconds=13)
event_stopping = create_event(power_w=10, timestamp=t_stopping)
result_stopping = detector.process_event(event_stopping)
assert result_stopping is None
assert detector.get_machine_state('test-machine-01') == 'stopping'
# STOPPING für 14s → noch keine Session-End
for i in range(14):
result = detector.process_event(
create_event(power_w=10, timestamp=t_stopping + timedelta(seconds=i + 1))
)
assert result is None # Noch kein Session-Ende
# T+15: stop_debounce passed → SESSION END
event_end = create_event(power_w=5, timestamp=t_stopping + timedelta(seconds=15))
result_end = detector.process_event(event_end)
assert result_end is not None
assert result_end['event_type'] == 'session_end'
assert result_end['session_data']['end_reason'] == 'power_drop'
assert detector.get_machine_state('test-machine-01') == 'idle'
def test_stopping_canceled_power_back_up(self, detector):
"""STOPPING abgebrochen wenn Power wieder >= 20W"""
start_time = datetime.now(timezone.utc)
# Session starten
detector.process_event(create_event(power_w=50, timestamp=start_time))
detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)))
# Power fällt → STOPPING
t_stopping = start_time + timedelta(seconds=10)
detector.process_event(create_event(power_w=10, timestamp=t_stopping))
assert detector.get_machine_state('test-machine-01') == 'stopping'
# Nach 5s: Power wieder hoch → zurück zu STANDBY
event_back = create_event(power_w=60, timestamp=t_stopping + timedelta(seconds=5))
result = detector.process_event(event_back)
assert result is None # Kein Session-Ende
assert detector.get_machine_state('test-machine-01') == 'standby'
class TestDurationTracking:
"""Tests für Zeiterfassung in STANDBY und WORKING"""
def test_standby_duration_tracked(self, detector):
"""standby_duration_s wird korrekt akkumuliert"""
start_time = datetime.now(timezone.utc)
# Session Start → STANDBY
detector.process_event(create_event(power_w=50, timestamp=start_time))
detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)))
# STANDBY für 10 Sekunden
for i in range(10):
detector.process_event(
create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i))
)
# Session beenden
t_end = start_time + timedelta(seconds=20)
detector.process_event(create_event(power_w=10, timestamp=t_end))
# STOPPING für genau 15 Sekunden (stop_debounce)
for i in range(14):
result = detector.process_event(
create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1))
)
assert result is None # Noch kein Session-Ende
# T+15: stop_debounce passed → SESSION END
result = detector.process_event(
create_event(power_w=0, timestamp=t_end + timedelta(seconds=15))
)
assert result is not None
assert result['session_data']['standby_duration_s'] >= 9 # ~10s STANDBY
assert result['session_data']['working_duration_s'] == 0 # Kein WORKING
def test_working_duration_tracked(self, detector):
"""working_duration_s wird korrekt akkumuliert"""
start_time = datetime.now(timezone.utc)
# Session Start → WORKING
detector.process_event(create_event(power_w=150, timestamp=start_time))
detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3)))
# WORKING für 10 Sekunden
for i in range(10):
detector.process_event(
create_event(power_w=150, timestamp=start_time + timedelta(seconds=3 + i))
)
# Session beenden
t_end = start_time + timedelta(seconds=20)
detector.process_event(create_event(power_w=10, timestamp=t_end))
# STOPPING für genau 15 Sekunden
for i in range(14):
result = detector.process_event(
create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1))
)
assert result is None
# T+15: stop_debounce passed
result = detector.process_event(
create_event(power_w=0, timestamp=t_end + timedelta(seconds=15))
)
assert result is not None
assert result['session_data']['working_duration_s'] >= 9 # ~10s WORKING
assert result['session_data']['standby_duration_s'] == 0 # Kein STANDBY
def test_mixed_standby_working_duration(self, detector):
"""STANDBY und WORKING Zeiten separat getrackt"""
start_time = datetime.now(timezone.utc)
# Start → STANDBY
detector.process_event(create_event(power_w=50, timestamp=start_time))
detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)))
# STANDBY 5s
for i in range(5):
detector.process_event(
create_event(power_w=50, timestamp=start_time + timedelta(seconds=3 + i))
)
# → WORKING
t_working = start_time + timedelta(seconds=8)
detector.process_event(create_event(power_w=150, timestamp=t_working))
# WORKING 5s
for i in range(5):
detector.process_event(
create_event(power_w=150, timestamp=t_working + timedelta(seconds=i + 1))
)
# → STANDBY
t_standby2 = t_working + timedelta(seconds=6)
detector.process_event(create_event(power_w=60, timestamp=t_standby2))
# STANDBY 5s
for i in range(5):
detector.process_event(
create_event(power_w=60, timestamp=t_standby2 + timedelta(seconds=i + 1))
)
# Session beenden
t_end = t_standby2 + timedelta(seconds=6)
detector.process_event(create_event(power_w=10, timestamp=t_end))
# STOPPING für genau 15 Sekunden
for i in range(14):
result = detector.process_event(
create_event(power_w=5, timestamp=t_end + timedelta(seconds=i + 1))
)
assert result is None
# T+15: stop_debounce passed
result = detector.process_event(
create_event(power_w=0, timestamp=t_end + timedelta(seconds=15))
)
assert result is not None
# ~10s STANDBY (5s + 5s)
assert result['session_data']['standby_duration_s'] >= 9
assert result['session_data']['standby_duration_s'] <= 12
# ~5s WORKING
assert result['session_data']['working_duration_s'] >= 4
assert result['session_data']['working_duration_s'] <= 7
class TestTimeoutDetection:
"""Tests für Timeout-basierte Session-Beendigung"""
def test_timeout_from_standby(self, detector):
"""Timeout nach 20s ohne Messages im STANDBY"""
start_time = datetime.now(timezone.utc)
# Session starten
detector.process_event(create_event(power_w=50, timestamp=start_time))
detector.process_event(create_event(power_w=50, timestamp=start_time + timedelta(seconds=3)))
# Letzte Message
last_msg = start_time + timedelta(seconds=10)
detector.process_event(create_event(power_w=50, timestamp=last_msg))
# 21 Sekunden später → TIMEOUT
timeout_event = create_event(power_w=50, timestamp=last_msg + timedelta(seconds=21))
result = detector.process_event(timeout_event)
assert result is not None
assert result['event_type'] == 'session_end'
assert result['session_data']['end_reason'] == 'timeout'
assert result['power_w'] == 0.0 # Bei Timeout wird 0W angenommen
def test_timeout_from_working(self, detector):
"""Timeout nach 20s ohne Messages im WORKING"""
start_time = datetime.now(timezone.utc)
# Session starten im WORKING
detector.process_event(create_event(power_w=150, timestamp=start_time))
detector.process_event(create_event(power_w=150, timestamp=start_time + timedelta(seconds=3)))
# Letzte Message
last_msg = start_time + timedelta(seconds=10)
detector.process_event(create_event(power_w=150, timestamp=last_msg))
# 21 Sekunden später → TIMEOUT
timeout_event = create_event(power_w=150, timestamp=last_msg + timedelta(seconds=21))
result = detector.process_event(timeout_event)
assert result is not None
assert result['event_type'] == 'session_end'
assert result['session_data']['end_reason'] == 'timeout'
if __name__ == '__main__':
pytest.main([__file__, '-v'])