feat(mqtt): Implement M0-M3 - MQTT IoT Bridge with Session Detection

- M0: MQTT Client with TLS/SSL support
  - Connection to mqtt.majufilo.eu:8883
  - Topic subscription with wildcards
  - JSON payload parsing

- M1: Shelly PM Mini G3 Integration
  - Status message parser (shaperorigin/status/pm1:0)
  - Custom MQTT topic prefix support
  - Power, voltage, current, frequency extraction
  - Device mapping via topic_prefix

- M2: Event Normalization
  - Event Schema v1 with UUID-based event IDs
  - ISO 8601 UTC timestamps
  - Machine/Device info mapping
  - Metrics normalization (power_w, voltage_v, current_a, frequency_hz)
  - Filter None values

- M3: Session Detection Engine
  - State machine: IDLE → STARTING → RUNNING → STOPPING → IDLE
  - Power-based threshold detection (configurable per machine)
  - Debounce logic (separate for start/stop)
  - Session events with duration calculation
  - Persistent storage (JSONL for events, JSON for sessions)
  - Odoo-ready data format for future migration

Data Storage:
- events.jsonl: JSON Lines format (one event per line)
- sessions.json: Session records with start/end/duration
- Ready for migration to open_workshop.session model

Multi-device ready: Add devices via config.yaml with unique topic_prefix
This commit is contained in:
Matthias Lotz 2026-01-22 19:59:17 +01:00
parent 75d91984d1
commit 4c03959437
13 changed files with 2192 additions and 324 deletions

View File

@ -1,324 +0,0 @@
# Projektplan: MQTT-basierte IoT-Events in Odoo 18 Community (Simulation-first)
Stand: 2026-01-10 : https://chatgpt.com/share/696e559d-1640-800f-9d53-f7a9d1e784bd
Ziel: **Odoo-18-Community (self-hosted)** soll **Geräte-Events** (Timer/Maschinenlaufzeit, Waage, Zustandswechsel) über **MQTT** aufnehmen und in Odoo als **saubere, nachvollziehbare Sessions/Events** verarbeiten.
Wichtig: **Hardware wird zunächst vollständig simuliert** (Software-Simulatoren), damit die Odoo-Schnittstelle stabil steht, bevor echte Geräte angebunden werden.
---
## 1. Ziele und Nicht-Ziele
### 1.1 Ziele (MVP)
- Einheitliche **Device-Event-Schnittstelle** in Odoo (REST/Webhook) inkl. Authentifizierung
- **Event-Log** in Odoo (persistente Rohereignisse + Normalisierung)
- **Session-Logik** für Timer/Maschinenlaufzeit (Start/Stop/Heartbeat)
- **Simulation** von:
- Maschinenzustand (running/idle/fault)
- Timer-Events (run_start/run_stop)
- Waage (stable_weight/tare)
- Reproduzierbare Tests (Unit/Integration) und eindeutige Fehlerdiagnostik (Logging)
### 1.2 Nicht-Ziele (für Phase 1)
- Keine Enterprise-IoT-Box, keine Enterprise-Module
- Keine echte Hardware-Treiberentwicklung in Phase 1
- Kein POS-Frontend-Live-Widget (optional erst in späterer Phase)
- Keine Abrechnungslogik/Preisregeln (kann vorbereitet, aber nicht umgesetzt werden)
---
## 2. Zielarchitektur (Simulation-first)
### 2.1 Komponenten
1. **MQTT Broker** : Mosquitto in Docker vorhanden
2. **Device Simulator(s)** (Python/Node)
- veröffentlicht MQTT Topics wie echte Geräte
3. **Gateway/Bridge (Software)**
- abonniert MQTT Topics
- validiert/normalisiert Payload
- sendet Events via HTTPS an Odoo (Webhook)
4. **Odoo Modul** `ows_iot_bridge`
- REST Controller `/ows/iot/event`
- Modelle für Devices, Events, Sessions
- Business-Regeln für Session-Start/Stop/Hysterese (softwareseitig)
5. Optional später: **Realtime-Feed** (WebSocket) für POS/Display
### 2.2 Datenfluss
1. Simulator publiziert MQTT → `hobbyhimmel/machines/<machine_id>/state`
2. Bridge konsumiert MQTT, mappt auf Event-Format
3. Bridge POSTet JSON an Odoo Endpoint
4. Odoo speichert Roh-Event + erzeugt ggf. Session-Updates
---
## 3. Schnittstelle zu Odoo (Kern des Projekts)
### 3.1 Authentifizierung
- Pro Device oder pro Bridge ein **API-Token** (Bearer)
- Header: `Authorization: Bearer <token>`
- Token in Odoo in `ows.iot.device` oder `ir.config_parameter` hinterlegt
- Optional: IP-Allowlist / Rate-Limit (in Reverse Proxy)
### 3.2 Endpoint (REST/Webhook)
- `POST /ows/iot/event`
- Content-Type: `application/json`
- Antwort:
- `200 OK` mit `{ "status": "ok", "event_id": "...", "session_id": "..." }`
- Fehler: `400` (Schema), `401/403` (Auth), `409` (Duplikat), `500` (Server)
### 3.3 Idempotenz / Duplikaterkennung
- Jedes Event enthält eine eindeutige `event_uid`
- Odoo legt Unique-Constraint auf `event_uid` (pro device)
- Wiederholte Events liefern `409 Conflict` oder `200 OK (duplicate=true)` (Designentscheidung)
---
## 4. Ereignis- und Topic-Standard (Versioniert)
### 4.1 MQTT Topics (v1)
- Maschinenzustand:
`hobbyhimmel/machines/<machine_id>/state`
- Maschinenereignisse:
`hobbyhimmel/machines/<machine_id>/event`
- Waage:
`hobbyhimmel/scales/<scale_id>/event`
- Geräte-Status (optional):
`hobbyhimmel/devices/<device_id>/status`
### 4.2 Gemeinsames JSON Event Schema (v1)
Pflichtfelder:
- `schema_version`: `"v1"`
- `event_uid`: UUID/string
- `ts`: ISO-8601 UTC (z. B. `"2026-01-10T12:34:56Z"`)
- `source`: `"simulator" | "device" | "gateway"`
- `device_id`: string
- `entity_type`: `"machine" | "scale" | "sensor"`
- `entity_id`: string (z. B. machine_id)
- `event_type`: string (siehe unten)
- `payload`: object
- `confidence`: `"high" | "medium" | "low"` (für Sensorfusion)
### 4.3 Event-Typen (v1)
**Maschine/Timer**
- `run_start` (Start einer Laufphase)
- `run_stop` (Ende einer Laufphase)
- `heartbeat` (optional, laufend während running)
- `state_changed` (idle/running/fault)
- `fault` (Fehler mit Code/Severity)
**Waage**
- `stable_weight` (stabiler Messwert)
- `weight` (laufend)
- `tare`
- `zero`
- `error`
---
## 5. Odoo Datenmodell (Vorschlag)
### 5.1 `ows.iot.device`
- `name`
- `device_id` (unique)
- `token_hash` (oder Token in separater Tabelle)
- `device_type` (machine/scale/...)
- `active`
- `last_seen`
- `notes`
### 5.2 `ows.iot.event`
- `event_uid` (unique)
- `device_id` (m2o -> device)
- `entity_type`, `entity_id`
- `event_type`
- `timestamp`
- `payload_json` (Text/JSON)
- `confidence`
- `processing_state` (new/processed/error)
- `session_id` (m2o optional)
### 5.3 `ows.machine.session` (Timer-Sessions)
- `machine_id` (Char oder m2o auf bestehendes Maschinenmodell)
- `start_ts`, `stop_ts`
- `duration_s` (computed)
- `state` (running/stopped/aborted)
- `origin` (sensor/manual/sim)
- `confidence_summary`
- `event_ids` (o2m)
> Hinweis: Wenn du bereits `ows.machine` aus deinem open_workshop nutzt, referenziert `machine_id` direkt dieses Modell.
---
## 6. Verarbeitungslogik (Phase 1: minimal, robust)
### 6.1 Session-Automat (State Machine)
- Eingang: Events `run_start`, `run_stop`, optional `heartbeat`
- Regeln:
- `run_start` erstellt neue Session, wenn keine läuft
- `run_start` während laufender Session → nur Log, keine neue Session
- `run_stop` schließt laufende Session
- Timeout-Regel: wenn `heartbeat` ausbleibt (z. B. 10 min) → Session `aborted`
### 6.2 Hysterese (Simulation als Stellvertreter für Sensorik)
In Simulation definierst du:
- Start, wenn „running“ mindestens **2s stabil**
- Stop, wenn „idle“ mindestens **15s stabil**
Diese Parameter als Odoo Systemparameter, z. B.:
- `ows_iot.start_debounce_s`
- `ows_iot.stop_debounce_s`
---
## 7. Simulation (Software statt Hardware)
### 7.1 Device Simulator: Maschine
- Konfigurierbar:
- Muster: random, fixed schedule, manuell per CLI
- Zustände: idle/running/fault
- Optional: „power_w“ und „vibration“ als Felder im Payload
- Publiziert MQTT in realistischen Intervallen
### 7.2 Device Simulator: Waage
- Modi:
- stream weight (mehrfach pro Sekunde)
- stable_weight nur auf „stabil“
- tare/zero Events per CLI
### 7.3 Bridge Simulator (MQTT → Odoo)
- Abonniert alle relevanten Topics
- Validiert Schema v1
- POSTet Events an Odoo
- Retry-Queue (lokal) bei Odoo-Ausfall
- Metriken/Logs:
- gesendete Events, Fehlerquoten, Latenz
---
## 8. Milestones & Deliverables
### M0 Repo/Grundgerüst (0.51 Tag)
**Deliverables**
- Git-Repo Struktur
- Docker Compose: mosquitto + simulator + bridge
- Odoo Modul Skeleton `ows_iot_bridge`
### M1 Odoo Endpoint & Modelle (12 Tage)
**Deliverables**
- `/ows/iot/event` Controller inkl. Token-Auth
- Modelle `ows.iot.device`, `ows.iot.event`
- Admin UI: Geräte anlegen, Token setzen, letzte Events anzeigen
### M2 Session-Engine (12 Tage)
**Deliverables**
- Modell `ows.machine.session`
- Event → Session Zuordnung
- Timeout/Abbruch-Logik
- Parameter für Debounce/Timeout
### M3 Simulatoren & End-to-End Test (12 Tage)
**Deliverables**
- Machine Simulator + Scale Simulator
- Bridge mit Retry-Queue
- End-to-End: Simulator → MQTT → Bridge → Odoo → Session entsteht
### M4 Qualität & Betrieb (1 Tag)
**Deliverables**
- Logging-Konzept (Odoo + Bridge)
- Tests (Unit: Controller/Auth, Integration: Session Engine)
- Doku: Event-Schema v1, Topics, Beispielpayloads
### Optional M5 POS/Anzeige (später)
- Realtime Anzeige im POS oder auf Display
- Live-Weight in POS
---
## 9. Testplan (Simulation-first)
### 9.1 Unit Tests (Odoo)
- Auth: gültiger Token → 200
- ungültig/fehlend → 401/403
- Schema-Validation → 400
- Idempotenz: duplicate `event_uid` → 409 oder duplicate-flag
### 9.2 Integration Tests
- Sequenz: start → heartbeat → stop → Session duration plausibel
- stop ohne start → kein Crash, Event loggt Fehlerzustand
- Timeout: start → keine heartbeat → Session aborted
### 9.3 Last-/Stabilitätstest (Simulator)
- 20 Maschinen, je 1 Event/s, 1h Lauf
- Ziel: Odoo bleibt stabil, Event-Insert performant, Queue läuft nicht über
---
## 10. Betriebs- und Sicherheitskonzept
- Token-Rotation möglich (neues Token, altes deaktivieren)
- Reverse Proxy:
- HTTPS
- Rate limiting auf `/ows/iot/event`
- optional Basic WAF Regeln
- Payload-Größenlimit (z. B. 1664 KB)
- Bridge persistiert Retry-Queue auf Disk
---
## 11. Offene Entscheidungen (später, nicht blocker für MVP)
- Event-Consumer in Odoo vs. Bridge-only Webhook (empfohlen: Webhook)
- Genaues Mapping zu bestehenden open_workshop Modellen (`ows.machine`, Nutzer/RFID)
- Abrechnung: Preisregeln, Rundungen, POS-Integration
- Realtime: Odoo Bus vs. eigener WebSocket Service
---
## 12. Nächste Schritte (konkret)
1. `ows_iot_bridge` Modul: Endpoint + Device/Event Modelle implementieren
2. Docker Compose: mosquitto + bridge + simulator bereitstellen
3. Simulatoren produzieren v1-Events; Bridge sendet an Odoo
4. Session-Engine anschließen und End-to-End testen
---
## Anhang A: Beispiel-Event (Maschine run_start)
```json
{
"schema_version": "v1",
"event_uid": "c2a7d6f1-7d8d-4a63-9a7f-4e6d7b0d9e2a",
"ts": "2026-01-10T12:34:56Z",
"source": "simulator",
"device_id": "esp32-fraser-01",
"entity_type": "machine",
"entity_id": "formatkreissaege",
"event_type": "run_start",
"confidence": "high",
"payload": {
"power_w": 820,
"vibration": 0.73,
"reason": "power_threshold"
}
}
```
## Anhang B: Beispiel-Event (Waage stable_weight)
```json
{
"schema_version": "v1",
"event_uid": "b6d0a2c5-9b1f-4a0b-8b19-7f2e1b8f3d11",
"ts": "2026-01-10T12:40:12Z",
"source": "simulator",
"device_id": "scale-sim-01",
"entity_type": "scale",
"entity_id": "waage-01",
"event_type": "stable_weight",
"confidence": "high",
"payload": {
"weight_g": 1532.4,
"unit": "g",
"stable_ms": 1200
}
}
```

