From 4c03959437aa12800f45a74607b17691fc26514f Mon Sep 17 00:00:00 2001 From: "matthias.lotz" Date: Thu, 22 Jan 2026 19:59:17 +0100 Subject: [PATCH] feat(mqtt): Implement M0-M3 - MQTT IoT Bridge with Session Detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - M0: MQTT Client with TLS/SSL support - Connection to mqtt.majufilo.eu:8883 - Topic subscription with wildcards - JSON payload parsing - M1: Shelly PM Mini G3 Integration - Status message parser (shaperorigin/status/pm1:0) - Custom MQTT topic prefix support - Power, voltage, current, frequency extraction - Device mapping via topic_prefix - M2: Event Normalization - Event Schema v1 with UUID-based event IDs - ISO 8601 UTC timestamps - Machine/Device info mapping - Metrics normalization (power_w, voltage_v, current_a, frequency_hz) - Filter None values - M3: Session Detection Engine - State machine: IDLE → STARTING → RUNNING → STOPPING → IDLE - Power-based threshold detection (configurable per machine) - Debounce logic (separate for start/stop) - Session events with duration calculation - Persistent storage (JSONL for events, JSON for sessions) - Odoo-ready data format for future migration Data Storage: - events.jsonl: JSON Lines format (one event per line) - sessions.json: Session records with start/end/duration - Ready for migration to open_workshop.session model Multi-device ready: Add devices via config.yaml with unique topic_prefix --- FEATURE_REQUEST/Odoo18_MQTT_IoT.md | 324 ----------- .../FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md | 530 ++++++++++++++++++ .../python_prototype/.gitignore | 31 + open_workshop_mqtt/python_prototype/README.md | 225 ++++++++ .../python_prototype/config.yaml.example | 58 ++ .../python_prototype/event_normalizer.py | 192 +++++++ .../python_prototype/event_storage.py | 188 +++++++ open_workshop_mqtt/python_prototype/main.py | 263 +++++++++ .../python_prototype/mqtt_client.py | 173 ++++++ .../python_prototype/requirements.txt | 17 + .../python_prototype/session_detector.py | 267 +++++++++ open_workshop_mqtt/python_prototype/setup.sh | 73 +++ .../python_prototype/shelly_parser.py | 175 ++++++ 13 files changed, 2192 insertions(+), 324 deletions(-) delete mode 100644 FEATURE_REQUEST/Odoo18_MQTT_IoT.md create mode 100644 open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md create mode 100644 open_workshop_mqtt/python_prototype/.gitignore create mode 100644 open_workshop_mqtt/python_prototype/README.md create mode 100644 open_workshop_mqtt/python_prototype/config.yaml.example create mode 100644 open_workshop_mqtt/python_prototype/event_normalizer.py create mode 100644 open_workshop_mqtt/python_prototype/event_storage.py create mode 100644 open_workshop_mqtt/python_prototype/main.py create mode 100644 open_workshop_mqtt/python_prototype/mqtt_client.py create mode 100644 open_workshop_mqtt/python_prototype/requirements.txt create mode 100644 open_workshop_mqtt/python_prototype/session_detector.py create mode 100755 open_workshop_mqtt/python_prototype/setup.sh create mode 100644 open_workshop_mqtt/python_prototype/shelly_parser.py diff --git a/FEATURE_REQUEST/Odoo18_MQTT_IoT.md b/FEATURE_REQUEST/Odoo18_MQTT_IoT.md deleted file mode 100644 index 328e7c6..0000000 --- a/FEATURE_REQUEST/Odoo18_MQTT_IoT.md +++ /dev/null @@ -1,324 +0,0 @@ -# Projektplan: MQTT-basierte IoT-Events in Odoo 18 Community (Simulation-first) - -Stand: 2026-01-10 : https://chatgpt.com/share/696e559d-1640-800f-9d53-f7a9d1e784bd - -Ziel: **Odoo-18-Community (self-hosted)** soll **Geräte-Events** (Timer/Maschinenlaufzeit, Waage, Zustandswechsel) über **MQTT** aufnehmen und in Odoo als **saubere, nachvollziehbare Sessions/Events** verarbeiten. -Wichtig: **Hardware wird zunächst vollständig simuliert** (Software-Simulatoren), damit die Odoo-Schnittstelle stabil steht, bevor echte Geräte angebunden werden. - ---- - -## 1. Ziele und Nicht-Ziele - -### 1.1 Ziele (MVP) -- Einheitliche **Device-Event-Schnittstelle** in Odoo (REST/Webhook) inkl. Authentifizierung -- **Event-Log** in Odoo (persistente Rohereignisse + Normalisierung) -- **Session-Logik** für Timer/Maschinenlaufzeit (Start/Stop/Heartbeat) -- **Simulation** von: - - Maschinenzustand (running/idle/fault) - - Timer-Events (run_start/run_stop) - - Waage (stable_weight/tare) -- Reproduzierbare Tests (Unit/Integration) und eindeutige Fehlerdiagnostik (Logging) - -### 1.2 Nicht-Ziele (für Phase 1) -- Keine Enterprise-IoT-Box, keine Enterprise-Module -- Keine echte Hardware-Treiberentwicklung in Phase 1 -- Kein POS-Frontend-Live-Widget (optional erst in späterer Phase) -- Keine Abrechnungslogik/Preisregeln (kann vorbereitet, aber nicht umgesetzt werden) - ---- - -## 2. Zielarchitektur (Simulation-first) - -### 2.1 Komponenten -1. **MQTT Broker** : Mosquitto in Docker vorhanden -2. **Device Simulator(s)** (Python/Node) - - veröffentlicht MQTT Topics wie echte Geräte -3. **Gateway/Bridge (Software)** - - abonniert MQTT Topics - - validiert/normalisiert Payload - - sendet Events via HTTPS an Odoo (Webhook) -4. **Odoo Modul** `ows_iot_bridge` - - REST Controller `/ows/iot/event` - - Modelle für Devices, Events, Sessions - - Business-Regeln für Session-Start/Stop/Hysterese (softwareseitig) -5. Optional später: **Realtime-Feed** (WebSocket) für POS/Display - -### 2.2 Datenfluss -1. Simulator publiziert MQTT → `hobbyhimmel/machines//state` -2. Bridge konsumiert MQTT, mappt auf Event-Format -3. Bridge POSTet JSON an Odoo Endpoint -4. Odoo speichert Roh-Event + erzeugt ggf. Session-Updates - ---- - -## 3. Schnittstelle zu Odoo (Kern des Projekts) - -### 3.1 Authentifizierung -- Pro Device oder pro Bridge ein **API-Token** (Bearer) -- Header: `Authorization: Bearer ` -- Token in Odoo in `ows.iot.device` oder `ir.config_parameter` hinterlegt -- Optional: IP-Allowlist / Rate-Limit (in Reverse Proxy) - -### 3.2 Endpoint (REST/Webhook) -- `POST /ows/iot/event` -- Content-Type: `application/json` -- Antwort: - - `200 OK` mit `{ "status": "ok", "event_id": "...", "session_id": "..." }` - - Fehler: `400` (Schema), `401/403` (Auth), `409` (Duplikat), `500` (Server) - -### 3.3 Idempotenz / Duplikaterkennung -- Jedes Event enthält eine eindeutige `event_uid` -- Odoo legt Unique-Constraint auf `event_uid` (pro device) -- Wiederholte Events liefern `409 Conflict` oder `200 OK (duplicate=true)` (Designentscheidung) - ---- - -## 4. Ereignis- und Topic-Standard (Versioniert) - -### 4.1 MQTT Topics (v1) -- Maschinenzustand: - `hobbyhimmel/machines//state` -- Maschinenereignisse: - `hobbyhimmel/machines//event` -- Waage: - `hobbyhimmel/scales//event` -- Geräte-Status (optional): - `hobbyhimmel/devices//status` - -### 4.2 Gemeinsames JSON Event Schema (v1) -Pflichtfelder: -- `schema_version`: `"v1"` -- `event_uid`: UUID/string -- `ts`: ISO-8601 UTC (z. B. `"2026-01-10T12:34:56Z"`) -- `source`: `"simulator" | "device" | "gateway"` -- `device_id`: string -- `entity_type`: `"machine" | "scale" | "sensor"` -- `entity_id`: string (z. B. machine_id) -- `event_type`: string (siehe unten) -- `payload`: object -- `confidence`: `"high" | "medium" | "low"` (für Sensorfusion) - -### 4.3 Event-Typen (v1) -**Maschine/Timer** -- `run_start` (Start einer Laufphase) -- `run_stop` (Ende einer Laufphase) -- `heartbeat` (optional, laufend während running) -- `state_changed` (idle/running/fault) -- `fault` (Fehler mit Code/Severity) - -**Waage** -- `stable_weight` (stabiler Messwert) -- `weight` (laufend) -- `tare` -- `zero` -- `error` - ---- - -## 5. Odoo Datenmodell (Vorschlag) - -### 5.1 `ows.iot.device` -- `name` -- `device_id` (unique) -- `token_hash` (oder Token in separater Tabelle) -- `device_type` (machine/scale/...) -- `active` -- `last_seen` -- `notes` - -### 5.2 `ows.iot.event` -- `event_uid` (unique) -- `device_id` (m2o -> device) -- `entity_type`, `entity_id` -- `event_type` -- `timestamp` -- `payload_json` (Text/JSON) -- `confidence` -- `processing_state` (new/processed/error) -- `session_id` (m2o optional) - -### 5.3 `ows.machine.session` (Timer-Sessions) -- `machine_id` (Char oder m2o auf bestehendes Maschinenmodell) -- `start_ts`, `stop_ts` -- `duration_s` (computed) -- `state` (running/stopped/aborted) -- `origin` (sensor/manual/sim) -- `confidence_summary` -- `event_ids` (o2m) - -> Hinweis: Wenn du bereits `ows.machine` aus deinem open_workshop nutzt, referenziert `machine_id` direkt dieses Modell. - ---- - -## 6. Verarbeitungslogik (Phase 1: minimal, robust) - -### 6.1 Session-Automat (State Machine) -- Eingang: Events `run_start`, `run_stop`, optional `heartbeat` -- Regeln: - - `run_start` erstellt neue Session, wenn keine läuft - - `run_start` während laufender Session → nur Log, keine neue Session - - `run_stop` schließt laufende Session - - Timeout-Regel: wenn `heartbeat` ausbleibt (z. B. 10 min) → Session `aborted` - -### 6.2 Hysterese (Simulation als Stellvertreter für Sensorik) -In Simulation definierst du: -- Start, wenn „running“ mindestens **2s stabil** -- Stop, wenn „idle“ mindestens **15s stabil** -Diese Parameter als Odoo Systemparameter, z. B.: -- `ows_iot.start_debounce_s` -- `ows_iot.stop_debounce_s` - ---- - -## 7. Simulation (Software statt Hardware) - -### 7.1 Device Simulator: Maschine -- Konfigurierbar: - - Muster: random, fixed schedule, manuell per CLI - - Zustände: idle/running/fault - - Optional: „power_w“ und „vibration“ als Felder im Payload -- Publiziert MQTT in realistischen Intervallen - -### 7.2 Device Simulator: Waage -- Modi: - - stream weight (mehrfach pro Sekunde) - - stable_weight nur auf „stabil“ - - tare/zero Events per CLI - -### 7.3 Bridge Simulator (MQTT → Odoo) -- Abonniert alle relevanten Topics -- Validiert Schema v1 -- POSTet Events an Odoo -- Retry-Queue (lokal) bei Odoo-Ausfall -- Metriken/Logs: - - gesendete Events, Fehlerquoten, Latenz - ---- - -## 8. Milestones & Deliverables - -### M0 – Repo/Grundgerüst (0.5–1 Tag) -**Deliverables** -- Git-Repo Struktur -- Docker Compose: mosquitto + simulator + bridge -- Odoo Modul Skeleton `ows_iot_bridge` - -### M1 – Odoo Endpoint & Modelle (1–2 Tage) -**Deliverables** -- `/ows/iot/event` Controller inkl. Token-Auth -- Modelle `ows.iot.device`, `ows.iot.event` -- Admin UI: Geräte anlegen, Token setzen, letzte Events anzeigen - -### M2 – Session-Engine (1–2 Tage) -**Deliverables** -- Modell `ows.machine.session` -- Event → Session Zuordnung -- Timeout/Abbruch-Logik -- Parameter für Debounce/Timeout - -### M3 – Simulatoren & End-to-End Test (1–2 Tage) -**Deliverables** -- Machine Simulator + Scale Simulator -- Bridge mit Retry-Queue -- End-to-End: Simulator → MQTT → Bridge → Odoo → Session entsteht - -### M4 – Qualität & Betrieb (1 Tag) -**Deliverables** -- Logging-Konzept (Odoo + Bridge) -- Tests (Unit: Controller/Auth, Integration: Session Engine) -- Doku: Event-Schema v1, Topics, Beispielpayloads - -### Optional M5 – POS/Anzeige (später) -- Realtime Anzeige im POS oder auf Display -- Live-Weight in POS - ---- - -## 9. Testplan (Simulation-first) - -### 9.1 Unit Tests (Odoo) -- Auth: gültiger Token → 200 -- ungültig/fehlend → 401/403 -- Schema-Validation → 400 -- Idempotenz: duplicate `event_uid` → 409 oder duplicate-flag - -### 9.2 Integration Tests -- Sequenz: start → heartbeat → stop → Session duration plausibel -- stop ohne start → kein Crash, Event loggt Fehlerzustand -- Timeout: start → keine heartbeat → Session aborted - -### 9.3 Last-/Stabilitätstest (Simulator) -- 20 Maschinen, je 1 Event/s, 1h Lauf -- Ziel: Odoo bleibt stabil, Event-Insert performant, Queue läuft nicht über - ---- - -## 10. Betriebs- und Sicherheitskonzept - -- Token-Rotation möglich (neues Token, altes deaktivieren) -- Reverse Proxy: - - HTTPS - - Rate limiting auf `/ows/iot/event` - - optional Basic WAF Regeln -- Payload-Größenlimit (z. B. 16–64 KB) -- Bridge persistiert Retry-Queue auf Disk - ---- - -## 11. Offene Entscheidungen (später, nicht blocker für MVP) - -- Event-Consumer in Odoo vs. Bridge-only Webhook (empfohlen: Webhook) -- Genaues Mapping zu bestehenden open_workshop Modellen (`ows.machine`, Nutzer/RFID) -- Abrechnung: Preisregeln, Rundungen, POS-Integration -- Realtime: Odoo Bus vs. eigener WebSocket Service - ---- - -## 12. Nächste Schritte (konkret) - -1. `ows_iot_bridge` Modul: Endpoint + Device/Event Modelle implementieren -2. Docker Compose: mosquitto + bridge + simulator bereitstellen -3. Simulatoren produzieren v1-Events; Bridge sendet an Odoo -4. Session-Engine anschließen und End-to-End testen - ---- - -## Anhang A: Beispiel-Event (Maschine run_start) -```json -{ - "schema_version": "v1", - "event_uid": "c2a7d6f1-7d8d-4a63-9a7f-4e6d7b0d9e2a", - "ts": "2026-01-10T12:34:56Z", - "source": "simulator", - "device_id": "esp32-fraser-01", - "entity_type": "machine", - "entity_id": "formatkreissaege", - "event_type": "run_start", - "confidence": "high", - "payload": { - "power_w": 820, - "vibration": 0.73, - "reason": "power_threshold" - } -} -``` - -## Anhang B: Beispiel-Event (Waage stable_weight) -```json -{ - "schema_version": "v1", - "event_uid": "b6d0a2c5-9b1f-4a0b-8b19-7f2e1b8f3d11", - "ts": "2026-01-10T12:40:12Z", - "source": "simulator", - "device_id": "scale-sim-01", - "entity_type": "scale", - "entity_id": "waage-01", - "event_type": "stable_weight", - "confidence": "high", - "payload": { - "weight_g": 1532.4, - "unit": "g", - "stable_ms": 1200 - } -} -``` diff --git a/open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md b/open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md new file mode 100644 index 0000000..a30ac91 --- /dev/null +++ b/open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md @@ -0,0 +1,530 @@ +# Projektplan: MQTT-basierte IoT-Events in Odoo 18 Community (Hardware-First mit Shelly PM Mini G3) + +Stand: 2026-01-22 (aktualisiert) +Original: 2026-01-10 : https://chatgpt.com/share/696e559d-1640-800f-9d53-f7a9d1e784bd + +Ziel: **Odoo-18-Community (self-hosted)** soll **Geräte-Events** (Timer/Maschinenlaufzeit, Waage, Zustandswechsel) über **MQTT** aufnehmen und in Odoo als **saubere, nachvollziehbare Sessions/Events** verarbeiten. +**Änderung der Vorgehensweise**: Entwicklung startet mit **echtem Shelly PM Mini G3** Hardware-Device, das bereits im MQTT Broker integriert ist. Python-Prototyp wird so entwickelt, dass Code später direkt in Odoo-Bridge übernommen werden kann. + +--- + +## 1. Ziele und Nicht-Ziele + +### 1.1 Ziele (MVP) +- **Hardware-Integration**: Shelly PM Mini G3 als primäres Test-Device +- **Session-Logik** für Maschinenlaufzeit basierend auf Power-Schwellenwerten (Start/Stop mit Hysterese) +- **Python-Prototyp** (Standalone) der später in Odoo-Bridge übernommen wird: + - MQTT Client für Shelly-Integration + - Event-Normalisierung (Shelly-Format → Unified Event Schema) + - Session-Detection Engine + - Konfigurierbare Device-Mappings und Schwellenwerte +- **Odoo-Integration** (Phase 2): + - Einheitliche **Device-Event-Schnittstelle** (REST/Webhook) inkl. Authentifizierung + - **Event-Log** in Odoo (persistente Rohereignisse + Normalisierung) + - Session-Verwaltung mit Maschinenzuordnung +- Reproduzierbare Tests und eindeutige Fehlerdiagnostik (Logging) + +### 1.2 Nicht-Ziele (für Phase 1) +- Keine Enterprise-IoT-Box, keine Enterprise-Module +- Keine Simulatoren in Phase 1 (nur echte Hardware) +- Kein POS-Frontend-Live-Widget (optional erst in späterer Phase) +- Keine Abrechnungslogik/Preisregeln (kann vorbereitet, aber nicht umgesetzt werden) + +--- + +## 2. Zielarchitektur (Simulation-first) + +### 2.1 Komponenten +1. **MQTT Broker** : Mosquitto in Docker vorhanden +2. **Device Simulator(s)** (Python/Node) + - veröffentlicht MQTT Topics wie echte Geräte +3. **Gateway/Bridge (Software)** + - abonniert MQTT Topics + - validiert/normalisiert Payload + - sendet Events via HTTPS an Odoo (Webhook) +4. **Odoo Modul** `ows_iot_bridge` + - REST Controller `/ows/iot/event` + - Modelle für Devices, Events, Sessions + - Business-Regeln für Session-Start/Stop/Hysterese (softwareseitig) +5. Optional später: **Realtime-Feed** (WebSocket) für POS/Display + +### 2.2 Datenfluss +1. Simulator publiziert MQTT → `hobbyhimmel/machines//state` +2. Bridge konsumiert MQTT, mappt auf Event-Format +3. Bridge POSTet JSON an Odoo Endpoint +4. Odoo speichert Roh-Event + erzeugt ggf. Session-Updates + +--- + +## 3. Schnittstelle zu Odoo (Kern des Projekts) + +### 3.1 Authentifizierung +- Pro Device oder pro Bridge ein **API-Token** (Bearer) +- Header: `Authorization: Bearer ` +- Token in Odoo in `ows.iot.device` oder `ir.config_parameter` hinterlegt +- Optional: IP-Allowlist / Rate-Limit (in Reverse Proxy) + +### 3.2 Endpoint (REST/Webhook) +- `POST /ows/iot/event` +- Content-Type: `application/json` +- Antwort: + - `200 OK` mit `{ "status": "ok", "event_id": "...", "session_id": "..." }` + - Fehler: `400` (Schema), `401/403` (Auth), `409` (Duplikat), `500` (Server) + +### 3.3 Idempotenz / Duplikaterkennung +- Jedes Event enthält eine eindeutige `event_uid` +- Odoo legt Unique-Constraint auf `event_uid` (pro device) +- Wiederholte Events liefern `409 Conflict` oder `200 OK (duplicate=true)` (Designentscheidung) + +--- + +## 4. Ereignis- und Topic-Standard (Versioniert) + +### 4.1 MQTT Topics (v1) +- Maschinenzustand: + `hobbyhimmel/machines//state` +- Maschinenereignisse: + `hobbyhimmel/machines//event` +- Waage: + `hobbyhimmel/scales//event` +- Geräte-Status (optional): + `hobbyhimmel/devices//status` + +### 4.2 Gemeinsames JSON Event Schema (v1) +Pflichtfelder: +- `schema_version`: `"v1"` +- `event_uid`: UUID/string +- `ts`: ISO-8601 UTC (z. B. `"2026-01-10T12:34:56Z"`) +- `source`: `"simulator" | "device" | "gateway"` +- `device_id`: string +- `entity_type`: `"machine" | "scale" | "sensor"` +- `entity_id`: string (z. B. machine_id) +- `event_type`: string (siehe unten) +- `payload`: object +- `confidence`: `"high" | "medium" | "low"` (für Sensorfusion) + +### 4.3 Event-Typen (v1) +**Maschine/Timer** +- `run_start` (Start einer Laufphase) +- `run_stop` (Ende einer Laufphase) +- `heartbeat` (optional, laufend während running) +- `state_changed` (idle/running/fault) +- `fault` (Fehler mit Code/Severity) + +**Waage** +- `stable_weight` (stabiler Messwert) +- `weight` (laufend) +- `tare` +- `zero` +- `error` + +--- + +## 5. Odoo Datenmodell (Vorschlag) + +### 5.1 `ows.iot.device` +- `name` +- `device_id` (unique) +- `token_hash` (oder Token in separater Tabelle) +- `device_type` (machine/scale/...) +- `active` +- `last_seen` +- `notes` + +### 5.2 `ows.iot.event` +- `event_uid` (unique) +- `device_id` (m2o -> device) +- `entity_type`, `entity_id` +- `event_type` +- `timestamp` +- `payload_json` (Text/JSON) +- `confidence` +- `processing_state` (new/processed/error) +- `session_id` (m2o optional) + +### 5.3 `ows.machine.session` (Timer-Sessions) +- `machine_id` (Char oder m2o auf bestehendes Maschinenmodell) +- `start_ts`, `stop_ts` +- `duration_s` (computed) +- `state` (running/stopped/aborted) +- `origin` (sensor/manual/sim) +- `confidence_summary` +- `event_ids` (o2m) + +> Hinweis: Wenn du bereits `ows.machine` aus deinem open_workshop nutzt, referenziert `machine_id` direkt dieses Modell. + +--- + +## 6. Verarbeitungslogik (Phase 1: minimal, robust) + +### 6.1 Session-Automat (State Machine) +- Eingang: Events `run_start`, `run_stop`, optional `heartbeat` +- Regeln: + - `run_start` erstellt neue Session, wenn keine läuft + - `run_start` während laufender Session → nur Log, keine neue Session + - `run_stop` schließt laufende Session + - Timeout-Regel: wenn `heartbeat` ausbleibt (z. B. 10 min) → Session `aborted` + +### 6.2 Hysterese (Simulation als Stellvertreter für Sensorik) +In Simulation definierst du: +- Start, wenn „running“ mindestens **2s stabil** +- Stop, wenn „idle“ mindestens **15s stabil** +Diese Parameter als Odoo Systemparameter, z. B.: +- `ows_iot.start_debounce_s` +- `ows_iot.stop_debounce_s` + +--- + +## 7. Simulation (Software statt Hardware) + +### 7.1 Device Simulator: Maschine +- Konfigurierbar: + - Muster: random, fixed schedule, manuell per CLI + - Zustände: idle/running/fault + - Optional: „power_w“ und „vibration“ als Felder im Payload +- Publiziert MQTT in realistischen Intervallen + +### 7.2 Device Simulator: Waage +- Modi: + - stream weight (mehrfach pro Sekunde) + - stable_weight nur auf „stabil“ + - tare/zero Events per CLI + +### 7.3 Bridge Simulator (MQTT → Odoo) +- Abonniert alle relevanten Topics +- Validiert Schema v1 +- POSTet Events an Odoo +- Retry-Queue (lokal) bei Odoo-Ausfall +- Metriken/Logs: + - gesendete Events, Fehlerquoten, Latenz + +--- + +## 8. Milestones & Deliverables (NEU: Hardware-First Approach) + +### Phase 1: Python-Prototyp (Standalone, ohne Odoo) + +#### M0 – Projekt Setup & MQTT Verbindung (0.5 Tag) +**Deliverables** +- Python Projekt Struktur +- Requirements.txt (paho-mqtt, pyyaml, etc.) +- Config-Datei (YAML): MQTT Broker Settings +- MQTT Client Basis-Verbindung testen + +**Test**: Verbindung zum MQTT Broker herstellen, Topics anzeigen + +--- + +#### M1 – Shelly PM Mini G3 Integration (1 Tag) +**Deliverables** +- MQTT Subscriber für Shelly-Topics +- Parser für Shelly JSON-Payloads (beide Formate): + - Status-Updates (full data mit voltage, current, apower) + - NotifyStatus Events (einzelne Werte) +- Datenextraktion: `apower`, `timestamp`, `device_id` +- Logging aller empfangenen Shelly-Messages + +**Test**: Shelly-Daten empfangen und parsen, Console-Output + +--- + +#### M2 – Event-Normalisierung & Unified Schema (1 Tag) +**Deliverables** +- Unified Event Schema v1 Implementation +- Shelly-to-Unified Konverter: + - `apower` Updates → `power_measurement` Events + - Enrichment mit `device_id`, `machine_id`, `timestamp` +- Event-UID Generierung +- JSON-Export der normalisierten Events + +**Test**: Shelly-Daten werden zu Schema-v1-konformen Events konvertiert + +--- + +#### M3 – Session Detection Engine (1-2 Tage) +**Deliverables** +- State Machine für run_start/run_stop Detection +- Konfigurierbare Parameter pro Device: + - `power_threshold` (z.B. 50W) + - `start_debounce_s` (z.B. 3s) + - `stop_debounce_s` (z.B. 15s) +- Session-Objekte: + - `session_id`, `machine_id`, `start_ts`, `stop_ts`, `duration_s` + - `events` (Liste der zugehörigen Events) +- Session-Export (JSON/CSV) + +**Test**: +- Maschine anschalten → `run_start` Event +- Maschine ausschalten → `run_stop` Event +- Sessions werden korrekt erkannt und gespeichert + +--- + +#### M4 – Multi-Device Support & Config (0.5-1 Tag) +**Deliverables** +- Device-Registry in Config: + ```yaml + devices: + - shelly_id: "shellypmminig3-48f6eeb73a1c" + machine_name: "Shaper Origin" + power_threshold: 50 + start_debounce_s: 3 + stop_debounce_s: 15 + ``` +- Dynamisches Laden mehrerer Devices +- Parallele Session-Tracking pro Device + +**Test**: Mehrere Shelly-Devices in Config, jedes wird separat getrackt + +--- + +#### M5 – Monitoring & Robustheit (0.5 Tag) +**Deliverables** +- Reconnect-Logik bei MQTT-Disconnect +- Error-Handling bei ungültigen Payloads +- Statistiken: empfangene Events, aktive Sessions, Fehler +- Strukturiertes Logging (Logger-Konfiguration) + +**Test**: MQTT Broker Neustart → Client reconnected automatisch + +--- + +### Phase 2: Odoo-Integration + +#### M6 – Odoo Modul Grundgerüst (0.5 Tag) +**Deliverables** +- Odoo Modul `ows_iot_bridge` +- Modelle: `ows.iot.device`, `ows.iot.event`, `ows.machine.session` +- Admin UI: Device-Liste, Event-Log, Session-Übersicht + +--- + +#### M7 – REST Endpoint & Authentication (1 Tag) +**Deliverables** +- Controller `/ows/iot/event` (POST) +- Token-basierte Authentifizierung +- Event-Validation gegen Schema v1 +- Event-Speicherung in `ows.iot.event` + +**Test**: Python-Script POSTet Events an Odoo, werden gespeichert + +--- + +#### M8 – Python-Bridge Integration (1 Tag) +**Deliverables** +- Python-Prototyp wird zur Odoo-Bridge: + - Events werden an Odoo Endpoint gesendet (statt nur File-Export) + - Retry-Queue bei Odoo-Ausfall +- Bridge läuft als separater Service (Docker optional) + +**Test**: End-to-End: Shelly → MQTT → Bridge → Odoo → Event sichtbar in Odoo + +--- + +#### M9 – Session-Engine in Odoo (1 Tag) +**Deliverables** +- Session-Logik aus Python-Prototyp nach Odoo portieren +- Events → Session-Zuordnung +- Session-Updates bei run_start/run_stop +- Admin UI: Sessions anzeigen, Filter nach Maschine/Zeitraum + +**Test**: Sessions werden in Odoo korrekt erstellt und geschlossen + +--- + +#### M10 – Tests & Dokumentation (1 Tag) +**Deliverables** +- Unit Tests: Event-Parsing, Session-Detection +- Integration Tests: MQTT → Odoo End-to-End +- Dokumentation: Setup-Anleitung, Config-Beispiele, API-Docs + +--- + +### Optional M11 – POS/Anzeige (später) +- Realtime Anzeige im POS oder auf Display +- Live Power-Consumption in Odoo + +--- + +## 9. Testplan (Simulation-first) + +### 9.1 Unit Tests (Odoo) +- Auth: gültiger Token → 200 +- ungültig/fehlend → 401/403 +- Schema-Validation → 400 +- Idempotenz: duplicate `event_uid` → 409 oder duplicate-flag + +### 9.2 Integration Tests +- Sequenz: start → heartbeat → stop → Session duration plausibel +- stop ohne start → kein Crash, Event loggt Fehlerzustand +- Timeout: start → keine heartbeat → Session aborted + +### 9.3 Last-/Stabilitätstest (Simulator) +- 20 Maschinen, je 1 Event/s, 1h Lauf +- Ziel: Odoo bleibt stabil, Event-Insert performant, Queue läuft nicht über + +--- + +## 10. Betriebs- und Sicherheitskonzept + +- Token-Rotation möglich (neues Token, altes deaktivieren) +- Reverse Proxy: + - HTTPS + - Rate limiting auf `/ows/iot/event` + - optional Basic WAF Regeln +- Payload-Größenlimit (z. B. 16–64 KB) +- Bridge persistiert Retry-Queue auf Disk + +--- + +## 11. Offene Entscheidungen (später, nicht blocker für MVP) + +- Event-Consumer in Odoo vs. Bridge-only Webhook (empfohlen: Webhook) +- Genaues Mapping zu bestehenden open_workshop Modellen (`ows.machine`, Nutzer/RFID) +- Abrechnung: Preisregeln, Rundungen, POS-Integration +- Realtime: Odoo Bus vs. eigener WebSocket Service + +--- + +## 12. Nächste Schritte (AKTUALISIERT: Hardware-First) + +### Sofort starten (Phase 1 - Python Prototyp): +1. **M0**: Python-Projekt aufsetzen, MQTT-Verbindung testen +2. **M1**: Shelly PM Mini G3 Daten empfangen und parsen +3. **M2**: Event-Normalisierung implementieren (Shelly → Schema v1) +4. **M3**: Session Detection Engine (run_start/run_stop basierend auf Power) +5. **M4**: Multi-Device Config-Support + +### Danach (Phase 2 - Odoo Integration): +6. **M6-M9**: Odoo Module + Bridge + Session-Engine +7. **M10**: Tests & Dokumentation + +### Offene Informationen benötigt: +- [ ] Genaue MQTT Topic-Namen des Shelly PM Mini G3 +- [ ] Typische Power-Werte des Shaper Origin (Idle vs. Running) +- [ ] Gewünschte Schwellenwerte für Start/Stop-Detection +- [ ] MQTT Broker Zugangsdaten (Host, Port, User, Password) + +--- + +## Anhang C: Shelly PM Mini G3 Payload-Beispiele + +### Format 1: Full Status Update +```json +{ + "id": 0, + "voltage": 234.7, + "current": 0.289, + "apower": 59.7, + "freq": 49.9, + "aenergy": { + "total": 256.325, + "by_minute": [415.101, 830.202, 830.202], + "minute_ts": 1769100576 + }, + "ret_aenergy": { + "total": 0.000, + "by_minute": [0.000, 0.000, 0.000], + "minute_ts": 1769100576 + } +} +``` + +### Format 2: NotifyStatus Event +```json +{ + "src": "shellypmminig3-48f6eeb73a1c", + "dst": "shellypmminig3/events", + "method": "NotifyStatus", + "params": { + "ts": 1769100615.87, + "pm1:0": { + "id": 0, + "current": 0.185 + } + } +} +``` + +### Mapping zu Unified Event Schema +```json +{ + "schema_version": "v1", + "event_uid": "generated-uuid", + "ts": "2026-01-22T10:30:15Z", + "source": "shelly_pm_mini_g3", + "device_id": "shellypmminig3-48f6eeb73a1c", + "entity_type": "machine", + "entity_id": "shaper-origin-01", + "event_type": "power_measurement", + "confidence": "high", + "payload": { + "apower": 59.7, + "voltage": 234.7, + "current": 0.289, + "frequency": 49.9, + "total_energy_kwh": 0.256325 + } +} +``` + +### Session Detection Beispiel +```yaml +# Device Config +device_id: shellypmminig3-48f6eeb73a1c +machine_name: Shaper Origin +power_threshold: 50 # Watt +start_debounce_s: 3 # Power > threshold für 3s → run_start +stop_debounce_s: 15 # Power < threshold für 15s → run_stop +``` + +**Session Flow**: +1. `apower: 5W` (idle) → keine Session +2. `apower: 120W` → Power > 50W → Debounce startet +3. Nach 3s immer noch > 50W → `run_start` Event, Session öffnet +4. `apower: 95W, 110W, 88W` → Session läuft weiter +5. `apower: 12W` → Power < 50W → Stop-Debounce startet +6. Nach 15s immer noch < 50W → `run_stop` Event, Session schließt +7. Session: `duration = stop_ts - start_ts` + +--- + +## Anhang A: Beispiel-Event (Maschine run_start) +```json +{ + "schema_version": "v1", + "event_uid": "c2a7d6f1-7d8d-4a63-9a7f-4e6d7b0d9e2a", + "ts": "2026-01-10T12:34:56Z", + "source": "simulator", + "device_id": "esp32-fraser-01", + "entity_type": "machine", + "entity_id": "formatkreissaege", + "event_type": "run_start", + "confidence": "high", + "payload": { + "power_w": 820, + "vibration": 0.73, + "reason": "power_threshold" + } +} +``` + +## Anhang B: Beispiel-Event (Waage stable_weight) +```json +{ + "schema_version": "v1", + "event_uid": "b6d0a2c5-9b1f-4a0b-8b19-7f2e1b8f3d11", + "ts": "2026-01-10T12:40:12Z", + "source": "simulator", + "device_id": "scale-sim-01", + "entity_type": "scale", + "entity_id": "waage-01", + "event_type": "stable_weight", + "confidence": "high", + "payload": { + "weight_g": 1532.4, + "unit": "g", + "stable_ms": 1200 + } +} +``` diff --git a/open_workshop_mqtt/python_prototype/.gitignore b/open_workshop_mqtt/python_prototype/.gitignore new file mode 100644 index 0000000..6e8b16f --- /dev/null +++ b/open_workshop_mqtt/python_prototype/.gitignore @@ -0,0 +1,31 @@ +# Config with secrets +config.yaml + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +env/ +venv/ +ENV/ +build/ +dist/ +*.egg-info/ + +# Data & Logs +data/ +logs/ +*.log + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/open_workshop_mqtt/python_prototype/README.md b/open_workshop_mqtt/python_prototype/README.md new file mode 100644 index 0000000..ad80873 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/README.md @@ -0,0 +1,225 @@ +# Open Workshop IoT Bridge - Python Prototype + +MQTT-basierte IoT-Integration für Odoo 18 Community Edition + +## Projektstatus + +**Stand: 2026-01-22** + +**Phase 1: Standalone Python Prototype** (aktuell) +- ✅ M0: Projekt Setup & MQTT Connection (abgeschlossen) + - Virtual Environment erstellt + - MQTT Client mit TLS/SSL Support + - Verbindung zu mqtt.majufilo.eu:8883 erfolgreich +- ✅ M1: Shelly PM Mini G3 Integration (abgeschlossen) + - Parser für Status Messages (shaperorigin/status/pm1:0) + - Datenextraktion: apower, voltage, current, frequency, total_energy + - Custom MQTT Topic Prefix Support (shaperorigin) + - Live-Monitoring funktioniert +- ✅ M2: Event-Normalisierung (abgeschlossen) + - Event Schema v1 implementiert + - UUID-basierte Event IDs + - ISO 8601 UTC Timestamps + - Machine/Device Mapping + - Metrics Normalisierung (power_w, voltage_v, current_a, frequency_hz) +- ✅ M3: Session Detection Engine (abgeschlossen) + - State Machine: IDLE → STARTING → RUNNING → STOPPING → IDLE + - Power-basierte Schwellenwerte (konfigurierbar pro Maschine) + - Debounce-Logik (Start/Stop getrennt konfigurierbar) + - Session Events: session_start, session_end mit Duration + - Persistente Speicherung (JSON) +- ⏳ M4: Multi-Device Support (vorbereitet, Config-ready) +- ⏳ M5: Monitoring & Robustheit + +**Phase 2: Odoo Integration** (geplant) +- M6-M10: Odoo Module + Bridge + Tests + +## Beschreibung + +Dieser Python-Prototyp dient als Grundlage für die spätere Odoo-Integration. Er: +- Empfängt MQTT Events von IoT-Geräten (z.B. Shelly PM Mini G3) +- Normalisiert die Daten in ein einheitliches Event-Schema (Event Schema v1) +- Erkennt Maschinenlaufzeit-Sessions basierend auf Power-Schwellenwerten +- Speichert Events und Sessions persistent (JSONL/JSON) +- Unterstützt mehrere Geräte parallel (topic_prefix basiert) + +Der Code ist so strukturiert, dass er später direkt in eine Odoo-Bridge übernommen werden kann. +Die JSON-Speicherung ist für Migration zu Odoo-Models vorbereitet: +- `data/events.jsonl` → `open_workshop.power_event` +- `data/sessions.json` → `open_workshop.session` + +## Installation + +### Voraussetzungen +- Python 3.8+ +- MQTT Broker (z.B. Mosquitto) +- pip + +### Setup + +1. **Dependencies installieren** +```bash +pip install -r requirements.txt +``` + +2. **Konfiguration erstellen** +```bash +cp config.yaml.example config.yaml +``` + +3. **config.yaml anpassen** +Bearbeite `config.yaml` und setze: +- MQTT Broker Host/Port/Credentials +- Device-Konfiguration (Shelly IDs, Schwellenwerte) +- MQTT Topics + +## Verwendung + +### Starten +```bash +cd /pfad/zu/python_prototype +source venv/bin/activate +python main.py +``` + +Dieser Befehl: +- Verbindet sich mit dem MQTT Broker (TLS/SSL) +- Abonniert die konfigurierten Topics +- Empfängt Shelly PM Mini G3 Status Messages +- Normalisiert Events (Event Schema v1) +- Detektiert Session Start/End Events +- Speichert in `data/events.jsonl` und `data/sessions.json` + +### Erwartete Ausgabe +``` +2026-01-22 19:36:17 - __main__ - INFO - === Open Workshop IoT Bridge Starting === +2026-01-22 19:36:17 - mqtt_client - INFO - TLS/SSL enabled for port 8883 +2026-01-22 19:36:17 - mqtt_client - INFO - Connected to MQTT Broker at mqtt.majufilo.eu:8883 +2026-01-22 19:36:17 - mqtt_client - INFO - Subscribed to topic: shaperorigin/# +2026-01-22 19:36:17 - __main__ - INFO - IoT Bridge started successfully +2026-01-22 19:36:17 - __main__ - INFO - Listening for MQTT messages... (Press Ctrl+C to stop) +2026-01-22 19:36:17 - session_detector - INFO - 🟡 Shaper Origin: Power 43.3W >= 30W → STARTING +2026-01-22 19:36:20 - session_detector - INFO - 🟢 Shaper Origin: Session START (debounce 3.0s) +2026-01-22 19:36:20 - __main__ - INFO - 🚀 SESSION START +2026-01-22 19:37:03 - session_detector - INFO - 🟠 Shaper Origin: Power 0.0W < 30W → STOPPING +2026-01-22 19:37:18 - session_detector - INFO - 🔴 Shaper Origin: Session END (debounce 15.0s) +2026-01-22 19:37:18 - __main__ - INFO - 🏁 SESSION END - Duration: 58s (0.97 min) +``` + +## Konfiguration + +### MQTT Broker +```yaml +mqtt: + host: "localhost" + port: 1883 + username: "" + password: "" + topics: + - "shellies/+/status" +``` + +### Devices +```yaml +devices: + - topic_prefix: "shaperorigin" # Custom MQTT Prefix (im Shelly konfiguriert) + machine_name: "Shaper Origin" + machine_id: "shaper-origin-01" + device_type: "shelly_pm_mini_g3" + power_threshold: 30 # Watt (Schwellenwert für Session-Erkennung) + start_debounce_s: 3 # Verzögerung bis Session Start + stop_debounce_s: 15 # Verzögerung bis Session End + enabled: true +``` + +**Multi-Device Support:** Einfach weitere Geräte hinzufügen mit unterschiedlichen `topic_prefix`. +Jedes Gerät benötigt im Shelly einen eigenen Custom MQTT Prefix. + +## Projektstruktur + +``` +python_prototype/ +├── main.py # Entry point & Orchestration +├── mqtt_client.py # MQTT Client wrapper (TLS/SSL) +├── shelly_parser.py # Shelly PM Mini G3 Message Parser +├── event_normalizer.py # Event Schema v1 Normalizer +├── session_detector.py # Session Detection State Machine +├── event_storage.py # Persistent Storage (JSONL/JSON) +├── config.yaml # Configuration (nicht im Git) +├── config.yaml.example # Config template +├── requirements.txt # Python dependencies +├── README.md # Diese Datei +├── data/ # Output directory +│ ├── events.jsonl # Events (JSON Lines) +│ └── sessions.json # Sessions (JSON Array) +└── logs/ # Log files + └── ows_iot_bridge.log +``` + +## Nächste Schritte + +### M4: Multi-Device Support (vorbereitet) +- Zweites Shelly-Device konfigurieren +- Parallele Überwachung mehrerer Maschinen testen + +### M5: Monitoring & Robustheit +- MQTT Reconnect-Logik +- Error Handling & Recovery +- Systemd Service Setup +- Health Monitoring + +### M6+: Odoo Integration +- Odoo Models: `open_workshop.machine`, `open_workshop.session` +- Migration: JSON → Odoo Database +- Views & Dashboards +- Live-Monitoring in Odoo + +## Gespeicherte Daten + +### Events (data/events.jsonl) +JSON Lines Format - ein Event pro Zeile: +```json +{"event_id":"uuid","event_type":"power_measurement","timestamp":"2026-01-22T18:45:09.985Z","machine":{"machine_id":"shaper-origin-01","machine_name":"Shaper Origin"},"metrics":{"power_w":43.8,"voltage_v":230.2,"current_a":0.19}} +``` + +### Sessions (data/sessions.json) +JSON Array mit Session-Objekten: +```json +[ + { + "session_id": "uuid", + "machine_id": "shaper-origin-01", + "machine_name": "Shaper Origin", + "start_time": "2026-01-22T18:52:59.000Z", + "end_time": "2026-01-22T18:54:01.995Z", + "duration_s": 62, + "start_power_w": 37.1, + "end_power_w": 0.0, + "status": "completed" + } +] +``` + +## Troubleshooting + +### Connection refused +``` +Error: Failed to connect to MQTT Broker: [Errno 111] Connection refused +``` +→ Prüfe ob MQTT Broker läuft und erreichbar ist + +### Permission denied +``` +PermissionError: [Errno 13] Permission denied: 'logs/ows_iot_bridge.log' +``` +→ Stelle sicher, dass das logs/ Verzeichnis beschreibbar ist + +### Invalid config +``` +Error: Config file not found: config.yaml +``` +→ Erstelle config.yaml von config.yaml.example + +## Support + +Siehe Feature Request Dokument: `FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md` diff --git a/open_workshop_mqtt/python_prototype/config.yaml.example b/open_workshop_mqtt/python_prototype/config.yaml.example new file mode 100644 index 0000000..e0a63a4 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/config.yaml.example @@ -0,0 +1,58 @@ +# MQTT Broker Configuration +mqtt: + host: "localhost" # MQTT Broker IP/Hostname + port: 1883 # Standard MQTT Port (1883 unencrypted, 8883 encrypted) + username: "" # Optional: MQTT Username + password: "" # Optional: MQTT Password + client_id: "ows_iot_bridge_prototype" + keepalive: 60 + + # Topics to subscribe + topics: + - "shellies/+/status" # Shelly Status Updates + - "shellypmminig3/events" # Shelly Events + - "shellies/+/online" # Shelly Online Status + # Add more topics as needed + +# Device Configuration +devices: + - shelly_id: "shellypmminig3-48f6eeb73a1c" + machine_name: "Shaper Origin" + machine_id: "shaper-origin-01" + device_type: "shelly_pm_mini_g3" + + # Power threshold for run detection (in Watts) + power_threshold: 50 + + # Debounce times (in seconds) + start_debounce_s: 3 # Power > threshold for X seconds → run_start + stop_debounce_s: 15 # Power < threshold for Y seconds → run_stop + + enabled: true + + # Example for additional device + # - shelly_id: "shellypmminig3-xxxxxxxxxxxx" + # machine_name: "CNC Mill" + # machine_id: "cnc-mill-01" + # device_type: "shelly_pm_mini_g3" + # power_threshold: 100 + # start_debounce_s: 5 + # stop_debounce_s: 20 + # enabled: true + +# Logging Configuration +logging: + level: "INFO" # DEBUG, INFO, WARNING, ERROR + format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + file: "logs/ows_iot_bridge.log" # Optional: log to file + console: true + +# Output Configuration +output: + # Where to store events and sessions + events_file: "data/events.json" + sessions_file: "data/sessions.json" + + # Console output + print_events: true + print_sessions: true diff --git a/open_workshop_mqtt/python_prototype/event_normalizer.py b/open_workshop_mqtt/python_prototype/event_normalizer.py new file mode 100644 index 0000000..a99cfbd --- /dev/null +++ b/open_workshop_mqtt/python_prototype/event_normalizer.py @@ -0,0 +1,192 @@ +""" +Event Normalizer for Open Workshop IoT Bridge + +Converts device-specific data formats into a unified Event Schema v1 +for consistent processing across different IoT device types. +""" + +import logging +import uuid +from datetime import datetime, timezone +from typing import Dict, Optional + + +class EventNormalizer: + """ + Normalizes device-specific events into unified Event Schema v1 + + Event Schema v1: + { + "event_id": "uuid4", + "event_type": "power_measurement", + "timestamp": "2026-01-22T18:30:45.123456Z", + "machine": { + "machine_id": "shaper-origin-01", + "machine_name": "Shaper Origin" + }, + "device": { + "device_id": "48f6eeb73a1c", + "device_type": "shelly_pm_mini_g3", + "topic_prefix": "shaperorigin" + }, + "metrics": { + "power_w": 52.9, + "voltage_v": 234.8, + "current_a": 0.267, + "frequency_hz": 49.9 + }, + "raw_data": { ... } # Optional: Original device data + } + """ + + def __init__(self, device_config: list = None): + """ + Initialize normalizer with device configuration + + Args: + device_config: List of device configurations from config.yaml + """ + self.logger = logging.getLogger(__name__) + self.device_config = device_config or [] + + # Build topic_prefix to machine mapping + self.prefix_machine_map = {} + for device in self.device_config: + topic_prefix = device.get('topic_prefix') + if topic_prefix: + self.prefix_machine_map[topic_prefix] = { + 'machine_id': device.get('machine_id'), + 'machine_name': device.get('machine_name'), + 'device_type': device.get('device_type', 'unknown') + } + + def normalize_shelly_event(self, shelly_data: Dict) -> Optional[Dict]: + """ + Convert Shelly PM Mini G3 data to Event Schema v1 + + Args: + shelly_data: Parsed Shelly data from ShellyParser + + Returns: + Normalized event dict or None if data incomplete + """ + try: + # Extract device ID and find topic prefix + device_id = shelly_data.get('device_id', 'unknown') + topic_prefix = self._find_topic_prefix(device_id, shelly_data) + + if not topic_prefix: + self.logger.warning(f"Could not determine topic_prefix for device {device_id}") + return None + + # Get machine info from config + machine_info = self.prefix_machine_map.get(topic_prefix) + if not machine_info: + self.logger.warning(f"No machine config found for topic_prefix: {topic_prefix}") + return None + + # Build normalized event + event = { + "event_id": str(uuid.uuid4()), + "event_type": self._determine_event_type(shelly_data), + "timestamp": self._normalize_timestamp(shelly_data.get('timestamp')), + "machine": { + "machine_id": machine_info['machine_id'], + "machine_name": machine_info['machine_name'] + }, + "device": { + "device_id": device_id, + "device_type": machine_info['device_type'], + "topic_prefix": topic_prefix + }, + "metrics": self._extract_metrics(shelly_data), + "raw_data": shelly_data # Keep original for debugging + } + + return event + + except Exception as e: + self.logger.error(f"Failed to normalize Shelly event: {e}", exc_info=True) + return None + + def _find_topic_prefix(self, device_id: str, shelly_data: Dict) -> Optional[str]: + """ + Find topic_prefix for device + + Device ID can be: + - topic_prefix itself (e.g. 'shaperorigin' from status messages) + - actual device ID (e.g. '48f6eeb73a1c' from RPC events) + """ + # Check if device_id is already a known topic_prefix + if device_id in self.prefix_machine_map: + return device_id + + # Otherwise, we need to infer it (currently only one device configured) + # For multiple devices, we'd need more sophisticated matching + if len(self.prefix_machine_map) == 1: + return list(self.prefix_machine_map.keys())[0] + + # TODO: For multi-device setups, implement device_id to topic_prefix mapping + self.logger.warning(f"Cannot map device_id {device_id} to topic_prefix") + return None + + def _determine_event_type(self, shelly_data: Dict) -> str: + """Determine event type from Shelly data""" + msg_type = shelly_data.get('message_type', 'unknown') + + if msg_type in ['status', 'event']: + return 'power_measurement' + + return 'unknown' + + def _normalize_timestamp(self, timestamp_str: Optional[str]) -> str: + """ + Normalize timestamp to ISO 8601 UTC format + + Args: + timestamp_str: ISO timestamp string or None + + Returns: + ISO 8601 UTC timestamp string + """ + if timestamp_str: + try: + # Parse and ensure UTC + dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + return dt.astimezone(timezone.utc).isoformat().replace('+00:00', 'Z') + except Exception as e: + self.logger.warning(f"Failed to parse timestamp {timestamp_str}: {e}") + + # Fallback: current time in UTC + return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') + + def _extract_metrics(self, shelly_data: Dict) -> Dict: + """ + Extract metrics from Shelly data + + Returns dict with standardized metric names + """ + metrics = {} + + # Power (always present in our use case) + # Shelly uses 'apower' (active power) + if 'apower' in shelly_data: + metrics['power_w'] = shelly_data['apower'] + + # Voltage (only in status messages) + if 'voltage' in shelly_data and shelly_data['voltage'] is not None: + metrics['voltage_v'] = shelly_data['voltage'] + + # Current (in status and some events) + if 'current' in shelly_data and shelly_data['current'] is not None: + metrics['current_a'] = shelly_data['current'] + + # Frequency (only in status messages) + if 'frequency' in shelly_data and shelly_data['frequency'] is not None: + metrics['frequency_hz'] = shelly_data['frequency'] + + # Energy counters (if available) + if 'total_energy' in shelly_data and shelly_data['total_energy'] is not None: + metrics['energy_total_wh'] = shelly_data['total_energy'] + + return metrics diff --git a/open_workshop_mqtt/python_prototype/event_storage.py b/open_workshop_mqtt/python_prototype/event_storage.py new file mode 100644 index 0000000..a1c4b9e --- /dev/null +++ b/open_workshop_mqtt/python_prototype/event_storage.py @@ -0,0 +1,188 @@ +""" +Event Storage Module - Persists events and sessions to JSON files +For later migration to Odoo database models +""" + +import json +import logging +from pathlib import Path +from typing import Dict, Optional +from datetime import datetime + + +class EventStorage: + """ + Stores events and sessions to JSON files + + Format designed for easy migration to Odoo models: + - events.jsonl: JSON Lines format (one event per line) → open_workshop.power_event + - sessions.json: JSON array of sessions → open_workshop.session + """ + + def __init__(self, events_file: str = "data/events.jsonl", + sessions_file: str = "data/sessions.json"): + self.logger = logging.getLogger(__name__) + self.events_file = Path(events_file) + self.sessions_file = Path(sessions_file) + + # Ensure data directory exists + self.events_file.parent.mkdir(parents=True, exist_ok=True) + + # Initialize sessions file if it doesn't exist + if not self.sessions_file.exists(): + self._write_sessions([]) + + def store_event(self, event: Dict) -> bool: + """ + Append event to JSONL file (one JSON object per line) + + Format for Odoo migration: + { + "event_id": "uuid", + "event_type": "power_measurement", + "timestamp": "ISO 8601", + "machine": {"machine_id": "...", "machine_name": "..."}, + "device": {...}, + "metrics": {"power_w": 45.7, "voltage_v": 230.2, ...} + } + """ + try: + with open(self.events_file, 'a', encoding='utf-8') as f: + # Write one JSON object per line (JSONL format) + json.dump(event, f, ensure_ascii=False) + f.write('\n') + + self.logger.debug(f"Event {event.get('event_id', 'N/A')} stored") + return True + + except Exception as e: + self.logger.error(f"Failed to store event: {e}") + return False + + def store_session_event(self, session_event: Dict) -> bool: + """ + Store session_start or session_end event + + Updates sessions.json with structured session data + + Format for Odoo migration (open_workshop.session): + { + "session_id": "uuid", + "machine_id": "shaper-origin-01", + "machine_name": "Shaper Origin", + "start_time": "2026-01-22T18:36:20.993Z", + "end_time": "2026-01-22T18:38:01.993Z", // null if running + "duration_s": 101, // null if running + "start_power_w": 45.7, + "end_power_w": 0.0, // null if running + "status": "completed" // or "running" + } + """ + try: + event_type = session_event.get('event_type') + session_id = session_event.get('session_id') + + if event_type == 'session_start': + return self._store_session_start(session_event) + elif event_type == 'session_end': + return self._store_session_end(session_event) + else: + self.logger.warning(f"Unknown session event type: {event_type}") + return False + + except Exception as e: + self.logger.error(f"Failed to store session event: {e}") + return False + + def _store_session_start(self, event: Dict) -> bool: + """Create new session record""" + sessions = self._read_sessions() + + machine = event.get('machine', {}) + session_data = event.get('session_data', {}) + + new_session = { + 'session_id': event['session_id'], + 'machine_id': machine.get('machine_id'), + 'machine_name': machine.get('machine_name'), + 'start_time': session_data.get('start_time'), + 'end_time': None, + 'duration_s': None, + 'start_power_w': event.get('power_w'), + 'end_power_w': None, + 'status': 'running' + } + + sessions.append(new_session) + self._write_sessions(sessions) + + self.logger.info(f"Session {event['session_id'][:8]}... started") + return True + + def _store_session_end(self, event: Dict) -> bool: + """Update existing session with end data""" + sessions = self._read_sessions() + session_id = event['session_id'] + + # Find and update session + for session in sessions: + if session['session_id'] == session_id: + session_data = event.get('session_data', {}) + + session['end_time'] = session_data.get('end_time') + session['duration_s'] = session_data.get('duration_s') + session['end_power_w'] = event.get('power_w') + session['status'] = 'completed' + + self._write_sessions(sessions) + + duration_min = session['duration_s'] / 60 + self.logger.info(f"Session {session_id[:8]}... completed ({duration_min:.1f} min)") + return True + + self.logger.error(f"Session {session_id} not found for update") + return False + + def _read_sessions(self) -> list: + """Read all sessions from JSON file""" + try: + if self.sessions_file.exists(): + with open(self.sessions_file, 'r', encoding='utf-8') as f: + return json.load(f) + return [] + except Exception as e: + self.logger.error(f"Failed to read sessions: {e}") + return [] + + def _write_sessions(self, sessions: list) -> bool: + """Write sessions to JSON file""" + try: + with open(self.sessions_file, 'w', encoding='utf-8') as f: + json.dump(sessions, f, ensure_ascii=False, indent=2) + return True + except Exception as e: + self.logger.error(f"Failed to write sessions: {e}") + return False + + def get_active_sessions(self) -> list: + """Get all running sessions""" + sessions = self._read_sessions() + return [s for s in sessions if s['status'] == 'running'] + + def get_session_by_id(self, session_id: str) -> Optional[Dict]: + """Get session by ID""" + sessions = self._read_sessions() + for session in sessions: + if session['session_id'] == session_id: + return session + return None + + def get_sessions_by_machine(self, machine_id: str, limit: int = 10) -> list: + """Get recent sessions for a machine""" + sessions = self._read_sessions() + machine_sessions = [s for s in sessions if s['machine_id'] == machine_id] + + # Sort by start_time descending + machine_sessions.sort(key=lambda x: x['start_time'], reverse=True) + + return machine_sessions[:limit] diff --git a/open_workshop_mqtt/python_prototype/main.py b/open_workshop_mqtt/python_prototype/main.py new file mode 100644 index 0000000..7b63558 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/main.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +""" +Open Workshop IoT Bridge - Python Prototype +Main entry point for MQTT to Odoo bridge + +Phase 1: Standalone prototype (without Odoo) +""" + +import sys +import logging +import yaml +import time +import signal +import json +from pathlib import Path +from typing import Dict + +from mqtt_client import MQTTClient +from shelly_parser import ShellyParser +from event_normalizer import EventNormalizer +from session_detector import SessionDetector +from event_storage import EventStorage + + +class IoTBridge: + """Main IoT Bridge Application""" + + def __init__(self, config_path: str = 'config.yaml'): + """ + Initialize IoT Bridge + + Args: + config_path: Path to config.yaml file + """ + # Load configuration + self.config = self._load_config(config_path) + + # Setup logging + self._setup_logging() + + self.logger = logging.getLogger(__name__) + self.logger.info("=== Open Workshop IoT Bridge Starting ===") + + # Initialize Shelly Parser with device config + self.shelly_parser = ShellyParser(device_config=self.config.get('devices', [])) + + # Initialize Event Normalizer + self.event_normalizer = EventNormalizer(device_config=self.config.get('devices', [])) + + # Initialize Session Detector + self.session_detector = SessionDetector(device_config=self.config.get('devices', [])) + + # Initialize Event Storage + output_config = self.config.get('output', {}) + self.event_storage = EventStorage( + events_file=output_config.get('events_file', 'data/events.jsonl'), + sessions_file=output_config.get('sessions_file', 'data/sessions.json') + ) + + # Initialize MQTT Client + self.mqtt_client = MQTTClient( + config=self.config['mqtt'], + message_callback=self.on_mqtt_message + ) + + self.running = False + + # Setup signal handlers for graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + def _load_config(self, config_path: str) -> dict: + """Load configuration from YAML file""" + config_file = Path(config_path) + + if not config_file.exists(): + print(f"Error: Config file not found: {config_path}") + print(f"Please copy config.yaml.example to config.yaml and adjust settings") + sys.exit(1) + + with open(config_file, 'r') as f: + config = yaml.safe_load(f) + + return config + + def _setup_logging(self): + """Setup logging configuration""" + log_config = self.config.get('logging', {}) + + # Create logs directory if logging to file + if log_config.get('file'): + log_file = Path(log_config['file']) + log_file.parent.mkdir(parents=True, exist_ok=True) + + # Configure logging + log_level = getattr(logging, log_config.get('level', 'INFO').upper()) + log_format = log_config.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + handlers = [] + + # Console handler + if log_config.get('console', True): + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter(log_format)) + handlers.append(console_handler) + + # File handler + if log_config.get('file'): + file_handler = logging.FileHandler(log_config['file']) + file_handler.setFormatter(logging.Formatter(log_format)) + handlers.append(file_handler) + + logging.basicConfig( + level=log_level, + format=log_format, + handlers=handlers + ) + + def _signal_handler(self, signum, frame): + """Handle shutdown signals""" + self.logger.info(f"Received signal {signum}, shutting down...") + self.stop() + + def on_mqtt_message(self, topic: str, payload): + """ + Callback for incoming MQTT messages + + Args: + topic: MQTT topic + payload: Message payload (parsed JSON or raw string) + """ + # Log RAW topic for analysis + if 'shellypmminig3' in topic and 'debug' not in topic: + self.logger.info(f"RAW Topic: {topic}") + + # Only process JSON payloads + if not isinstance(payload, dict): + return + + # Parse Shelly message + parsed_data = self.shelly_parser.parse_message(topic, payload) + + if parsed_data: + # Normalize to Event Schema v1 + normalized_event = self.event_normalizer.normalize_shelly_event(parsed_data) + + if normalized_event: + # Store event (for later Odoo migration) + self.event_storage.store_event(normalized_event) + + # Log normalized event (condensed) + metrics = normalized_event.get('metrics', {}) + power_w = metrics.get('power_w', 'N/A') + self.logger.debug( + f"Event: {normalized_event['machine']['machine_name']} - " + f"Power: {power_w}W" + ) + + # Process through Session Detector + session_event = self.session_detector.process_event(normalized_event) + + if session_event: + # Session state change occurred! + self._log_session_event(session_event) + + # Store session event (for Odoo migration) + self.event_storage.store_session_event(session_event) + + def _log_session_event(self, session_event: Dict): + """Log session start/end events""" + self.logger.info("\n" + "=" * 70) + + if session_event['event_type'] == 'session_start': + self.logger.info("🚀 SESSION START") + else: + self.logger.info("🏁 SESSION END") + + self.logger.info("=" * 70) + self.logger.info(f"Session ID: {session_event['session_id']}") + self.logger.info(f"Machine: {session_event['machine']['machine_name']} ({session_event['machine']['machine_id']})") + self.logger.info(f"Timestamp: {session_event['timestamp']}") + self.logger.info(f"Power: {session_event['power_w']:.1f}W") + + session_data = session_event.get('session_data', {}) + self.logger.info(f"Start Time: {session_data.get('start_time')}") + + if 'end_time' in session_data: + self.logger.info(f"End Time: {session_data['end_time']}") + duration_s = session_data.get('duration_s', 0) + duration_min = duration_s / 60 + self.logger.info(f"Duration: {duration_s}s ({duration_min:.2f} min)") + + self.logger.info("=" * 70 + "\n") + + def start(self): + """Start the IoT Bridge""" + self.logger.info("Starting IoT Bridge...") + + # Create data directory + data_dir = Path('data') + data_dir.mkdir(exist_ok=True) + + # Connect to MQTT + if not self.mqtt_client.connect(): + self.logger.error("Failed to connect to MQTT Broker") + return False + + # Start MQTT loop + self.mqtt_client.start() + + # Wait for connection + if not self.mqtt_client.wait_for_connection(timeout=10): + self.logger.error("MQTT connection timeout") + return False + + self.logger.info("IoT Bridge started successfully") + self.logger.info("Listening for MQTT messages... (Press Ctrl+C to stop)") + + self.running = True + + # Main loop + try: + while self.running: + time.sleep(1) + + # TODO: Hier können später periodische Tasks ausgeführt werden + # z.B. Session-Timeout-Checks + + except KeyboardInterrupt: + self.logger.info("Interrupted by user") + + return True + + def stop(self): + """Stop the IoT Bridge""" + self.running = False + self.logger.info("Stopping IoT Bridge...") + self.mqtt_client.stop() + self.logger.info("IoT Bridge stopped") + + +def main(): + """Main entry point""" + # Check if config file exists + config_file = Path('config.yaml') + + if not config_file.exists(): + print("\n" + "="*60) + print("Configuration file not found!") + print("="*60) + print("\nPlease create config.yaml from the example:") + print(" cp config.yaml.example config.yaml") + print("\nThen edit config.yaml with your MQTT broker settings.") + print("="*60 + "\n") + sys.exit(1) + + # Start bridge + bridge = IoTBridge('config.yaml') + bridge.start() + + +if __name__ == '__main__': + main() diff --git a/open_workshop_mqtt/python_prototype/mqtt_client.py b/open_workshop_mqtt/python_prototype/mqtt_client.py new file mode 100644 index 0000000..5e31eb2 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/mqtt_client.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +""" +MQTT Client für Open Workshop IoT Bridge +Verbindet sich mit MQTT Broker und empfängt Device Events +""" + +import logging +import time +import json +import ssl +from typing import Callable, Optional +import paho.mqtt.client as mqtt + + +class MQTTClient: + """MQTT Client Wrapper für Device Event Empfang""" + + def __init__(self, config: dict, message_callback: Optional[Callable] = None): + """ + Initialize MQTT Client + + Args: + config: MQTT configuration dict from config.yaml + message_callback: Callback function for incoming messages + """ + self.config = config + self.message_callback = message_callback + self.logger = logging.getLogger(__name__) + + # Create MQTT Client + self.client = mqtt.Client( + client_id=config.get('client_id', 'ows_iot_bridge'), + protocol=mqtt.MQTTv5 + ) + + # Set callbacks + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + self.client.on_message = self._on_message + + # Set username/password if provided + if config.get('username') and config.get('password'): + self.client.username_pw_set( + config['username'], + config['password'] + ) + + # Enable TLS/SSL if port is 8883 + if config.get('port') == 8883: + self.client.tls_set(cert_reqs=ssl.CERT_NONE) + self.client.tls_insecure_set(True) + self.logger.info("TLS/SSL enabled for port 8883") + + self.connected = False + self.topics = config.get('topics', []) + + def _on_connect(self, client, userdata, flags, rc, properties=None): + """Callback when connected to MQTT broker""" + if rc == 0: + self.connected = True + self.logger.info(f"Connected to MQTT Broker at {self.config['host']}:{self.config['port']}") + + # Subscribe to all configured topics + for topic in self.topics: + self.client.subscribe(topic) + self.logger.info(f"Subscribed to topic: {topic}") + else: + self.logger.error(f"Failed to connect to MQTT Broker, return code {rc}") + self.connected = False + + def _on_disconnect(self, client, userdata, rc, properties=None): + """Callback when disconnected from MQTT broker""" + self.connected = False + if rc != 0: + self.logger.warning(f"Unexpected disconnect from MQTT Broker (rc={rc}). Reconnecting...") + else: + self.logger.info("Disconnected from MQTT Broker") + + def _on_message(self, client, userdata, msg): + """Callback when message received""" + try: + topic = msg.topic + payload = msg.payload.decode('utf-8') + + # Log all shellypmminig3 topics (exclude debug) + if 'shellypmminig3' in topic and 'debug' not in topic: + self.logger.info(f"📡 TOPIC: {topic}") + + self.logger.debug(f"Message received on topic '{topic}'") + + # Try to parse as JSON + try: + payload_json = json.loads(payload) + except json.JSONDecodeError: + # Debug logs are expected to be non-JSON, only warn for non-debug topics + if 'debug' not in topic: + self.logger.warning(f"Message on '{topic}' is not valid JSON: {payload[:100]}") + payload_json = None + + # Call user callback if provided + if self.message_callback: + self.message_callback(topic, payload_json or payload) + else: + # Default: just log + self.logger.info(f"Topic: {topic}") + if payload_json: + self.logger.info(f"Payload: {json.dumps(payload_json, indent=2)}") + else: + self.logger.info(f"Payload: {payload}") + + except Exception as e: + self.logger.error(f"Error processing message: {e}", exc_info=True) + + def connect(self): + """Connect to MQTT broker""" + try: + self.logger.info(f"Connecting to MQTT Broker {self.config['host']}:{self.config['port']}...") + self.client.connect( + self.config['host'], + self.config['port'], + self.config.get('keepalive', 60) + ) + return True + except Exception as e: + self.logger.error(f"Failed to connect to MQTT Broker: {e}") + return False + + def start(self): + """Start the MQTT client loop (non-blocking)""" + self.client.loop_start() + + def stop(self): + """Stop the MQTT client loop""" + self.client.loop_stop() + self.client.disconnect() + self.logger.info("MQTT Client stopped") + + def publish(self, topic: str, payload: dict): + """ + Publish message to MQTT topic + + Args: + topic: MQTT topic + payload: Message payload (will be JSON encoded) + """ + try: + payload_json = json.dumps(payload) + result = self.client.publish(topic, payload_json) + if result.rc == mqtt.MQTT_ERR_SUCCESS: + self.logger.debug(f"Published to {topic}") + return True + else: + self.logger.error(f"Failed to publish to {topic}, rc={result.rc}") + return False + except Exception as e: + self.logger.error(f"Error publishing to {topic}: {e}") + return False + + def wait_for_connection(self, timeout: int = 10) -> bool: + """ + Wait for connection to be established + + Args: + timeout: Maximum seconds to wait + + Returns: + True if connected, False if timeout + """ + start_time = time.time() + while not self.connected and (time.time() - start_time) < timeout: + time.sleep(0.1) + + return self.connected diff --git a/open_workshop_mqtt/python_prototype/requirements.txt b/open_workshop_mqtt/python_prototype/requirements.txt new file mode 100644 index 0000000..63fd606 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/requirements.txt @@ -0,0 +1,17 @@ +# MQTT Client +paho-mqtt>=2.0.0 + +# Configuration +pyyaml>=6.0 + +# Logging & Utilities +python-dateutil>=2.8.2 + +# For UUID generation +uuid>=1.30 + +# Optional: für spätere JSON Schema Validation +jsonschema>=4.17.0 + +# Optional: für besseres Logging +colorlog>=6.7.0 diff --git a/open_workshop_mqtt/python_prototype/session_detector.py b/open_workshop_mqtt/python_prototype/session_detector.py new file mode 100644 index 0000000..ada22fc --- /dev/null +++ b/open_workshop_mqtt/python_prototype/session_detector.py @@ -0,0 +1,267 @@ +""" +Session Detection Engine for Open Workshop IoT Bridge + +Detects machine run sessions based on power consumption thresholds +with debounce logic to avoid false starts/stops. +""" + +import logging +import uuid +from datetime import datetime, timezone +from typing import Dict, Optional +from enum import Enum + + +class SessionState(Enum): + """Session detection states""" + IDLE = "idle" # Machine off (power < threshold) + STARTING = "starting" # Power above threshold, waiting for debounce + RUNNING = "running" # Confirmed run session active + STOPPING = "stopping" # Power below threshold, waiting for debounce + + +class SessionDetector: + """ + Detects machine run sessions based on power measurements + + State Machine: + IDLE -> STARTING -> RUNNING -> STOPPING -> IDLE + + - IDLE: Power < threshold + - STARTING: Power >= threshold for < start_debounce_s + - RUNNING: Power >= threshold for >= start_debounce_s + - STOPPING: Power < threshold for < stop_debounce_s (while in RUNNING) + - Back to IDLE: Power < threshold for >= stop_debounce_s + """ + + def __init__(self, device_config: list = None): + """ + Initialize session detector + + Args: + device_config: List of device configurations from config.yaml + """ + self.logger = logging.getLogger(__name__) + self.device_config = device_config or [] + + # Build machine_id to config mapping + self.machine_config = {} + for device in self.device_config: + machine_id = device.get('machine_id') + if machine_id: + self.machine_config[machine_id] = { + 'power_threshold': device.get('power_threshold', 50), + 'start_debounce_s': device.get('start_debounce_s', 3), + 'stop_debounce_s': device.get('stop_debounce_s', 15), + 'machine_name': device.get('machine_name', 'Unknown'), + } + + # State tracking per machine + self.machine_states = {} # machine_id -> state info + + def process_event(self, event: Dict) -> Optional[Dict]: + """ + Process a normalized event and detect session changes + + Args: + event: Normalized event from EventNormalizer + + Returns: + Session event dict if state change occurred, None otherwise + + Session Event Format: + { + "session_id": "uuid4", + "event_type": "session_start" | "session_end", + "timestamp": "ISO 8601 UTC", + "machine": { "machine_id": "...", "machine_name": "..." }, + "power_w": 123.4, + "session_data": { + "start_time": "ISO 8601 UTC", + "end_time": "ISO 8601 UTC", # only for session_end + "duration_s": 123 # only for session_end + } + } + """ + # Extract machine info + machine = event.get('machine', {}) + machine_id = machine.get('machine_id') + + if not machine_id: + self.logger.warning("Event missing machine_id, skipping") + return None + + # Get machine config + config = self.machine_config.get(machine_id) + if not config: + self.logger.warning(f"No config found for machine {machine_id}") + return None + + # Extract power measurement + metrics = event.get('metrics', {}) + power_w = metrics.get('power_w') + + if power_w is None: + self.logger.debug(f"Event missing power_w, skipping") + return None + + # Initialize machine state if needed + if machine_id not in self.machine_states: + self.machine_states[machine_id] = { + 'state': SessionState.IDLE, + 'state_since': datetime.now(timezone.utc), + 'current_session_id': None, + 'session_start_time': None, + 'last_power': None, + } + + state_info = self.machine_states[machine_id] + timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00')) + + # Update last power + state_info['last_power'] = power_w + + # Process state machine + return self._process_state_machine( + machine_id=machine_id, + machine_name=machine.get('machine_name'), + power_w=power_w, + timestamp=timestamp, + config=config, + state_info=state_info + ) + + def _process_state_machine( + self, + machine_id: str, + machine_name: str, + power_w: float, + timestamp: datetime, + config: Dict, + state_info: Dict + ) -> Optional[Dict]: + """ + Process state machine logic + + Returns session event if state change occurred + """ + current_state = state_info['state'] + threshold = config['power_threshold'] + start_debounce = config['start_debounce_s'] + stop_debounce = config['stop_debounce_s'] + + time_in_state = (timestamp - state_info['state_since']).total_seconds() + + # State machine transitions + if current_state == SessionState.IDLE: + if power_w >= threshold: + # Transition to STARTING + self.logger.info(f"🟡 {machine_name}: Power {power_w:.1f}W >= {threshold}W → STARTING") + state_info['state'] = SessionState.STARTING + state_info['state_since'] = timestamp + + elif current_state == SessionState.STARTING: + if power_w < threshold: + # False start, back to IDLE + self.logger.info(f"⚪ {machine_name}: Power dropped before debounce → IDLE") + state_info['state'] = SessionState.IDLE + state_info['state_since'] = timestamp + + elif time_in_state >= start_debounce: + # Debounce passed, transition to RUNNING + session_id = str(uuid.uuid4()) + state_info['state'] = SessionState.RUNNING + state_info['state_since'] = timestamp + state_info['current_session_id'] = session_id + state_info['session_start_time'] = timestamp + + self.logger.info(f"🟢 {machine_name}: Session START (debounce {time_in_state:.1f}s) - Power: {power_w:.1f}W") + + # Generate session_start event + return { + 'session_id': session_id, + 'event_type': 'session_start', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': machine_name + }, + 'power_w': power_w, + 'session_data': { + 'start_time': timestamp.isoformat().replace('+00:00', 'Z') + } + } + + elif current_state == SessionState.RUNNING: + if power_w < threshold: + # Transition to STOPPING + self.logger.info(f"🟠 {machine_name}: Power {power_w:.1f}W < {threshold}W → STOPPING") + state_info['state'] = SessionState.STOPPING + state_info['state_since'] = timestamp + + elif current_state == SessionState.STOPPING: + if power_w >= threshold: + # Power back up, back to RUNNING + self.logger.info(f"🟢 {machine_name}: Power back up → RUNNING") + state_info['state'] = SessionState.RUNNING + state_info['state_since'] = timestamp + + elif time_in_state >= stop_debounce: + # Debounce passed, session ended + session_id = state_info['current_session_id'] + start_time = state_info['session_start_time'] + duration_s = (timestamp - start_time).total_seconds() + + self.logger.info(f"🔴 {machine_name}: Session END (debounce {time_in_state:.1f}s) - Duration: {duration_s:.1f}s") + + # Generate session_end event + session_event = { + 'session_id': session_id, + 'event_type': 'session_end', + 'timestamp': timestamp.isoformat().replace('+00:00', 'Z'), + 'machine': { + 'machine_id': machine_id, + 'machine_name': machine_name + }, + 'power_w': power_w, + 'session_data': { + 'start_time': start_time.isoformat().replace('+00:00', 'Z'), + 'end_time': timestamp.isoformat().replace('+00:00', 'Z'), + 'duration_s': int(duration_s) + } + } + + # Reset to IDLE + state_info['state'] = SessionState.IDLE + state_info['state_since'] = timestamp + state_info['current_session_id'] = None + state_info['session_start_time'] = None + + return session_event + + return None + + def get_machine_state(self, machine_id: str) -> Optional[str]: + """Get current state of a machine""" + state_info = self.machine_states.get(machine_id) + if state_info: + return state_info['state'].value + return None + + def get_active_sessions(self) -> Dict: + """ + Get all currently active sessions + + Returns dict: machine_id -> session info + """ + active_sessions = {} + + for machine_id, state_info in self.machine_states.items(): + if state_info['state'] == SessionState.RUNNING: + active_sessions[machine_id] = { + 'session_id': state_info['current_session_id'], + 'start_time': state_info['session_start_time'].isoformat().replace('+00:00', 'Z'), + 'current_power': state_info['last_power'] + } + + return active_sessions diff --git a/open_workshop_mqtt/python_prototype/setup.sh b/open_workshop_mqtt/python_prototype/setup.sh new file mode 100755 index 0000000..a02a885 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/setup.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# Setup script for Open Workshop IoT Bridge Python Prototype + +echo "===================================" +echo "Open Workshop IoT Bridge - Setup" +echo "===================================" +echo "" + +# Check if Python 3 is installed +if ! command -v python3 &> /dev/null; then + echo "Error: Python 3 is not installed" + exit 1 +fi + +echo "Python version: $(python3 --version)" +echo "" + +# Create virtual environment +echo "Creating virtual environment..." +python3 -m venv venv + +if [ $? -ne 0 ]; then + echo "Error: Failed to create virtual environment" + exit 1 +fi + +echo "✓ Virtual environment created" +echo "" + +# Activate virtual environment +echo "Activating virtual environment..." +source venv/bin/activate + +# Upgrade pip +echo "Upgrading pip..." +pip install --upgrade pip + +# Install requirements +echo "" +echo "Installing dependencies from requirements.txt..." +pip install -r requirements.txt + +if [ $? -ne 0 ]; then + echo "Error: Failed to install requirements" + exit 1 +fi + +echo "" +echo "✓ Dependencies installed" +echo "" + +# Check if config.yaml exists +if [ ! -f "config.yaml" ]; then + echo "Creating config.yaml from template..." + cp config.yaml.example config.yaml + echo "✓ config.yaml created" + echo "" + echo "⚠️ Please edit config.yaml with your MQTT broker settings!" + echo "" +fi + +# Create data and logs directories +mkdir -p data logs + +echo "===================================" +echo "Setup complete!" +echo "===================================" +echo "" +echo "Next steps:" +echo " 1. Edit config.yaml with your MQTT broker settings" +echo " 2. Activate venv: source venv/bin/activate" +echo " 3. Run the bridge: python main.py" +echo "" diff --git a/open_workshop_mqtt/python_prototype/shelly_parser.py b/open_workshop_mqtt/python_prototype/shelly_parser.py new file mode 100644 index 0000000..e4419f8 --- /dev/null +++ b/open_workshop_mqtt/python_prototype/shelly_parser.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +""" +Shelly PM Mini G3 Parser +Parst MQTT Messages vom Shelly PM Mini G3 +""" + +import logging +from typing import Dict, Optional +from datetime import datetime + + +class ShellyParser: + """Parser für Shelly PM Mini G3 MQTT Messages""" + + def __init__(self, device_config: list = None): + """ + Initialize parser + + Args: + device_config: List of device configurations from config.yaml + """ + self.logger = logging.getLogger(__name__) + self.device_config = device_config or [] + + # Build topic-prefix to machine mapping + self.prefix_machine_map = {} + for device in self.device_config: + topic_prefix = device.get('topic_prefix') + if topic_prefix: + self.prefix_machine_map[topic_prefix] = { + 'machine_id': device.get('machine_id'), + 'machine_name': device.get('machine_name'), + 'device_id': None # Will be extracted from messages + } + + def parse_message(self, topic: str, payload: dict) -> Optional[Dict]: + """ + Parse Shelly MQTT message + + Args: + topic: MQTT topic + payload: Message payload (already parsed as JSON) + + Returns: + Parsed data dict or None if not a relevant message + """ + # Ignore debug logs + if 'debug/log' in topic: + return None + + # Parse different message types + if '/status/pm1:0' in topic: + return self._parse_status_message(topic, payload) + elif '/events/rpc' in topic: + return self._parse_rpc_event(topic, payload) + elif '/telemetry' in topic: + return self._parse_telemetry(topic, payload) + + return None + + def _parse_status_message(self, topic: str, payload: dict) -> Dict: + """ + Parse full status message + Topic: shaperorigin/status/pm1:0 + """ + # Extract device ID from topic prefix + device_id = self._extract_device_id_from_topic(topic) + + data = { + 'message_type': 'status', + 'device_id': device_id, + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'voltage': payload.get('voltage'), + 'current': payload.get('current'), + 'apower': payload.get('apower'), # Active Power in Watts + 'frequency': payload.get('freq'), + 'total_energy': payload.get('aenergy', {}).get('total'), + } + + self.logger.debug(f"Parsed status message: apower={data['apower']}W") + return data + + def _parse_rpc_event(self, topic: str, payload: dict) -> Optional[Dict]: + """ + Parse RPC NotifyStatus event + Topic: shellypmminig3/events/rpc + """ + if payload.get('method') != 'NotifyStatus': + return None + + device_id = payload.get('src', '').replace('shellypmminig3-', '') + params = payload.get('params', {}) + pm_data = params.get('pm1:0', {}) + + # Get timestamp from params or use current time + ts = params.get('ts') + if ts: + timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' + else: + timestamp = datetime.utcnow().isoformat() + 'Z' + + data = { + 'message_type': 'event', + 'device_id': device_id, + 'timestamp': timestamp, + 'apower': pm_data.get('apower'), + 'current': pm_data.get('current'), + 'voltage': pm_data.get('voltage'), + } + + # Only return if we have actual data + if data['apower'] is not None or data['current'] is not None: + self.logger.debug(f"Parsed RPC event: {pm_data}") + return data + + return None + + def _parse_telemetry(self, topic: str, payload: dict) -> Dict: + """ + Parse telemetry message + Topic: shelly/pmmini/shellypmminig3-48f6eeb73a1c/telemetry + """ + # Extract device ID from topic + parts = topic.split('/') + device_id = parts[2] if len(parts) > 2 else 'unknown' + device_id = device_id.replace('shellypmminig3-', '') + + # Get timestamp + ts = payload.get('ts') + if ts: + timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' + else: + timestamp = datetime.utcnow().isoformat() + 'Z' + + data = { + 'message_type': 'telemetry', + 'device_id': device_id, + 'timestamp': timestamp, + 'voltage': payload.get('voltage_v'), + 'current': payload.get('current_a'), + 'apower': payload.get('power_w'), # Active Power in Watts + 'frequency': payload.get('freq_hz'), + 'total_energy': payload.get('energy_wh'), + } + + self.logger.debug(f"Parsed telemetry: apower={data['apower']}W") + return data + + def _extract_device_id_from_topic(self, topic: str) -> str: + """Extract device ID (topic prefix) from topic string""" + # Topic format: shaperorigin/status/pm1:0 + # Extract the prefix (shaperorigin) + parts = topic.split('/') + if len(parts) > 0: + topic_prefix = parts[0] + # Check if this prefix is in our config + if topic_prefix in self.prefix_machine_map: + # Use the actual device_id if we've learned it, otherwise use prefix + device_info = self.prefix_machine_map[topic_prefix] + return device_info.get('device_id') or topic_prefix + return topic_prefix + return 'unknown' + + def get_power_value(self, parsed_data: Dict) -> Optional[float]: + """ + Extract power value from parsed data + + Returns: + Power in Watts or None + """ + return parsed_data.get('apower') + + def get_device_id(self, parsed_data: Dict) -> str: + """Get device ID from parsed data""" + return parsed_data.get('device_id', 'unknown')