View File

@ -0,0 +1,530 @@
# Projektplan: MQTT-basierte IoT-Events in Odoo 18 Community (Hardware-First mit Shelly PM Mini G3)
Stand: 2026-01-22 (aktualisiert)
Original: 2026-01-10 : https://chatgpt.com/share/696e559d-1640-800f-9d53-f7a9d1e784bd
Ziel: **Odoo-18-Community (self-hosted)** soll **Geräte-Events** (Timer/Maschinenlaufzeit, Waage, Zustandswechsel) über **MQTT** aufnehmen und in Odoo als **saubere, nachvollziehbare Sessions/Events** verarbeiten.
**Änderung der Vorgehensweise**: Entwicklung startet mit **echtem Shelly PM Mini G3** Hardware-Device, das bereits im MQTT Broker integriert ist. Python-Prototyp wird so entwickelt, dass Code später direkt in Odoo-Bridge übernommen werden kann.
---
## 1. Ziele und Nicht-Ziele
### 1.1 Ziele (MVP)
- **Hardware-Integration**: Shelly PM Mini G3 als primäres Test-Device
- **Session-Logik** für Maschinenlaufzeit basierend auf Power-Schwellenwerten (Start/Stop mit Hysterese)
- **Python-Prototyp** (Standalone) der später in Odoo-Bridge übernommen wird:
- MQTT Client für Shelly-Integration
- Event-Normalisierung (Shelly-Format → Unified Event Schema)
- Session-Detection Engine
- Konfigurierbare Device-Mappings und Schwellenwerte
- **Odoo-Integration** (Phase 2):
- Einheitliche **Device-Event-Schnittstelle** (REST/Webhook) inkl. Authentifizierung
- **Event-Log** in Odoo (persistente Rohereignisse + Normalisierung)
- Session-Verwaltung mit Maschinenzuordnung
- Reproduzierbare Tests und eindeutige Fehlerdiagnostik (Logging)
### 1.2 Nicht-Ziele (für Phase 1)
- Keine Enterprise-IoT-Box, keine Enterprise-Module
- Keine Simulatoren in Phase 1 (nur echte Hardware)
- Kein POS-Frontend-Live-Widget (optional erst in späterer Phase)
- Keine Abrechnungslogik/Preisregeln (kann vorbereitet, aber nicht umgesetzt werden)
---
## 2. Zielarchitektur (Simulation-first)
### 2.1 Komponenten
1. **MQTT Broker** : Mosquitto in Docker vorhanden
2. **Device Simulator(s)** (Python/Node)
- veröffentlicht MQTT Topics wie echte Geräte
3. **Gateway/Bridge (Software)**
- abonniert MQTT Topics
- validiert/normalisiert Payload
- sendet Events via HTTPS an Odoo (Webhook)
4. **Odoo Modul** `ows_iot_bridge`
- REST Controller `/ows/iot/event`
- Modelle für Devices, Events, Sessions
- Business-Regeln für Session-Start/Stop/Hysterese (softwareseitig)
5. Optional später: **Realtime-Feed** (WebSocket) für POS/Display
### 2.2 Datenfluss
1. Simulator publiziert MQTT → `hobbyhimmel/machines/<machine_id>/state`
2. Bridge konsumiert MQTT, mappt auf Event-Format
3. Bridge POSTet JSON an Odoo Endpoint
4. Odoo speichert Roh-Event + erzeugt ggf. Session-Updates
---
## 3. Schnittstelle zu Odoo (Kern des Projekts)
### 3.1 Authentifizierung
- Pro Device oder pro Bridge ein **API-Token** (Bearer)
- Header: `Authorization: Bearer <token>`
- Token in Odoo in `ows.iot.device` oder `ir.config_parameter` hinterlegt
- Optional: IP-Allowlist / Rate-Limit (in Reverse Proxy)
### 3.2 Endpoint (REST/Webhook)
- `POST /ows/iot/event`
- Content-Type: `application/json`
- Antwort:
- `200 OK` mit `{ "status": "ok", "event_id": "...", "session_id": "..." }`
- Fehler: `400` (Schema), `401/403` (Auth), `409` (Duplikat), `500` (Server)
### 3.3 Idempotenz / Duplikaterkennung
- Jedes Event enthält eine eindeutige `event_uid`
- Odoo legt Unique-Constraint auf `event_uid` (pro device)
- Wiederholte Events liefern `409 Conflict` oder `200 OK (duplicate=true)` (Designentscheidung)
---
## 4. Ereignis- und Topic-Standard (Versioniert)
### 4.1 MQTT Topics (v1)
- Maschinenzustand:
`hobbyhimmel/machines/<machine_id>/state`
- Maschinenereignisse:
`hobbyhimmel/machines/<machine_id>/event`
- Waage:
`hobbyhimmel/scales/<scale_id>/event`
- Geräte-Status (optional):
`hobbyhimmel/devices/<device_id>/status`
### 4.2 Gemeinsames JSON Event Schema (v1)
Pflichtfelder:
- `schema_version`: `"v1"`
- `event_uid`: UUID/string
- `ts`: ISO-8601 UTC (z. B. `"2026-01-10T12:34:56Z"`)
- `source`: `"simulator" | "device" | "gateway"`
- `device_id`: string
- `entity_type`: `"machine" | "scale" | "sensor"`
- `entity_id`: string (z. B. machine_id)
- `event_type`: string (siehe unten)
- `payload`: object
- `confidence`: `"high" | "medium" | "low"` (für Sensorfusion)
### 4.3 Event-Typen (v1)
**Maschine/Timer**
- `run_start` (Start einer Laufphase)
- `run_stop` (Ende einer Laufphase)
- `heartbeat` (optional, laufend während running)
- `state_changed` (idle/running/fault)
- `fault` (Fehler mit Code/Severity)
**Waage**
- `stable_weight` (stabiler Messwert)
- `weight` (laufend)
- `tare`
- `zero`
- `error`
---
## 5. Odoo Datenmodell (Vorschlag)
### 5.1 `ows.iot.device`
- `name`
- `device_id` (unique)
- `token_hash` (oder Token in separater Tabelle)
- `device_type` (machine/scale/...)
- `active`
- `last_seen`
- `notes`
### 5.2 `ows.iot.event`
- `event_uid` (unique)
- `device_id` (m2o -> device)
- `entity_type`, `entity_id`
- `event_type`
- `timestamp`
- `payload_json` (Text/JSON)
- `confidence`
- `processing_state` (new/processed/error)
- `session_id` (m2o optional)
### 5.3 `ows.machine.session` (Timer-Sessions)
- `machine_id` (Char oder m2o auf bestehendes Maschinenmodell)
- `start_ts`, `stop_ts`
- `duration_s` (computed)
- `state` (running/stopped/aborted)
- `origin` (sensor/manual/sim)
- `confidence_summary`
- `event_ids` (o2m)
> Hinweis: Wenn du bereits `ows.machine` aus deinem open_workshop nutzt, referenziert `machine_id` direkt dieses Modell.
---
## 6. Verarbeitungslogik (Phase 1: minimal, robust)
### 6.1 Session-Automat (State Machine)
- Eingang: Events `run_start`, `run_stop`, optional `heartbeat`
- Regeln:
- `run_start` erstellt neue Session, wenn keine läuft
- `run_start` während laufender Session → nur Log, keine neue Session
- `run_stop` schließt laufende Session
- Timeout-Regel: wenn `heartbeat` ausbleibt (z. B. 10 min) → Session `aborted`
### 6.2 Hysterese (Simulation als Stellvertreter für Sensorik)
In Simulation definierst du:
- Start, wenn „running“ mindestens **2s stabil**
- Stop, wenn „idle“ mindestens **15s stabil**
Diese Parameter als Odoo Systemparameter, z. B.:
- `ows_iot.start_debounce_s`
- `ows_iot.stop_debounce_s`
---
## 7. Simulation (Software statt Hardware)
### 7.1 Device Simulator: Maschine
- Konfigurierbar:
- Muster: random, fixed schedule, manuell per CLI
- Zustände: idle/running/fault
- Optional: „power_w“ und „vibration“ als Felder im Payload
- Publiziert MQTT in realistischen Intervallen
### 7.2 Device Simulator: Waage
- Modi:
- stream weight (mehrfach pro Sekunde)
- stable_weight nur auf „stabil“
- tare/zero Events per CLI
### 7.3 Bridge Simulator (MQTT → Odoo)
- Abonniert alle relevanten Topics
- Validiert Schema v1
- POSTet Events an Odoo
- Retry-Queue (lokal) bei Odoo-Ausfall
- Metriken/Logs:
- gesendete Events, Fehlerquoten, Latenz
---
## 8. Milestones & Deliverables (NEU: Hardware-First Approach)
### Phase 1: Python-Prototyp (Standalone, ohne Odoo)
#### M0 Projekt Setup & MQTT Verbindung (0.5 Tag)
**Deliverables**
- Python Projekt Struktur
- Requirements.txt (paho-mqtt, pyyaml, etc.)
- Config-Datei (YAML): MQTT Broker Settings
- MQTT Client Basis-Verbindung testen
**Test**: Verbindung zum MQTT Broker herstellen, Topics anzeigen
---
#### M1 Shelly PM Mini G3 Integration (1 Tag)
**Deliverables**
- MQTT Subscriber für Shelly-Topics
- Parser für Shelly JSON-Payloads (beide Formate):
- Status-Updates (full data mit voltage, current, apower)
- NotifyStatus Events (einzelne Werte)
- Datenextraktion: `apower`, `timestamp`, `device_id`
- Logging aller empfangenen Shelly-Messages
**Test**: Shelly-Daten empfangen und parsen, Console-Output
---
#### M2 Event-Normalisierung & Unified Schema (1 Tag)
**Deliverables**
- Unified Event Schema v1 Implementation
- Shelly-to-Unified Konverter:
- `apower` Updates → `power_measurement` Events
- Enrichment mit `device_id`, `machine_id`, `timestamp`
- Event-UID Generierung
- JSON-Export der normalisierten Events
**Test**: Shelly-Daten werden zu Schema-v1-konformen Events konvertiert
---
#### M3 Session Detection Engine (1-2 Tage)
**Deliverables**
- State Machine für run_start/run_stop Detection
- Konfigurierbare Parameter pro Device:
- `power_threshold` (z.B. 50W)
- `start_debounce_s` (z.B. 3s)
- `stop_debounce_s` (z.B. 15s)
- Session-Objekte:
- `session_id`, `machine_id`, `start_ts`, `stop_ts`, `duration_s`
- `events` (Liste der zugehörigen Events)
- Session-Export (JSON/CSV)
**Test**:
- Maschine anschalten → `run_start` Event
- Maschine ausschalten → `run_stop` Event
- Sessions werden korrekt erkannt und gespeichert
---
#### M4 Multi-Device Support & Config (0.5-1 Tag)
**Deliverables**
- Device-Registry in Config:
```yaml
devices:
- shelly_id: "shellypmminig3-48f6eeb73a1c"
machine_name: "Shaper Origin"
power_threshold: 50
start_debounce_s: 3
stop_debounce_s: 15
```
- Dynamisches Laden mehrerer Devices
- Parallele Session-Tracking pro Device
**Test**: Mehrere Shelly-Devices in Config, jedes wird separat getrackt
---
#### M5 Monitoring & Robustheit (0.5 Tag)
**Deliverables**
- Reconnect-Logik bei MQTT-Disconnect
- Error-Handling bei ungültigen Payloads
- Statistiken: empfangene Events, aktive Sessions, Fehler
- Strukturiertes Logging (Logger-Konfiguration)
**Test**: MQTT Broker Neustart → Client reconnected automatisch
---
### Phase 2: Odoo-Integration
#### M6 Odoo Modul Grundgerüst (0.5 Tag)
**Deliverables**
- Odoo Modul `ows_iot_bridge`
- Modelle: `ows.iot.device`, `ows.iot.event`, `ows.machine.session`
- Admin UI: Device-Liste, Event-Log, Session-Übersicht
---
#### M7 REST Endpoint & Authentication (1 Tag)
**Deliverables**
- Controller `/ows/iot/event` (POST)
- Token-basierte Authentifizierung
- Event-Validation gegen Schema v1
- Event-Speicherung in `ows.iot.event`
**Test**: Python-Script POSTet Events an Odoo, werden gespeichert
---
#### M8 Python-Bridge Integration (1 Tag)
**Deliverables**
- Python-Prototyp wird zur Odoo-Bridge:
- Events werden an Odoo Endpoint gesendet (statt nur File-Export)
- Retry-Queue bei Odoo-Ausfall
- Bridge läuft als separater Service (Docker optional)
**Test**: End-to-End: Shelly → MQTT → Bridge → Odoo → Event sichtbar in Odoo
---
#### M9 Session-Engine in Odoo (1 Tag)
**Deliverables**
- Session-Logik aus Python-Prototyp nach Odoo portieren
- Events → Session-Zuordnung
- Session-Updates bei run_start/run_stop
- Admin UI: Sessions anzeigen, Filter nach Maschine/Zeitraum
**Test**: Sessions werden in Odoo korrekt erstellt und geschlossen
---
#### M10 Tests & Dokumentation (1 Tag)
**Deliverables**
- Unit Tests: Event-Parsing, Session-Detection
- Integration Tests: MQTT → Odoo End-to-End
- Dokumentation: Setup-Anleitung, Config-Beispiele, API-Docs
---
### Optional M11 POS/Anzeige (später)
- Realtime Anzeige im POS oder auf Display
- Live Power-Consumption in Odoo
---
## 9. Testplan (Simulation-first)
### 9.1 Unit Tests (Odoo)
- Auth: gültiger Token → 200
- ungültig/fehlend → 401/403
- Schema-Validation → 400
- Idempotenz: duplicate `event_uid` → 409 oder duplicate-flag
### 9.2 Integration Tests
- Sequenz: start → heartbeat → stop → Session duration plausibel
- stop ohne start → kein Crash, Event loggt Fehlerzustand
- Timeout: start → keine heartbeat → Session aborted
### 9.3 Last-/Stabilitätstest (Simulator)
- 20 Maschinen, je 1 Event/s, 1h Lauf
- Ziel: Odoo bleibt stabil, Event-Insert performant, Queue läuft nicht über
---
## 10. Betriebs- und Sicherheitskonzept
- Token-Rotation möglich (neues Token, altes deaktivieren)
- Reverse Proxy:
- HTTPS
- Rate limiting auf `/ows/iot/event`
- optional Basic WAF Regeln
- Payload-Größenlimit (z. B. 1664 KB)
- Bridge persistiert Retry-Queue auf Disk
---
## 11. Offene Entscheidungen (später, nicht blocker für MVP)
- Event-Consumer in Odoo vs. Bridge-only Webhook (empfohlen: Webhook)
- Genaues Mapping zu bestehenden open_workshop Modellen (`ows.machine`, Nutzer/RFID)
- Abrechnung: Preisregeln, Rundungen, POS-Integration
- Realtime: Odoo Bus vs. eigener WebSocket Service
---
## 12. Nächste Schritte (AKTUALISIERT: Hardware-First)
### Sofort starten (Phase 1 - Python Prototyp):
1. **M0**: Python-Projekt aufsetzen, MQTT-Verbindung testen
2. **M1**: Shelly PM Mini G3 Daten empfangen und parsen
3. **M2**: Event-Normalisierung implementieren (Shelly → Schema v1)
4. **M3**: Session Detection Engine (run_start/run_stop basierend auf Power)
5. **M4**: Multi-Device Config-Support
### Danach (Phase 2 - Odoo Integration):
6. **M6-M9**: Odoo Module + Bridge + Session-Engine
7. **M10**: Tests & Dokumentation
### Offene Informationen benötigt:
- [ ] Genaue MQTT Topic-Namen des Shelly PM Mini G3
- [ ] Typische Power-Werte des Shaper Origin (Idle vs. Running)
- [ ] Gewünschte Schwellenwerte für Start/Stop-Detection
- [ ] MQTT Broker Zugangsdaten (Host, Port, User, Password)
---
## Anhang C: Shelly PM Mini G3 Payload-Beispiele
### Format 1: Full Status Update
```json
{
"id": 0,
"voltage": 234.7,
"current": 0.289,
"apower": 59.7,
"freq": 49.9,
"aenergy": {
"total": 256.325,
"by_minute": [415.101, 830.202, 830.202],
"minute_ts": 1769100576
},
"ret_aenergy": {
"total": 0.000,
"by_minute": [0.000, 0.000, 0.000],
"minute_ts": 1769100576
}
}
```
### Format 2: NotifyStatus Event
```json
{
"src": "shellypmminig3-48f6eeb73a1c",
"dst": "shellypmminig3/events",
"method": "NotifyStatus",
"params": {
"ts": 1769100615.87,
"pm1:0": {
"id": 0,
"current": 0.185
}
}
}
```
### Mapping zu Unified Event Schema
```json
{
"schema_version": "v1",
"event_uid": "generated-uuid",
"ts": "2026-01-22T10:30:15Z",
"source": "shelly_pm_mini_g3",
"device_id": "shellypmminig3-48f6eeb73a1c",
"entity_type": "machine",
"entity_id": "shaper-origin-01",
"event_type": "power_measurement",
"confidence": "high",
"payload": {
"apower": 59.7,
"voltage": 234.7,
"current": 0.289,
"frequency": 49.9,
"total_energy_kwh": 0.256325
}
}
```
### Session Detection Beispiel
```yaml
# Device Config
device_id: shellypmminig3-48f6eeb73a1c
machine_name: Shaper Origin
power_threshold: 50 # Watt
start_debounce_s: 3 # Power > threshold für 3s → run_start
stop_debounce_s: 15 # Power < threshold für 15s run_stop
```
**Session Flow**:
1. `apower: 5W` (idle) → keine Session
2. `apower: 120W` → Power > 50W → Debounce startet
3. Nach 3s immer noch > 50W → `run_start` Event, Session öffnet
4. `apower: 95W, 110W, 88W` → Session läuft weiter
5. `apower: 12W` → Power < 50W Stop-Debounce startet
6. Nach 15s immer noch < 50W `run_stop` Event, Session schließt
7. Session: `duration = stop_ts - start_ts`
---
## Anhang A: Beispiel-Event (Maschine run_start)
```json
{
"schema_version": "v1",
"event_uid": "c2a7d6f1-7d8d-4a63-9a7f-4e6d7b0d9e2a",
"ts": "2026-01-10T12:34:56Z",
"source": "simulator",
"device_id": "esp32-fraser-01",
"entity_type": "machine",
"entity_id": "formatkreissaege",
"event_type": "run_start",
"confidence": "high",
"payload": {
"power_w": 820,
"vibration": 0.73,
"reason": "power_threshold"
}
}
```
## Anhang B: Beispiel-Event (Waage stable_weight)
```json
{
"schema_version": "v1",
"event_uid": "b6d0a2c5-9b1f-4a0b-8b19-7f2e1b8f3d11",
"ts": "2026-01-10T12:40:12Z",
"source": "simulator",
"device_id": "scale-sim-01",
"entity_type": "scale",
"entity_id": "waage-01",
"event_type": "stable_weight",
"confidence": "high",
"payload": {
"weight_g": 1532.4,
"unit": "g",
"stable_ms": 1200
}
}
```

View File

@ -0,0 +1,31 @@
# Config with secrets
config.yaml
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
ENV/
build/
dist/
*.egg-info/
# Data & Logs
data/
logs/
*.log
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db

View File

@ -0,0 +1,225 @@
# Open Workshop IoT Bridge - Python Prototype
MQTT-basierte IoT-Integration für Odoo 18 Community Edition
## Projektstatus
**Stand: 2026-01-22**
**Phase 1: Standalone Python Prototype** (aktuell)
- ✅ M0: Projekt Setup & MQTT Connection (abgeschlossen)
- Virtual Environment erstellt
- MQTT Client mit TLS/SSL Support
- Verbindung zu mqtt.majufilo.eu:8883 erfolgreich
- ✅ M1: Shelly PM Mini G3 Integration (abgeschlossen)
- Parser für Status Messages (shaperorigin/status/pm1:0)
- Datenextraktion: apower, voltage, current, frequency, total_energy
- Custom MQTT Topic Prefix Support (shaperorigin)
- Live-Monitoring funktioniert
- ✅ M2: Event-Normalisierung (abgeschlossen)
- Event Schema v1 implementiert
- UUID-basierte Event IDs
- ISO 8601 UTC Timestamps
- Machine/Device Mapping
- Metrics Normalisierung (power_w, voltage_v, current_a, frequency_hz)
- ✅ M3: Session Detection Engine (abgeschlossen)
- State Machine: IDLE → STARTING → RUNNING → STOPPING → IDLE
- Power-basierte Schwellenwerte (konfigurierbar pro Maschine)
- Debounce-Logik (Start/Stop getrennt konfigurierbar)
- Session Events: session_start, session_end mit Duration
- Persistente Speicherung (JSON)
- ⏳ M4: Multi-Device Support (vorbereitet, Config-ready)
- ⏳ M5: Monitoring & Robustheit
**Phase 2: Odoo Integration** (geplant)
- M6-M10: Odoo Module + Bridge + Tests
## Beschreibung
Dieser Python-Prototyp dient als Grundlage für die spätere Odoo-Integration. Er:
- Empfängt MQTT Events von IoT-Geräten (z.B. Shelly PM Mini G3)
- Normalisiert die Daten in ein einheitliches Event-Schema (Event Schema v1)
- Erkennt Maschinenlaufzeit-Sessions basierend auf Power-Schwellenwerten
- Speichert Events und Sessions persistent (JSONL/JSON)
- Unterstützt mehrere Geräte parallel (topic_prefix basiert)
Der Code ist so strukturiert, dass er später direkt in eine Odoo-Bridge übernommen werden kann.
Die JSON-Speicherung ist für Migration zu Odoo-Models vorbereitet:
- `data/events.jsonl``open_workshop.power_event`
- `data/sessions.json``open_workshop.session`
## Installation
### Voraussetzungen
- Python 3.8+
- MQTT Broker (z.B. Mosquitto)
- pip
### Setup
1. **Dependencies installieren**
```bash
pip install -r requirements.txt
```
2. **Konfiguration erstellen**
```bash
cp config.yaml.example config.yaml
```
3. **config.yaml anpassen**
Bearbeite `config.yaml` und setze:
- MQTT Broker Host/Port/Credentials
- Device-Konfiguration (Shelly IDs, Schwellenwerte)
- MQTT Topics
## Verwendung
### Starten
```bash
cd /pfad/zu/python_prototype
source venv/bin/activate
python main.py
```
Dieser Befehl:
- Verbindet sich mit dem MQTT Broker (TLS/SSL)
- Abonniert die konfigurierten Topics
- Empfängt Shelly PM Mini G3 Status Messages
- Normalisiert Events (Event Schema v1)
- Detektiert Session Start/End Events
- Speichert in `data/events.jsonl` und `data/sessions.json`
### Erwartete Ausgabe
```
2026-01-22 19:36:17 - __main__ - INFO - === Open Workshop IoT Bridge Starting ===
2026-01-22 19:36:17 - mqtt_client - INFO - TLS/SSL enabled for port 8883
2026-01-22 19:36:17 - mqtt_client - INFO - Connected to MQTT Broker at mqtt.majufilo.eu:8883
2026-01-22 19:36:17 - mqtt_client - INFO - Subscribed to topic: shaperorigin/#
2026-01-22 19:36:17 - __main__ - INFO - IoT Bridge started successfully
2026-01-22 19:36:17 - __main__ - INFO - Listening for MQTT messages... (Press Ctrl+C to stop)
2026-01-22 19:36:17 - session_detector - INFO - 🟡 Shaper Origin: Power 43.3W >= 30W → STARTING
2026-01-22 19:36:20 - session_detector - INFO - 🟢 Shaper Origin: Session START (debounce 3.0s)
2026-01-22 19:36:20 - __main__ - INFO - 🚀 SESSION START
2026-01-22 19:37:03 - session_detector - INFO - 🟠 Shaper Origin: Power 0.0W < 30W STOPPING
2026-01-22 19:37:18 - session_detector - INFO - 🔴 Shaper Origin: Session END (debounce 15.0s)
2026-01-22 19:37:18 - __main__ - INFO - 🏁 SESSION END - Duration: 58s (0.97 min)
```
## Konfiguration
### MQTT Broker
```yaml
mqtt:
host: "localhost"
port: 1883
username: ""
password: ""
topics:
- "shellies/+/status"
```
### Devices
```yaml
devices:
- topic_prefix: "shaperorigin" # Custom MQTT Prefix (im Shelly konfiguriert)
machine_name: "Shaper Origin"
machine_id: "shaper-origin-01"
device_type: "shelly_pm_mini_g3"
power_threshold: 30 # Watt (Schwellenwert für Session-Erkennung)
start_debounce_s: 3 # Verzögerung bis Session Start
stop_debounce_s: 15 # Verzögerung bis Session End
enabled: true
```
**Multi-Device Support:** Einfach weitere Geräte hinzufügen mit unterschiedlichen `topic_prefix`.
Jedes Gerät benötigt im Shelly einen eigenen Custom MQTT Prefix.
## Projektstruktur
```
python_prototype/
├── main.py # Entry point & Orchestration
├── mqtt_client.py # MQTT Client wrapper (TLS/SSL)
├── shelly_parser.py # Shelly PM Mini G3 Message Parser
├── event_normalizer.py # Event Schema v1 Normalizer
├── session_detector.py # Session Detection State Machine
├── event_storage.py # Persistent Storage (JSONL/JSON)
├── config.yaml # Configuration (nicht im Git)
├── config.yaml.example # Config template
├── requirements.txt # Python dependencies
├── README.md # Diese Datei
├── data/ # Output directory
│ ├── events.jsonl # Events (JSON Lines)
│ └── sessions.json # Sessions (JSON Array)
└── logs/ # Log files
└── ows_iot_bridge.log
```
## Nächste Schritte
### M4: Multi-Device Support (vorbereitet)
- Zweites Shelly-Device konfigurieren
- Parallele Überwachung mehrerer Maschinen testen
### M5: Monitoring & Robustheit
- MQTT Reconnect-Logik
- Error Handling & Recovery
- Systemd Service Setup
- Health Monitoring
### M6+: Odoo Integration
- Odoo Models: `open_workshop.machine`, `open_workshop.session`
- Migration: JSON → Odoo Database
- Views & Dashboards
- Live-Monitoring in Odoo
## Gespeicherte Daten
### Events (data/events.jsonl)
JSON Lines Format - ein Event pro Zeile:
```json
{"event_id":"uuid","event_type":"power_measurement","timestamp":"2026-01-22T18:45:09.985Z","machine":{"machine_id":"shaper-origin-01","machine_name":"Shaper Origin"},"metrics":{"power_w":43.8,"voltage_v":230.2,"current_a":0.19}}
```
### Sessions (data/sessions.json)
JSON Array mit Session-Objekten:
```json
[
{
"session_id": "uuid",
"machine_id": "shaper-origin-01",
"machine_name": "Shaper Origin",
"start_time": "2026-01-22T18:52:59.000Z",
"end_time": "2026-01-22T18:54:01.995Z",
"duration_s": 62,
"start_power_w": 37.1,
"end_power_w": 0.0,
"status": "completed"
}
]
```
## Troubleshooting
### Connection refused
```
Error: Failed to connect to MQTT Broker: [Errno 111] Connection refused
```
→ Prüfe ob MQTT Broker läuft und erreichbar ist
### Permission denied
```
PermissionError: [Errno 13] Permission denied: 'logs/ows_iot_bridge.log'
```
→ Stelle sicher, dass das logs/ Verzeichnis beschreibbar ist
### Invalid config
```
Error: Config file not found: config.yaml
```
→ Erstelle config.yaml von config.yaml.example
## Support
Siehe Feature Request Dokument: `FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md`

View File

@ -0,0 +1,58 @@
# MQTT Broker Configuration
mqtt:
host: "localhost" # MQTT Broker IP/Hostname
port: 1883 # Standard MQTT Port (1883 unencrypted, 8883 encrypted)
username: "" # Optional: MQTT Username
password: "" # Optional: MQTT Password
client_id: "ows_iot_bridge_prototype"
keepalive: 60
# Topics to subscribe
topics:
- "shellies/+/status" # Shelly Status Updates
- "shellypmminig3/events" # Shelly Events
- "shellies/+/online" # Shelly Online Status
# Add more topics as needed
# Device Configuration
devices:
- shelly_id: "shellypmminig3-48f6eeb73a1c"
machine_name: "Shaper Origin"
machine_id: "shaper-origin-01"
device_type: "shelly_pm_mini_g3"
# Power threshold for run detection (in Watts)
power_threshold: 50
# Debounce times (in seconds)
start_debounce_s: 3 # Power > threshold for X seconds → run_start
stop_debounce_s: 15 # Power < threshold for Y seconds → run_stop
enabled: true
# Example for additional device
# - shelly_id: "shellypmminig3-xxxxxxxxxxxx"
# machine_name: "CNC Mill"
# machine_id: "cnc-mill-01"
# device_type: "shelly_pm_mini_g3"
# power_threshold: 100
# start_debounce_s: 5
# stop_debounce_s: 20
# enabled: true
# Logging Configuration
logging:
level: "INFO" # DEBUG, INFO, WARNING, ERROR
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
file: "logs/ows_iot_bridge.log" # Optional: log to file
console: true
# Output Configuration
output:
# Where to store events and sessions
events_file: "data/events.json"
sessions_file: "data/sessions.json"
# Console output
print_events: true
print_sessions: true

View File

@ -0,0 +1,192 @@
"""
Event Normalizer for Open Workshop IoT Bridge
Converts device-specific data formats into a unified Event Schema v1
for consistent processing across different IoT device types.
"""
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
class EventNormalizer:
"""
Normalizes device-specific events into unified Event Schema v1
Event Schema v1:
{
"event_id": "uuid4",
"event_type": "power_measurement",
"timestamp": "2026-01-22T18:30:45.123456Z",
"machine": {
"machine_id": "shaper-origin-01",
"machine_name": "Shaper Origin"
},
"device": {
"device_id": "48f6eeb73a1c",
"device_type": "shelly_pm_mini_g3",
"topic_prefix": "shaperorigin"
},
"metrics": {
"power_w": 52.9,
"voltage_v": 234.8,
"current_a": 0.267,
"frequency_hz": 49.9
},
"raw_data": { ... } # Optional: Original device data
}
"""
def __init__(self, device_config: list = None):
"""
Initialize normalizer with device configuration
Args:
device_config: List of device configurations from config.yaml
"""
self.logger = logging.getLogger(__name__)
self.device_config = device_config or []
# Build topic_prefix to machine mapping
self.prefix_machine_map = {}
for device in self.device_config:
topic_prefix = device.get('topic_prefix')
if topic_prefix:
self.prefix_machine_map[topic_prefix] = {
'machine_id': device.get('machine_id'),
'machine_name': device.get('machine_name'),
'device_type': device.get('device_type', 'unknown')
}
def normalize_shelly_event(self, shelly_data: Dict) -> Optional[Dict]:
"""
Convert Shelly PM Mini G3 data to Event Schema v1
Args:
shelly_data: Parsed Shelly data from ShellyParser
Returns:
Normalized event dict or None if data incomplete
"""
try:
# Extract device ID and find topic prefix
device_id = shelly_data.get('device_id', 'unknown')
topic_prefix = self._find_topic_prefix(device_id, shelly_data)
if not topic_prefix:
self.logger.warning(f"Could not determine topic_prefix for device {device_id}")
return None
# Get machine info from config
machine_info = self.prefix_machine_map.get(topic_prefix)
if not machine_info:
self.logger.warning(f"No machine config found for topic_prefix: {topic_prefix}")
return None
# Build normalized event
event = {
"event_id": str(uuid.uuid4()),
"event_type": self._determine_event_type(shelly_data),
"timestamp": self._normalize_timestamp(shelly_data.get('timestamp')),
"machine": {
"machine_id": machine_info['machine_id'],
"machine_name": machine_info['machine_name']
},
"device": {
"device_id": device_id,
"device_type": machine_info['device_type'],
"topic_prefix": topic_prefix
},
"metrics": self._extract_metrics(shelly_data),
"raw_data": shelly_data # Keep original for debugging
}
return event
except Exception as e:
self.logger.error(f"Failed to normalize Shelly event: {e}", exc_info=True)
return None
def _find_topic_prefix(self, device_id: str, shelly_data: Dict) -> Optional[str]:
"""
Find topic_prefix for device
Device ID can be:
- topic_prefix itself (e.g. 'shaperorigin' from status messages)
- actual device ID (e.g. '48f6eeb73a1c' from RPC events)
"""
# Check if device_id is already a known topic_prefix
if device_id in self.prefix_machine_map:
return device_id
# Otherwise, we need to infer it (currently only one device configured)
# For multiple devices, we'd need more sophisticated matching
if len(self.prefix_machine_map) == 1:
return list(self.prefix_machine_map.keys())[0]
# TODO: For multi-device setups, implement device_id to topic_prefix mapping
self.logger.warning(f"Cannot map device_id {device_id} to topic_prefix")
return None
def _determine_event_type(self, shelly_data: Dict) -> str:
"""Determine event type from Shelly data"""
msg_type = shelly_data.get('message_type', 'unknown')
if msg_type in ['status', 'event']:
return 'power_measurement'
return 'unknown'
def _normalize_timestamp(self, timestamp_str: Optional[str]) -> str:
"""
Normalize timestamp to ISO 8601 UTC format
Args:
timestamp_str: ISO timestamp string or None
Returns:
ISO 8601 UTC timestamp string
"""
if timestamp_str:
try:
# Parse and ensure UTC
dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
return dt.astimezone(timezone.utc).isoformat().replace('+00:00', 'Z')
except Exception as e:
self.logger.warning(f"Failed to parse timestamp {timestamp_str}: {e}")
# Fallback: current time in UTC
return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
def _extract_metrics(self, shelly_data: Dict) -> Dict:
"""
Extract metrics from Shelly data
Returns dict with standardized metric names
"""
metrics = {}
# Power (always present in our use case)
# Shelly uses 'apower' (active power)
if 'apower' in shelly_data:
metrics['power_w'] = shelly_data['apower']
# Voltage (only in status messages)
if 'voltage' in shelly_data and shelly_data['voltage'] is not None:
metrics['voltage_v'] = shelly_data['voltage']
# Current (in status and some events)
if 'current' in shelly_data and shelly_data['current'] is not None:
metrics['current_a'] = shelly_data['current']
# Frequency (only in status messages)
if 'frequency' in shelly_data and shelly_data['frequency'] is not None:
metrics['frequency_hz'] = shelly_data['frequency']
# Energy counters (if available)
if 'total_energy' in shelly_data and shelly_data['total_energy'] is not None:
metrics['energy_total_wh'] = shelly_data['total_energy']
return metrics

View File

@ -0,0 +1,188 @@
"""
Event Storage Module - Persists events and sessions to JSON files
For later migration to Odoo database models
"""
import json
import logging
from pathlib import Path
from typing import Dict, Optional
from datetime import datetime
class EventStorage:
"""
Stores events and sessions to JSON files
Format designed for easy migration to Odoo models:
- events.jsonl: JSON Lines format (one event per line) open_workshop.power_event
- sessions.json: JSON array of sessions open_workshop.session
"""
def __init__(self, events_file: str = "data/events.jsonl",
sessions_file: str = "data/sessions.json"):
self.logger = logging.getLogger(__name__)
self.events_file = Path(events_file)
self.sessions_file = Path(sessions_file)
# Ensure data directory exists
self.events_file.parent.mkdir(parents=True, exist_ok=True)
# Initialize sessions file if it doesn't exist
if not self.sessions_file.exists():
self._write_sessions([])
def store_event(self, event: Dict) -> bool:
"""
Append event to JSONL file (one JSON object per line)
Format for Odoo migration:
{
"event_id": "uuid",
"event_type": "power_measurement",
"timestamp": "ISO 8601",
"machine": {"machine_id": "...", "machine_name": "..."},
"device": {...},
"metrics": {"power_w": 45.7, "voltage_v": 230.2, ...}
}
"""
try:
with open(self.events_file, 'a', encoding='utf-8') as f:
# Write one JSON object per line (JSONL format)
json.dump(event, f, ensure_ascii=False)
f.write('\n')
self.logger.debug(f"Event {event.get('event_id', 'N/A')} stored")
return True
except Exception as e:
self.logger.error(f"Failed to store event: {e}")
return False
def store_session_event(self, session_event: Dict) -> bool:
"""
Store session_start or session_end event
Updates sessions.json with structured session data
Format for Odoo migration (open_workshop.session):
{
"session_id": "uuid",
"machine_id": "shaper-origin-01",
"machine_name": "Shaper Origin",
"start_time": "2026-01-22T18:36:20.993Z",
"end_time": "2026-01-22T18:38:01.993Z", // null if running
"duration_s": 101, // null if running
"start_power_w": 45.7,
"end_power_w": 0.0, // null if running
"status": "completed" // or "running"
}
"""
try:
event_type = session_event.get('event_type')
session_id = session_event.get('session_id')
if event_type == 'session_start':
return self._store_session_start(session_event)
elif event_type == 'session_end':
return self._store_session_end(session_event)
else:
self.logger.warning(f"Unknown session event type: {event_type}")
return False
except Exception as e:
self.logger.error(f"Failed to store session event: {e}")
return False
def _store_session_start(self, event: Dict) -> bool:
"""Create new session record"""
sessions = self._read_sessions()
machine = event.get('machine', {})
session_data = event.get('session_data', {})
new_session = {
'session_id': event['session_id'],
'machine_id': machine.get('machine_id'),
'machine_name': machine.get('machine_name'),
'start_time': session_data.get('start_time'),
'end_time': None,
'duration_s': None,
'start_power_w': event.get('power_w'),
'end_power_w': None,
'status': 'running'
}
sessions.append(new_session)
self._write_sessions(sessions)
self.logger.info(f"Session {event['session_id'][:8]}... started")
return True
def _store_session_end(self, event: Dict) -> bool:
"""Update existing session with end data"""
sessions = self._read_sessions()
session_id = event['session_id']
# Find and update session
for session in sessions:
if session['session_id'] == session_id:
session_data = event.get('session_data', {})
session['end_time'] = session_data.get('end_time')
session['duration_s'] = session_data.get('duration_s')
session['end_power_w'] = event.get('power_w')
session['status'] = 'completed'
self._write_sessions(sessions)
duration_min = session['duration_s'] / 60
self.logger.info(f"Session {session_id[:8]}... completed ({duration_min:.1f} min)")
return True
self.logger.error(f"Session {session_id} not found for update")
return False
def _read_sessions(self) -> list:
"""Read all sessions from JSON file"""
try:
if self.sessions_file.exists():
with open(self.sessions_file, 'r', encoding='utf-8') as f:
return json.load(f)
return []
except Exception as e:
self.logger.error(f"Failed to read sessions: {e}")
return []
def _write_sessions(self, sessions: list) -> bool:
"""Write sessions to JSON file"""
try:
with open(self.sessions_file, 'w', encoding='utf-8') as f:
json.dump(sessions, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
self.logger.error(f"Failed to write sessions: {e}")
return False
def get_active_sessions(self) -> list:
"""Get all running sessions"""
sessions = self._read_sessions()
return [s for s in sessions if s['status'] == 'running']
def get_session_by_id(self, session_id: str) -> Optional[Dict]:
"""Get session by ID"""
sessions = self._read_sessions()
for session in sessions:
if session['session_id'] == session_id:
return session
return None
def get_sessions_by_machine(self, machine_id: str, limit: int = 10) -> list:
"""Get recent sessions for a machine"""
sessions = self._read_sessions()
machine_sessions = [s for s in sessions if s['machine_id'] == machine_id]
# Sort by start_time descending
machine_sessions.sort(key=lambda x: x['start_time'], reverse=True)
return machine_sessions[:limit]

View File

@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
Open Workshop IoT Bridge - Python Prototype
Main entry point for MQTT to Odoo bridge
Phase 1: Standalone prototype (without Odoo)
"""
import sys
import logging
import yaml
import time
import signal
import json
from pathlib import Path
from typing import Dict
from mqtt_client import MQTTClient
from shelly_parser import ShellyParser
from event_normalizer import EventNormalizer
from session_detector import SessionDetector
from event_storage import EventStorage
class IoTBridge:
"""Main IoT Bridge Application"""
def __init__(self, config_path: str = 'config.yaml'):
"""
Initialize IoT Bridge
Args:
config_path: Path to config.yaml file
"""
# Load configuration
self.config = self._load_config(config_path)
# Setup logging
self._setup_logging()
self.logger = logging.getLogger(__name__)
self.logger.info("=== Open Workshop IoT Bridge Starting ===")
# Initialize Shelly Parser with device config
self.shelly_parser = ShellyParser(device_config=self.config.get('devices', []))
# Initialize Event Normalizer
self.event_normalizer = EventNormalizer(device_config=self.config.get('devices', []))
# Initialize Session Detector
self.session_detector = SessionDetector(device_config=self.config.get('devices', []))
# Initialize Event Storage
output_config = self.config.get('output', {})
self.event_storage = EventStorage(
events_file=output_config.get('events_file', 'data/events.jsonl'),
sessions_file=output_config.get('sessions_file', 'data/sessions.json')
)
# Initialize MQTT Client
self.mqtt_client = MQTTClient(
config=self.config['mqtt'],
message_callback=self.on_mqtt_message
)
self.running = False
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _load_config(self, config_path: str) -> dict:
"""Load configuration from YAML file"""
config_file = Path(config_path)
if not config_file.exists():
print(f"Error: Config file not found: {config_path}")
print(f"Please copy config.yaml.example to config.yaml and adjust settings")
sys.exit(1)
with open(config_file, 'r') as f:
config = yaml.safe_load(f)
return config
def _setup_logging(self):
"""Setup logging configuration"""
log_config = self.config.get('logging', {})
# Create logs directory if logging to file
if log_config.get('file'):
log_file = Path(log_config['file'])
log_file.parent.mkdir(parents=True, exist_ok=True)
# Configure logging
log_level = getattr(logging, log_config.get('level', 'INFO').upper())
log_format = log_config.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handlers = []
# Console handler
if log_config.get('console', True):
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(log_format))
handlers.append(console_handler)
# File handler
if log_config.get('file'):
file_handler = logging.FileHandler(log_config['file'])
file_handler.setFormatter(logging.Formatter(log_format))
handlers.append(file_handler)
logging.basicConfig(
level=log_level,
format=log_format,
handlers=handlers
)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals"""
self.logger.info(f"Received signal {signum}, shutting down...")
self.stop()
def on_mqtt_message(self, topic: str, payload):
"""
Callback for incoming MQTT messages
Args:
topic: MQTT topic
payload: Message payload (parsed JSON or raw string)
"""
# Log RAW topic for analysis
if 'shellypmminig3' in topic and 'debug' not in topic:
self.logger.info(f"RAW Topic: {topic}")
# Only process JSON payloads
if not isinstance(payload, dict):
return
# Parse Shelly message
parsed_data = self.shelly_parser.parse_message(topic, payload)
if parsed_data:
# Normalize to Event Schema v1
normalized_event = self.event_normalizer.normalize_shelly_event(parsed_data)
if normalized_event:
# Store event (for later Odoo migration)
self.event_storage.store_event(normalized_event)
# Log normalized event (condensed)
metrics = normalized_event.get('metrics', {})
power_w = metrics.get('power_w', 'N/A')
self.logger.debug(
f"Event: {normalized_event['machine']['machine_name']} - "
f"Power: {power_w}W"
)
# Process through Session Detector
session_event = self.session_detector.process_event(normalized_event)
if session_event:
# Session state change occurred!
self._log_session_event(session_event)
# Store session event (for Odoo migration)
self.event_storage.store_session_event(session_event)
def _log_session_event(self, session_event: Dict):
"""Log session start/end events"""
self.logger.info("\n" + "=" * 70)
if session_event['event_type'] == 'session_start':
self.logger.info("🚀 SESSION START")
else:
self.logger.info("🏁 SESSION END")
self.logger.info("=" * 70)
self.logger.info(f"Session ID: {session_event['session_id']}")
self.logger.info(f"Machine: {session_event['machine']['machine_name']} ({session_event['machine']['machine_id']})")
self.logger.info(f"Timestamp: {session_event['timestamp']}")
self.logger.info(f"Power: {session_event['power_w']:.1f}W")
session_data = session_event.get('session_data', {})
self.logger.info(f"Start Time: {session_data.get('start_time')}")
if 'end_time' in session_data:
self.logger.info(f"End Time: {session_data['end_time']}")
duration_s = session_data.get('duration_s', 0)
duration_min = duration_s / 60
self.logger.info(f"Duration: {duration_s}s ({duration_min:.2f} min)")
self.logger.info("=" * 70 + "\n")
def start(self):
"""Start the IoT Bridge"""
self.logger.info("Starting IoT Bridge...")
# Create data directory
data_dir = Path('data')
data_dir.mkdir(exist_ok=True)
# Connect to MQTT
if not self.mqtt_client.connect():
self.logger.error("Failed to connect to MQTT Broker")
return False
# Start MQTT loop
self.mqtt_client.start()
# Wait for connection
if not self.mqtt_client.wait_for_connection(timeout=10):
self.logger.error("MQTT connection timeout")
return False
self.logger.info("IoT Bridge started successfully")
self.logger.info("Listening for MQTT messages... (Press Ctrl+C to stop)")
self.running = True
# Main loop
try:
while self.running:
time.sleep(1)
# TODO: Hier können später periodische Tasks ausgeführt werden
# z.B. Session-Timeout-Checks
except KeyboardInterrupt:
self.logger.info("Interrupted by user")
return True
def stop(self):
"""Stop the IoT Bridge"""
self.running = False
self.logger.info("Stopping IoT Bridge...")
self.mqtt_client.stop()
self.logger.info("IoT Bridge stopped")
def main():
"""Main entry point"""
# Check if config file exists
config_file = Path('config.yaml')
if not config_file.exists():
print("\n" + "="*60)
print("Configuration file not found!")
print("="*60)
print("\nPlease create config.yaml from the example:")
print(" cp config.yaml.example config.yaml")
print("\nThen edit config.yaml with your MQTT broker settings.")
print("="*60 + "\n")
sys.exit(1)
# Start bridge
bridge = IoTBridge('config.yaml')
bridge.start()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""
MQTT Client für Open Workshop IoT Bridge
Verbindet sich mit MQTT Broker und empfängt Device Events
"""
import logging
import time
import json
import ssl
from typing import Callable, Optional
import paho.mqtt.client as mqtt
class MQTTClient:
"""MQTT Client Wrapper für Device Event Empfang"""
def __init__(self, config: dict, message_callback: Optional[Callable] = None):
"""
Initialize MQTT Client
Args:
config: MQTT configuration dict from config.yaml
message_callback: Callback function for incoming messages
"""
self.config = config
self.message_callback = message_callback
self.logger = logging.getLogger(__name__)
# Create MQTT Client
self.client = mqtt.Client(
client_id=config.get('client_id', 'ows_iot_bridge'),
protocol=mqtt.MQTTv5
)
# Set callbacks
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
# Set username/password if provided
if config.get('username') and config.get('password'):
self.client.username_pw_set(
config['username'],
config['password']
)
# Enable TLS/SSL if port is 8883
if config.get('port') == 8883:
self.client.tls_set(cert_reqs=ssl.CERT_NONE)
self.client.tls_insecure_set(True)
self.logger.info("TLS/SSL enabled for port 8883")
self.connected = False
self.topics = config.get('topics', [])
def _on_connect(self, client, userdata, flags, rc, properties=None):
"""Callback when connected to MQTT broker"""
if rc == 0:
self.connected = True
self.logger.info(f"Connected to MQTT Broker at {self.config['host']}:{self.config['port']}")
# Subscribe to all configured topics
for topic in self.topics:
self.client.subscribe(topic)
self.logger.info(f"Subscribed to topic: {topic}")
else:
self.logger.error(f"Failed to connect to MQTT Broker, return code {rc}")
self.connected = False
def _on_disconnect(self, client, userdata, rc, properties=None):
"""Callback when disconnected from MQTT broker"""
self.connected = False
if rc != 0:
self.logger.warning(f"Unexpected disconnect from MQTT Broker (rc={rc}). Reconnecting...")
else:
self.logger.info("Disconnected from MQTT Broker")
def _on_message(self, client, userdata, msg):
"""Callback when message received"""
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')
# Log all shellypmminig3 topics (exclude debug)
if 'shellypmminig3' in topic and 'debug' not in topic:
self.logger.info(f"📡 TOPIC: {topic}")
self.logger.debug(f"Message received on topic '{topic}'")
# Try to parse as JSON
try:
payload_json = json.loads(payload)
except json.JSONDecodeError:
# Debug logs are expected to be non-JSON, only warn for non-debug topics
if 'debug' not in topic:
self.logger.warning(f"Message on '{topic}' is not valid JSON: {payload[:100]}")
payload_json = None
# Call user callback if provided
if self.message_callback:
self.message_callback(topic, payload_json or payload)
else:
# Default: just log
self.logger.info(f"Topic: {topic}")
if payload_json:
self.logger.info(f"Payload: {json.dumps(payload_json, indent=2)}")
else:
self.logger.info(f"Payload: {payload}")
except Exception as e:
self.logger.error(f"Error processing message: {e}", exc_info=True)
def connect(self):
"""Connect to MQTT broker"""
try:
self.logger.info(f"Connecting to MQTT Broker {self.config['host']}:{self.config['port']}...")
self.client.connect(
self.config['host'],
self.config['port'],
self.config.get('keepalive', 60)
)
return True
except Exception as e:
self.logger.error(f"Failed to connect to MQTT Broker: {e}")
return False
def start(self):
"""Start the MQTT client loop (non-blocking)"""
self.client.loop_start()
def stop(self):
"""Stop the MQTT client loop"""
self.client.loop_stop()
self.client.disconnect()
self.logger.info("MQTT Client stopped")
def publish(self, topic: str, payload: dict):
"""
Publish message to MQTT topic
Args:
topic: MQTT topic
payload: Message payload (will be JSON encoded)
"""
try:
payload_json = json.dumps(payload)
result = self.client.publish(topic, payload_json)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
self.logger.debug(f"Published to {topic}")
return True
else:
self.logger.error(f"Failed to publish to {topic}, rc={result.rc}")
return False
except Exception as e:
self.logger.error(f"Error publishing to {topic}: {e}")
return False
def wait_for_connection(self, timeout: int = 10) -> bool:
"""
Wait for connection to be established
Args:
timeout: Maximum seconds to wait
Returns:
True if connected, False if timeout
"""
start_time = time.time()
while not self.connected and (time.time() - start_time) < timeout:
time.sleep(0.1)
return self.connected

View File

@ -0,0 +1,17 @@
# MQTT Client
paho-mqtt>=2.0.0
# Configuration
pyyaml>=6.0
# Logging & Utilities
python-dateutil>=2.8.2
# For UUID generation
uuid>=1.30
# Optional: für spätere JSON Schema Validation
jsonschema>=4.17.0
# Optional: für besseres Logging
colorlog>=6.7.0

View File

@ -0,0 +1,267 @@
"""
Session Detection Engine for Open Workshop IoT Bridge
Detects machine run sessions based on power consumption thresholds
with debounce logic to avoid false starts/stops.
"""
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from enum import Enum
class SessionState(Enum):
"""Session detection states"""
IDLE = "idle" # Machine off (power < threshold)
STARTING = "starting" # Power above threshold, waiting for debounce
RUNNING = "running" # Confirmed run session active
STOPPING = "stopping" # Power below threshold, waiting for debounce
class SessionDetector:
"""
Detects machine run sessions based on power measurements
State Machine:
IDLE -> STARTING -> RUNNING -> STOPPING -> IDLE
- IDLE: Power < threshold
- STARTING: Power >= threshold for < start_debounce_s
- RUNNING: Power >= threshold for >= start_debounce_s
- STOPPING: Power < threshold for < stop_debounce_s (while in RUNNING)
- Back to IDLE: Power < threshold for >= stop_debounce_s
"""
def __init__(self, device_config: list = None):
"""
Initialize session detector
Args:
device_config: List of device configurations from config.yaml
"""
self.logger = logging.getLogger(__name__)
self.device_config = device_config or []
# Build machine_id to config mapping
self.machine_config = {}
for device in self.device_config:
machine_id = device.get('machine_id')
if machine_id:
self.machine_config[machine_id] = {
'power_threshold': device.get('power_threshold', 50),
'start_debounce_s': device.get('start_debounce_s', 3),
'stop_debounce_s': device.get('stop_debounce_s', 15),
'machine_name': device.get('machine_name', 'Unknown'),
}
# State tracking per machine
self.machine_states = {} # machine_id -> state info
def process_event(self, event: Dict) -> Optional[Dict]:
"""
Process a normalized event and detect session changes
Args:
event: Normalized event from EventNormalizer
Returns:
Session event dict if state change occurred, None otherwise
Session Event Format:
{
"session_id": "uuid4",
"event_type": "session_start" | "session_end",
"timestamp": "ISO 8601 UTC",
"machine": { "machine_id": "...", "machine_name": "..." },
"power_w": 123.4,
"session_data": {
"start_time": "ISO 8601 UTC",
"end_time": "ISO 8601 UTC", # only for session_end
"duration_s": 123 # only for session_end
}
}
"""
# Extract machine info
machine = event.get('machine', {})
machine_id = machine.get('machine_id')
if not machine_id:
self.logger.warning("Event missing machine_id, skipping")
return None
# Get machine config
config = self.machine_config.get(machine_id)
if not config:
self.logger.warning(f"No config found for machine {machine_id}")
return None
# Extract power measurement
metrics = event.get('metrics', {})
power_w = metrics.get('power_w')
if power_w is None:
self.logger.debug(f"Event missing power_w, skipping")
return None
# Initialize machine state if needed
if machine_id not in self.machine_states:
self.machine_states[machine_id] = {
'state': SessionState.IDLE,
'state_since': datetime.now(timezone.utc),
'current_session_id': None,
'session_start_time': None,
'last_power': None,
}
state_info = self.machine_states[machine_id]
timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
# Update last power
state_info['last_power'] = power_w
# Process state machine
return self._process_state_machine(
machine_id=machine_id,
machine_name=machine.get('machine_name'),
power_w=power_w,
timestamp=timestamp,
config=config,
state_info=state_info
)
def _process_state_machine(
self,
machine_id: str,
machine_name: str,
power_w: float,
timestamp: datetime,
config: Dict,
state_info: Dict
) -> Optional[Dict]:
"""
Process state machine logic
Returns session event if state change occurred
"""
current_state = state_info['state']
threshold = config['power_threshold']
start_debounce = config['start_debounce_s']
stop_debounce = config['stop_debounce_s']
time_in_state = (timestamp - state_info['state_since']).total_seconds()
# State machine transitions
if current_state == SessionState.IDLE:
if power_w >= threshold:
# Transition to STARTING
self.logger.info(f"🟡 {machine_name}: Power {power_w:.1f}W >= {threshold}W → STARTING")
state_info['state'] = SessionState.STARTING
state_info['state_since'] = timestamp
elif current_state == SessionState.STARTING:
if power_w < threshold:
# False start, back to IDLE
self.logger.info(f"{machine_name}: Power dropped before debounce → IDLE")
state_info['state'] = SessionState.IDLE
state_info['state_since'] = timestamp
elif time_in_state >= start_debounce:
# Debounce passed, transition to RUNNING
session_id = str(uuid.uuid4())
state_info['state'] = SessionState.RUNNING
state_info['state_since'] = timestamp
state_info['current_session_id'] = session_id
state_info['session_start_time'] = timestamp
self.logger.info(f"🟢 {machine_name}: Session START (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W")
# Generate session_start event
return {
'session_id': session_id,
'event_type': 'session_start',
'timestamp': timestamp.isoformat().replace('+00:00', 'Z'),
'machine': {
'machine_id': machine_id,
'machine_name': machine_name
},
'power_w': power_w,
'session_data': {
'start_time': timestamp.isoformat().replace('+00:00', 'Z')
}
}
elif current_state == SessionState.RUNNING:
if power_w < threshold:
# Transition to STOPPING
self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {threshold}W → STOPPING")
state_info['state'] = SessionState.STOPPING
state_info['state_since'] = timestamp
elif current_state == SessionState.STOPPING:
if power_w >= threshold:
# Power back up, back to RUNNING
self.logger.info(f"🟢 {machine_name}: Power back up → RUNNING")
state_info['state'] = SessionState.RUNNING
state_info['state_since'] = timestamp
elif time_in_state >= stop_debounce:
# Debounce passed, session ended
session_id = state_info['current_session_id']
start_time = state_info['session_start_time']
duration_s = (timestamp - start_time).total_seconds()
self.logger.info(f"🔴 {machine_name}: Session END (debounce {time_in_state:.1f}s) - Duration: {duration_s:.1f}s")
# Generate session_end event
session_event = {
'session_id': session_id,
'event_type': 'session_end',
'timestamp': timestamp.isoformat().replace('+00:00', 'Z'),
'machine': {
'machine_id': machine_id,
'machine_name': machine_name
},
'power_w': power_w,
'session_data': {
'start_time': start_time.isoformat().replace('+00:00', 'Z'),
'end_time': timestamp.isoformat().replace('+00:00', 'Z'),
'duration_s': int(duration_s)
}
}
# Reset to IDLE
state_info['state'] = SessionState.IDLE
state_info['state_since'] = timestamp
state_info['current_session_id'] = None
state_info['session_start_time'] = None
return session_event
return None
def get_machine_state(self, machine_id: str) -> Optional[str]:
"""Get current state of a machine"""
state_info = self.machine_states.get(machine_id)
if state_info:
return state_info['state'].value
return None
def get_active_sessions(self) -> Dict:
"""
Get all currently active sessions
Returns dict: machine_id -> session info
"""
active_sessions = {}
for machine_id, state_info in self.machine_states.items():
if state_info['state'] == SessionState.RUNNING:
active_sessions[machine_id] = {
'session_id': state_info['current_session_id'],
'start_time': state_info['session_start_time'].isoformat().replace('+00:00', 'Z'),
'current_power': state_info['last_power']
}
return active_sessions

View File

@ -0,0 +1,73 @@
#!/bin/bash
# Setup script for Open Workshop IoT Bridge Python Prototype
echo "==================================="
echo "Open Workshop IoT Bridge - Setup"
echo "==================================="
echo ""
# Check if Python 3 is installed
if ! command -v python3 &> /dev/null; then
echo "Error: Python 3 is not installed"
exit 1
fi
echo "Python version: $(python3 --version)"
echo ""
# Create virtual environment
echo "Creating virtual environment..."
python3 -m venv venv
if [ $? -ne 0 ]; then
echo "Error: Failed to create virtual environment"
exit 1
fi
echo "✓ Virtual environment created"
echo ""
# Activate virtual environment
echo "Activating virtual environment..."
source venv/bin/activate
# Upgrade pip
echo "Upgrading pip..."
pip install --upgrade pip
# Install requirements
echo ""
echo "Installing dependencies from requirements.txt..."
pip install -r requirements.txt
if [ $? -ne 0 ]; then
echo "Error: Failed to install requirements"
exit 1
fi
echo ""
echo "✓ Dependencies installed"
echo ""
# Check if config.yaml exists
if [ ! -f "config.yaml" ]; then
echo "Creating config.yaml from template..."
cp config.yaml.example config.yaml
echo "✓ config.yaml created"
echo ""
echo "⚠️ Please edit config.yaml with your MQTT broker settings!"
echo ""
fi
# Create data and logs directories
mkdir -p data logs
echo "==================================="
echo "Setup complete!"
echo "==================================="
echo ""
echo "Next steps:"
echo " 1. Edit config.yaml with your MQTT broker settings"
echo " 2. Activate venv: source venv/bin/activate"
echo " 3. Run the bridge: python main.py"
echo ""

View File

@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""
Shelly PM Mini G3 Parser
Parst MQTT Messages vom Shelly PM Mini G3
"""
import logging
from typing import Dict, Optional
from datetime import datetime
class ShellyParser:
"""Parser für Shelly PM Mini G3 MQTT Messages"""
def __init__(self, device_config: list = None):
"""
Initialize parser
Args:
device_config: List of device configurations from config.yaml
"""
self.logger = logging.getLogger(__name__)
self.device_config = device_config or []
# Build topic-prefix to machine mapping
self.prefix_machine_map = {}
for device in self.device_config:
topic_prefix = device.get('topic_prefix')
if topic_prefix:
self.prefix_machine_map[topic_prefix] = {
'machine_id': device.get('machine_id'),
'machine_name': device.get('machine_name'),
'device_id': None # Will be extracted from messages
}
def parse_message(self, topic: str, payload: dict) -> Optional[Dict]:
"""
Parse Shelly MQTT message
Args:
topic: MQTT topic
payload: Message payload (already parsed as JSON)
Returns:
Parsed data dict or None if not a relevant message
"""
# Ignore debug logs
if 'debug/log' in topic:
return None
# Parse different message types
if '/status/pm1:0' in topic:
return self._parse_status_message(topic, payload)
elif '/events/rpc' in topic:
return self._parse_rpc_event(topic, payload)
elif '/telemetry' in topic:
return self._parse_telemetry(topic, payload)
return None
def _parse_status_message(self, topic: str, payload: dict) -> Dict:
"""
Parse full status message
Topic: shaperorigin/status/pm1:0
"""
# Extract device ID from topic prefix
device_id = self._extract_device_id_from_topic(topic)
data = {
'message_type': 'status',
'device_id': device_id,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'voltage': payload.get('voltage'),
'current': payload.get('current'),
'apower': payload.get('apower'), # Active Power in Watts
'frequency': payload.get('freq'),
'total_energy': payload.get('aenergy', {}).get('total'),
}
self.logger.debug(f"Parsed status message: apower={data['apower']}W")
return data
def _parse_rpc_event(self, topic: str, payload: dict) -> Optional[Dict]:
"""
Parse RPC NotifyStatus event
Topic: shellypmminig3/events/rpc
"""
if payload.get('method') != 'NotifyStatus':
return None
device_id = payload.get('src', '').replace('shellypmminig3-', '')
params = payload.get('params', {})
pm_data = params.get('pm1:0', {})
# Get timestamp from params or use current time
ts = params.get('ts')
if ts:
timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z'
else:
timestamp = datetime.utcnow().isoformat() + 'Z'
data = {
'message_type': 'event',
'device_id': device_id,
'timestamp': timestamp,
'apower': pm_data.get('apower'),
'current': pm_data.get('current'),
'voltage': pm_data.get('voltage'),
}
# Only return if we have actual data
if data['apower'] is not None or data['current'] is not None:
self.logger.debug(f"Parsed RPC event: {pm_data}")
return data
return None
def _parse_telemetry(self, topic: str, payload: dict) -> Dict:
"""
Parse telemetry message
Topic: shelly/pmmini/shellypmminig3-48f6eeb73a1c/telemetry
"""
# Extract device ID from topic
parts = topic.split('/')
device_id = parts[2] if len(parts) > 2 else 'unknown'
device_id = device_id.replace('shellypmminig3-', '')
# Get timestamp
ts = payload.get('ts')
if ts:
timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z'
else:
timestamp = datetime.utcnow().isoformat() + 'Z'
data = {
'message_type': 'telemetry',
'device_id': device_id,
'timestamp': timestamp,
'voltage': payload.get('voltage_v'),
'current': payload.get('current_a'),
'apower': payload.get('power_w'), # Active Power in Watts
'frequency': payload.get('freq_hz'),
'total_energy': payload.get('energy_wh'),
}
self.logger.debug(f"Parsed telemetry: apower={data['apower']}W")
return data
def _extract_device_id_from_topic(self, topic: str) -> str:
"""Extract device ID (topic prefix) from topic string"""
# Topic format: shaperorigin/status/pm1:0
# Extract the prefix (shaperorigin)
parts = topic.split('/')
if len(parts) > 0:
topic_prefix = parts[0]
# Check if this prefix is in our config
if topic_prefix in self.prefix_machine_map:
# Use the actual device_id if we've learned it, otherwise use prefix
device_info = self.prefix_machine_map[topic_prefix]
return device_info.get('device_id') or topic_prefix
return topic_prefix
return 'unknown'
def get_power_value(self, parsed_data: Dict) -> Optional[float]:
"""
Extract power value from parsed data
Returns:
Power in Watts or None
"""
return parsed_data.get('apower')
def get_device_id(self, parsed_data: Dict) -> str:
"""Get device ID from parsed data"""
return parsed_data.get('device_id', 'unknown')