diff --git a/open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md b/open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md index a30ac91..cac6416 100644 --- a/open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md +++ b/open_workshop_mqtt/FEATURE_REQUEST_OPEN_WORKSHOP_MQTT_IoT.md @@ -32,49 +32,143 @@ Ziel: **Odoo-18-Community (self-hosted)** soll **Geräte-Events** (Timer/Maschin --- -## 2. Zielarchitektur (Simulation-first) +## 2. Zielarchitektur (Docker-First, Sidecar-Pattern) ### 2.1 Komponenten -1. **MQTT Broker** : Mosquitto in Docker vorhanden -2. **Device Simulator(s)** (Python/Node) - - veröffentlicht MQTT Topics wie echte Geräte -3. **Gateway/Bridge (Software)** - - abonniert MQTT Topics - - validiert/normalisiert Payload - - sendet Events via HTTPS an Odoo (Webhook) -4. **Odoo Modul** `ows_iot_bridge` - - REST Controller `/ows/iot/event` - - Modelle für Devices, Events, Sessions - - Business-Regeln für Session-Start/Stop/Hysterese (softwareseitig) -5. Optional später: **Realtime-Feed** (WebSocket) für POS/Display +1. **MQTT Broker**: Mosquitto in Docker (bereits vorhanden) +2. **IoT Bridge Service**: Separater Docker Container + - Image: `iot_mqtt_bridge_for_odoo` + - Source: `iot_bridge/` Unterverzeichnis + - MQTT Client (subscribed auf Topics) + - Session Detection Engine (State Machine) + - Event-Normalisierung & Parsing + - Kommunikation mit Odoo via REST API +3. **Odoo Container**: Business Logic & Konfiguration + - REST API für Bridge-Konfiguration: `GET /ows/iot/config` + - REST API für Event-Empfang: `POST /ows/iot/event` + - Models: Devices, Events, Sessions + - Admin UI für Device-Management +4. **Docker Compose**: Orchestrierung aller Services + - Shared Network + - Shared Volumes (optional) + - Services starten zusammen: `docker compose up -d` ### 2.2 Datenfluss -1. Simulator publiziert MQTT → `hobbyhimmel/machines//state` -2. Bridge konsumiert MQTT, mappt auf Event-Format -3. Bridge POSTet JSON an Odoo Endpoint -4. Odoo speichert Roh-Event + erzeugt ggf. Session-Updates +``` +Shelly PM → MQTT Broker ← Bridge Service → REST API → Odoo + (Subscribe) (POST) (Business Logic) +``` + +1. Bridge subscribed auf MQTT Topics (z.B. `shaperorigin/status/pm1:0`) +2. Bridge normalisiert Payload zu Unified Event Schema +3. Bridge erkennt Sessions via State Machine (Dual-Threshold, Debounce) +4. Bridge sendet Events via `POST /ows/iot/event` an Odoo +5. Odoo speichert Events + aktualisiert Session-Status + +### 2.3 Vorteile Sidecar-Pattern +- ✅ **Klare Trennung**: Odoo = Business Logic, Bridge = MQTT/Retry/Queue +- ✅ **Unabhängige Prozesse**: Bridge restart ohne Odoo-Downtime +- ✅ **Docker Best Practice**: Ein Prozess pro Container +- ✅ **Einfaches Setup**: `docker compose up -d` startet alles +- ✅ **Kein Overhead**: Gleiche Network/Volumes, kein Extra-Komplexität --- -## 3. Schnittstelle zu Odoo (Kern des Projekts) +## 3. Schnittstellen zwischen Bridge und Odoo -### 3.1 Authentifizierung -- Pro Device oder pro Bridge ein **API-Token** (Bearer) -- Header: `Authorization: Bearer ` -- Token in Odoo in `ows.iot.device` oder `ir.config_parameter` hinterlegt -- Optional: IP-Allowlist / Rate-Limit (in Reverse Proxy) +### 3.1 Bridge → Odoo: Event-Empfang -### 3.2 Endpoint (REST/Webhook) -- `POST /ows/iot/event` -- Content-Type: `application/json` -- Antwort: - - `200 OK` mit `{ "status": "ok", "event_id": "...", "session_id": "..." }` - - Fehler: `400` (Schema), `401/403` (Auth), `409` (Duplikat), `500` (Server) +**Endpoint:** `POST /ows/iot/event` -### 3.3 Idempotenz / Duplikaterkennung -- Jedes Event enthält eine eindeutige `event_uid` -- Odoo legt Unique-Constraint auf `event_uid` (pro device) -- Wiederholte Events liefern `409 Conflict` oder `200 OK (duplicate=true)` (Designentscheidung) +**Authentifizierung (optional):** +- **Empfohlen für**: Produktiv-Umgebung, Multi-Host-Setup, externe Zugriffe +- **Nicht nötig für**: Docker-Compose-Setup (isoliertes Netzwerk) +- Header: `Authorization: Bearer ` (falls aktiviert) +- Token wird Bridge als ENV-Variable übergeben: `ODOO_TOKEN=...` +- Token in Odoo: `ir.config_parameter` → `ows_iot.bridge_token` +- Aktivierung: `ir.config_parameter` → `ows_iot.require_token = true` + +**Request Body:** + +**Variante 1: Session Lifecycle Events** +```json +{ + "schema_version": "v1", + "event_uid": "uuid", + "ts": "2026-01-31T10:30:15Z", + "device_id": "shellypmminig3-48f6eeb73a1c", + "event_type": "session_started", + "payload": { "session_id": "sess-abc123" } +} +``` + +**Variante 2: Session Heartbeat (periodisch, aggregiert)** +```json +{ + "schema_version": "v1", + "event_uid": "uuid", + "ts": "2026-01-31T10:35:00Z", + "device_id": "shellypmminig3-48f6eeb73a1c", + "event_type": "session_heartbeat", + "payload": { + "session_id": "sess-abc123", + "interval_start": "2026-01-31T10:30:00Z", + "interval_end": "2026-01-31T10:35:00Z", + "interval_working_s": 200, + "interval_standby_s": 100, + "current_state": "WORKING", + "avg_power_w": 142.3 + } +} +``` + +**Response:** +- `200 OK`: `{ "status": "ok", "event_id": 123, "session_id": 456 }` +- `400 Bad Request`: Schema-Fehler +- `401 Unauthorized`: Token fehlt/ungültig (nur wenn Auth aktiviert) +- `409 Conflict`: Duplikat (wenn `event_uid` bereits existiert) +- `500 Internal Server Error`: Odoo-Fehler + +**Idempotenz:** +- `event_uid` hat Unique-Constraint in Odoo +- Wiederholte Events → `409 Conflict` (Bridge ignoriert) + +--- + +### 3.2 Bridge ← Odoo: Konfiguration abrufen + +**Endpoint:** `GET /ows/iot/config` + +**Authentifizierung (optional):** +- Header: `Authorization: Bearer ` (falls aktiviert) + +**Response:** +```json +{ + "devices": [ + { + "device_id": "shellypmminig3-48f6eeb73a1c", + "mqtt_topic": "shaperorigin/status/pm1:0", + "parser_type": "shelly_pm_mini_g3", + "machine_name": "Shaper Origin", + "session_config": { + "strategy": "power_threshold", + "standby_threshold_w": 20, + "working_threshold_w": 100, + "start_debounce_s": 3, + "stop_debounce_s": 15, + "message_timeout_s": 20, + "heartbeat_interval_s": 300 + } + } + ] +} +``` + +**Wann aufgerufen:** +- Bridge-Start: Initiale Config laden +- Alle 5 Minuten: Config-Refresh (neue Devices, geänderte Schwellenwerte) +- Bei 401/403 Response von `/ows/iot/event`: Token evtl. ungültig geworden --- @@ -105,10 +199,11 @@ Pflichtfelder: ### 4.3 Event-Typen (v1) **Maschine/Timer** -- `run_start` (Start einer Laufphase) -- `run_stop` (Ende einer Laufphase) -- `heartbeat` (optional, laufend während running) -- `state_changed` (idle/running/fault) +- `session_started` (Session-Start, Bridge erkannt Aktivität) +- `session_heartbeat` (periodische Aggregation während laufender Session) +- `session_ended` (Session-Ende nach Timeout oder manuell) +- `session_timeout` (Session automatisch beendet wegen Inaktivität) +- `state_changed` (optional, Echtzeit-State-Change: IDLE/STANDBY/WORKING) - `fault` (Fehler mit Code/Severity) **Waage** @@ -144,34 +239,48 @@ Pflichtfelder: ### 5.3 `ows.machine.session` (Timer-Sessions) - `machine_id` (Char oder m2o auf bestehendes Maschinenmodell) +- `session_id` (external, von Bridge generiert) - `start_ts`, `stop_ts` -- `duration_s` (computed) -- `state` (running/stopped/aborted) +- `duration_s` (computed: stop_ts - start_ts) +- `total_working_time_s` (Summe aller WORKING-Intervalle) +- `total_standby_time_s` (Summe aller STANDBY-Intervalle) +- `state` (running/stopped/aborted/timeout) - `origin` (sensor/manual/sim) -- `confidence_summary` -- `event_ids` (o2m) +- `billing_units` (computed: ceil(total_working_time_s / billing_unit_seconds)) +- `event_ids` (o2m → session_heartbeat Events) > Hinweis: Wenn du bereits `ows.machine` aus deinem open_workshop nutzt, referenziert `machine_id` direkt dieses Modell. --- -## 6. Verarbeitungslogik (Phase 1: minimal, robust) +## 6. Verarbeitungslogik (Bridge & Odoo) -### 6.1 Session-Automat (State Machine) -- Eingang: Events `run_start`, `run_stop`, optional `heartbeat` -- Regeln: - - `run_start` erstellt neue Session, wenn keine läuft - - `run_start` während laufender Session → nur Log, keine neue Session - - `run_stop` schließt laufende Session - - Timeout-Regel: wenn `heartbeat` ausbleibt (z. B. 10 min) → Session `aborted` +### 6.1 Bridge: State Tracking & Aggregation +**State Machine (5 Zustände):** +- `IDLE` (0-20W): Maschine aus/Leerlauf +- `STARTING` (Debounce): Power > standby_threshold für < start_debounce_s +- `STANDBY` (20-100W): Maschine an, aber nicht aktiv arbeitend +- `WORKING` (>100W): Maschine arbeitet aktiv +- `STOPPING` (Debounce): Power < standby_threshold für < stop_debounce_s -### 6.2 Hysterese (Simulation als Stellvertreter für Sensorik) -In Simulation definierst du: -- Start, wenn „running“ mindestens **2s stabil** -- Stop, wenn „idle“ mindestens **15s stabil** -Diese Parameter als Odoo Systemparameter, z. B.: -- `ows_iot.start_debounce_s` -- `ows_iot.stop_debounce_s` +**Aggregation Logic:** +- Bridge trackt intern State-Wechsel (1 msg/s von Shelly) +- Alle `heartbeat_interval_s` (z.B. 300s = 5 Min): + - Berechne `interval_working_s` (Summe aller WORKING-Zeiten) + - Berechne `interval_standby_s` (Summe aller STANDBY-Zeiten) + - Sende `session_heartbeat` an Odoo +- Bei Session-Start: `session_started` Event +- Bei Session-Ende: `session_ended` Event mit Totals + +### 6.2 Odoo: Session-Aggregation & Billing +- Empfängt `session_heartbeat` Events +- Summiert `total_working_time_s` und `total_standby_time_s` +- Berechnet `billing_units` nach Maschinen-Konfiguration: + ```python + billing_unit_minutes = machine.billing_unit_minutes # z.B. 5 + billing_units = ceil(total_working_time_s / (billing_unit_minutes * 60)) + ``` +- Timeout-Regel: wenn kein Heartbeat für `2 * heartbeat_interval_s` → Session `timeout` --- @@ -289,46 +398,94 @@ Diese Parameter als Odoo Systemparameter, z. B.: --- -### Phase 2: Odoo-Integration +### Phase 2: Docker-Container-Architektur -#### M6 – Odoo Modul Grundgerüst (0.5 Tag) -**Deliverables** -- Odoo Modul `ows_iot_bridge` -- Modelle: `ows.iot.device`, `ows.iot.event`, `ows.machine.session` -- Admin UI: Device-Liste, Event-Log, Session-Übersicht +#### M6 – IoT Bridge Docker Container (2-3 Tage) +**Deliverables:** +- Verzeichnis: `iot_bridge/` + - `main.py` - Bridge Hauptprogramm + - `mqtt_client.py` - MQTT Client (von Odoo portiert) + - `session_detector.py` - State Machine (von Odoo portiert) + - `parsers/` - Shelly, Tasmota, Generic + - `odoo_client.py` - REST API Client für Odoo + - `config.py` - Config Management (ENV + Odoo) + - `requirements.txt` - Python Dependencies + - `Dockerfile` - Multi-stage Build +- Docker Image: `iot_mqtt_bridge_for_odoo` +- ENV-Variablen: + - `ODOO_URL` (z.B. `http://odoo:8069`) - **Pflicht** + - `MQTT_URL` (z.B. `mqtt://mosquitto:1883`) - **Pflicht** + - `ODOO_TOKEN` (API Token für Bridge) - **Optional**, nur wenn Auth in Odoo aktiviert + - `LOG_LEVEL`, `CONFIG_REFRESH_INTERVAL` - Optional + +**Test:** +- Bridge startet via `docker run` +- Verbindet zu MQTT Broker +- Holt Config von Odoo via `GET /ows/iot/config` +- Subscribed auf Topics --- -#### M7 – REST Endpoint & Authentication (1 Tag) -**Deliverables** -- Controller `/ows/iot/event` (POST) -- Token-basierte Authentifizierung +#### M7 – Odoo REST API Endpoints (1 Tag) +**Deliverables:** +- Controller: `controllers/iot_api.py` + - `GET /ows/iot/config` - Bridge-Konfiguration + - `POST /ows/iot/event` - Event-Empfang +- **Optionale** Token-Authentifizierung (Bearer, via `ows_iot.require_token`) - Event-Validation gegen Schema v1 -- Event-Speicherung in `ows.iot.event` +- Event → Session Mapping -**Test**: Python-Script POSTet Events an Odoo, werden gespeichert +**Test:** +- `curl -H "Authorization: Bearer " http://localhost:8069/ows/iot/config` +- `curl -X POST -H "Authorization: Bearer " -d '{...}' http://localhost:8069/ows/iot/event` --- -#### M8 – Python-Bridge Integration (1 Tag) -**Deliverables** -- Python-Prototyp wird zur Odoo-Bridge: - - Events werden an Odoo Endpoint gesendet (statt nur File-Export) - - Retry-Queue bei Odoo-Ausfall -- Bridge läuft als separater Service (Docker optional) +#### M8 – Docker Compose Integration (0.5 Tag) +**Deliverables:** +- `docker-compose.yaml` Update: + ```yaml + services: + odoo: + # ... existing config ... + + iot_bridge: + image: iot_mqtt_bridge_for_odoo + environment: + ODOO_URL: http://odoo:8069 + MQTT_URL: mqtt://mosquitto:1883 + # ODOO_TOKEN: ${IOT_BRIDGE_TOKEN} # Optional: nur für Produktiv + depends_on: + - odoo + - mosquitto + networks: + - odoo_network + restart: unless-stopped + ``` +- `.env` Template mit `IOT_BRIDGE_TOKEN` (optional, für Produktiv-Setup) +- README Update: Setup-Anleitung -**Test**: End-to-End: Shelly → MQTT → Bridge → Odoo → Event sichtbar in Odoo +**Test:** +- `docker compose up -d` startet Odoo + Bridge + Mosquitto +- Bridge subscribed Topics +- Events erscheinen in Odoo UI --- -#### M9 – Session-Engine in Odoo (1 Tag) -**Deliverables** -- Session-Logik aus Python-Prototyp nach Odoo portieren -- Events → Session-Zuordnung -- Session-Updates bei run_start/run_stop -- Admin UI: Sessions anzeigen, Filter nach Maschine/Zeitraum +#### M9 – End-to-End Tests & Dokumentation (1 Tag) +**Deliverables:** +- Integration Tests: Shelly → MQTT → Bridge → Odoo +- Session Tests: run_start/run_stop Detection +- Container Restart Tests: Bridge/Odoo Recovery +- Dokumentation: + - `iot_bridge/README.md` - Bridge Architektur + - `DEPLOYMENT.md` - Docker Compose Setup + - API Docs - REST Endpoints -**Test**: Sessions werden in Odoo korrekt erstellt und geschlossen +**Test:** +- End-to-End: Shelly PM einschalten → Session erscheint in Odoo +- Container Restart → Sessions werden recovered +- Config-Änderung in Odoo → Bridge lädt neu (nach 5 Min) --- @@ -380,7 +537,8 @@ Diese Parameter als Odoo Systemparameter, z. B.: ## 11. Offene Entscheidungen (später, nicht blocker für MVP) - Event-Consumer in Odoo vs. Bridge-only Webhook (empfohlen: Webhook) -- Genaues Mapping zu bestehenden open_workshop Modellen (`ows.machine`, Nutzer/RFID) +- Genaues Mapping zu bestehenden open_workshop Modellen (`ows.machine`) +- User-Zuordnung zu Sessions (manuell über UI oder zukünftig via RFID/NFC) - Abrechnung: Preisregeln, Rundungen, POS-Integration - Realtime: Odoo Bus vs. eigener WebSocket Service @@ -489,26 +647,54 @@ stop_debounce_s: 15 # Power < threshold für 15s → run_stop --- -## Anhang A: Beispiel-Event (Maschine run_start) +## Anhang A: Beispiel-Event (Session Heartbeat) + +### Szenario: 5 Minuten mit mehreren State-Wechseln + +**Bridge-internes Tracking (nicht an Odoo gesendet):** +``` +10:00:00 120W WORKING (Session startet) +10:01:30 35W STANDBY (State-Wechsel) +10:02:15 155W WORKING (State-Wechsel) +10:03:00 28W STANDBY (State-Wechsel) +10:04:30 142W WORKING (State-Wechsel) +10:05:00 138W WORKING (Heartbeat-Intervall erreicht) +``` + +**Aggregiertes Event an Odoo:** ```json { "schema_version": "v1", - "event_uid": "c2a7d6f1-7d8d-4a63-9a7f-4e6d7b0d9e2a", - "ts": "2026-01-10T12:34:56Z", - "source": "simulator", - "device_id": "esp32-fraser-01", + "event_uid": "heartbeat-sess-abc123-10-05-00", + "ts": "2026-01-31T10:05:00Z", + "device_id": "shellypmminig3-48f6eeb73a1c", "entity_type": "machine", - "entity_id": "formatkreissaege", - "event_type": "run_start", + "entity_id": "shaper-origin-01", + "event_type": "session_heartbeat", "confidence": "high", "payload": { - "power_w": 820, - "vibration": 0.73, - "reason": "power_threshold" + "session_id": "sess-abc123", + "interval_start": "2026-01-31T10:00:00Z", + "interval_end": "2026-01-31T10:05:00Z", + "interval_working_s": 200, + "interval_standby_s": 100, + "current_state": "WORKING", + "avg_power_w": 142.3, + "state_change_count": 4 } } ``` +**Odoo Verarbeitung:** +```python +# Session Update +session.total_working_time_s += 200 # += interval_working_s +session.total_standby_time_s += 100 # += interval_standby_s + +# Billing Berechnung (5-Minuten-Einheiten) +billing_units = ceil(session.total_working_time_s / 300) # 200s / 300s = 0.67 → 1 Einheit +``` + ## Anhang B: Beispiel-Event (Waage stable_weight) ```json { diff --git a/open_workshop_mqtt/README.md b/open_workshop_mqtt/README.md index da45689..8f67f86 100644 --- a/open_workshop_mqtt/README.md +++ b/open_workshop_mqtt/README.md @@ -1,43 +1,114 @@ # Open Workshop MQTT -MQTT IoT Device Integration for Odoo 18 +**MQTT IoT Bridge for Odoo 18** - Sidecar-Container-Architektur + +## Architektur-Übersicht + +``` +┌─────────────┐ MQTT ┌──────────────┐ REST API ┌────────────┐ +│ Shelly PM │ ────────────────► │ IoT Bridge │ ──────────────► │ Odoo 18 │ +│ (Hardware) │ │ (Container) │ │ (Business) │ +└─────────────┘ └──────────────┘ └────────────┘ + │ │ + ▼ │ + ┌──────────────┐ │ + │ Mosquitto │ ◄──────────────────────┘ + │ MQTT Broker │ (Config via API) + └──────────────┘ +``` + +### Komponenten + +1. **IoT Bridge** (Separater Docker Container) + - MQTT Client (subscribed auf Device-Topics) + - Session Detection Engine (State Machine) + - Event-Parsing & Normalisierung + - REST Client für Odoo-Kommunikation + - Image: `iot_mqtt_bridge_for_odoo` + - Source: `iot_bridge/` Verzeichnis + +2. **Odoo Module** (Business Logic) + - Device-Management UI + - REST API für Bridge (`/ows/iot/config`, `/ows/iot/event`) + - Event-Speicherung & Session-Verwaltung + - Analytics & Reporting + +3. **MQTT Broker** (Mosquitto) + - Message-Transport zwischen Hardware und Bridge ## Features -- ✅ **MQTT Broker Connection** - Connect to external MQTT brokers (Mosquitto, etc.) -- ✅ **Device Management** - Configure and monitor IoT devices -- ✅ **Session Tracking** - Automatic runtime session detection -- ✅ **Flexible Parsers** - Support for Shelly PM Mini G3, Tasmota, Generic JSON -- ✅ **Session Strategies** - Power threshold, Last Will Testament, Manual control +- ✅ **Sidecar-Pattern** - Bridge als separater Container (Docker Best Practice) +- ✅ **Klare Trennung** - Odoo = Business, Bridge = MQTT/Retry/Queue +- ✅ **Auto-Config** - Bridge holt Device-Config von Odoo via REST API +- ✅ **Session Detection** - Dual-Threshold, Debounce, Timeout (in Bridge) +- ✅ **Flexible Parsers** - Shelly PM Mini G3, Tasmota, Generic JSON - ✅ **Analytics** - Pivot tables and graphs for runtime analysis -- ✅ **Auto-Reconnect** - Exponential backoff on connection loss -- ✅ **Message Logging** - Debug log for MQTT messages +- ✅ **Auto-Reconnect** - Exponential backoff on MQTT connection loss +- ✅ **Idempotenz** - Event-UID verhindert Duplikate ## Installation -1. Install Python dependencies: - ```bash - pip install paho-mqtt - ``` +### 1. Docker Compose Setup -2. Install the module in Odoo: - - Apps → Update Apps List - - Search for "Open Workshop MQTT" - - Click Install +```yaml +services: + odoo: + # ... existing config ... + + iot_bridge: + image: iot_mqtt_bridge_for_odoo + build: + context: ./extra-addons/open_workshop/open_workshop_mqtt/iot_bridge + environment: + ODOO_URL: http://odoo:8069 + ODOO_TOKEN: ${IOT_BRIDGE_TOKEN} + MQTT_URL: mqtt://mosquitto:1883 + depends_on: + - odoo + - mosquitto + restart: unless-stopped +``` + +### 2. Odoo Module Installation + +```bash +docker compose exec odoo odoo -u open_workshop_mqtt +``` + +### 3. Bridge Token generieren + +Odoo UI: +- Settings → Technical → System Parameters +- Create: `ows_iot.bridge_token` = `` + +Add to `.env`: +``` +IOT_BRIDGE_TOKEN= +``` + +### 4. Start Services + +```bash +docker compose up -d +``` ## Quick Start -1. **Create MQTT Connection** - - MQTT → Connections → Create - - Enter broker details (host, port, credentials) - - Click "Test Connection" then "Start" - -2. **Add Device** +1. **Add Device in Odoo** - MQTT → Devices → Create - - Select connection - - Configure device ID and topic pattern - - Choose parser type (Shelly, Tasmota, etc.) - - Set session detection strategy + - Device ID: `shellypmminig3-48f6eeb73a1c` + - MQTT Topic: `shaperorigin/status/pm1:0` + - Parser: Shelly PM Mini G3 + - Session Strategy: Power Threshold + - Standby: 20W + - Working: 100W + - Debounce: 3s / 15s + +2. **Bridge Auto-Config** + - Bridge holt Device-Config via `GET /ows/iot/config` + - Subscribed auf Topic automatisch + - Startet Session Detection 3. **Monitor Sessions** - MQTT → Sessions diff --git a/open_workshop_mqtt/TODO.md b/open_workshop_mqtt/TODO.md index 0cf6955..40c6ac5 100644 --- a/open_workshop_mqtt/TODO.md +++ b/open_workshop_mqtt/TODO.md @@ -16,12 +16,23 @@ --- -### ✅ Phase 2: Odoo Integration (100% FERTIG!) +### 🔄 Phase 2: ARCHITEKTUR-REDESIGN (IN ARBEIT) -**Architektur:** Odoo macht ALLES direkt (kein REST API, keine externe Bridge) +**ALTE Architektur (VERALTET):** ``` Shelly PM → MQTT Broker → Odoo (MQTT Client) → Session Detection → Sessions ``` +❌ **Problem:** MQTT Client in Odoo = zu eng gekoppelt, Container-Restart-Issues + +**NEUE Architektur (Sidecar-Pattern):** +``` +Shelly PM → MQTT Broker ← Bridge Container → REST API → Odoo (Business Logic) +``` +✅ **Vorteile:** +- Klare Prozess-Trennung (Odoo = HTTP+Business, Bridge = MQTT+Retry) +- Bridge restart unabhängig von Odoo +- Docker Best Practice (ein Prozess pro Container) +- Einfache Orchestrierung via `docker compose up -d` #### ✅ M6: Odoo Modul Grundgerüst (FERTIG - 100%) - [x] Modul `open_workshop_mqtt` erstellt @@ -63,130 +74,125 @@ Shelly PM → MQTT Broker → Odoo (MQTT Client) → Session Detection → Sessi --- -#### ✅ M7: Session-Engine in Odoo (FERTIG - 100%) [war M9] +#### ~~❌ M7: Session-Engine in Odoo~~ (GESTRICHEN - wird zu Bridge!) -**Was funktioniert:** -- [x] `mqtt.session` Model mit Live-Tracking Fields -- [x] Session CRUD (Create, Read, Update, Delete) -- [x] Session Views (Form, List, Pivot, Graph, Analytics) -- [x] Device-Session Verknüpfung -- [x] **VOLLSTÄNDIGE SessionDetector State Machine** portiert aus Python Prototype: - - [x] Dual-Threshold Detection (standby_threshold_w, working_threshold_w) - - [x] Debounce Timer (start_debounce_s, stop_debounce_s) - - [x] 5-State Machine: IDLE → STARTING → STANDBY/WORKING → STOPPING → IDLE - - [x] Timeout Detection (message_timeout_s) - - [x] Duration Tracking (standby_duration_s, working_duration_s) - - [x] State Recovery nach Odoo Restart - - [x] `end_reason` Logic (normal/timeout/power_drop) -- [x] **ENV-PASSING ARCHITECTURE** - Odoo Cursor Management gelöst: - - [x] SessionDetector speichert nur Primitives (device_id, device_name) - - [x] Frische `env` wird bei jedem Aufruf übergeben - - [x] Alle 20+ Method Signatures refactored - - [x] Keine "Cursor already closed" Fehler mehr! +**Alte Architektur (in Odoo implementiert):** +- ✅ SessionDetector State Machine in `services/session_detector.py` +- ✅ 5-State Machine: IDLE → STARTING → STANDBY/WORKING → STOPPING → IDLE +- ✅ Dual-Threshold Detection + Debounce +- ✅ Alle 7 Unit Tests grün -**LIVE GETESTET (29./30. Januar 2026):** -- [x] MQTT Messages fließen von Shelly PM → Broker → Odoo -- [x] SessionDetector wird bei jeder Message aufgerufen -- [x] State Transitions funktionieren: STANDBY → STOPPING → STANDBY → WORKING -- [x] Sessions werden erstellt und in DB gespeichert -- [x] Live-Updates: current_power_w, current_state, last_message_time -- [x] Timeout Detection funktioniert (Session ENDED nach 20s ohne Messages) +**NEUE Architektur:** +- 🔄 SessionDetector wird in Bridge Container portiert +- 🔄 Odoo bekommt nur noch fertige Events (run_start/run_stop) +- 🔄 Sessions werden in Odoo via REST API erstellt -**Code-Location:** -- ✅ State Machine: `services/session_detector.py` (341 Zeilen, vollständig portiert) -- ✅ Integration: `services/iot_bridge_service.py` (_process_session, _get_or_create_detector) -- ✅ Model: `models/mqtt_session.py` (erweitert mit Live-Tracking Fields) -- ✅ Strategy Config: `models/mqtt_device.py` + `views/mqtt_device_views.xml:106` +**Code wird wiederverwendet:** +- `services/session_detector.py` → `iot_bridge/session_detector.py` (leicht angepasst) +- State Machine Logic bleibt identisch +- Nur Odoo-Abhängigkeiten (env, cursor) werden entfernt -**✅ TESTS REPARIERT (30. Januar 2026):** -- [x] **Unit Tests für env-passing angepasst**: - - File: `tests/test_session_detector.py` - - Fix: SessionDetector Signatur geändert zu `SessionDetector(device.id, device.name)` - - Fix: Alle process_power_event() Aufrufe mit `self.env` erweitert - - Alle 7 Tests erfolgreich angepasst: - - [x] `test_01_idle_to_starting_on_power_above_threshold()` - - [x] `test_02_starting_to_idle_on_power_drop()` - - [x] `test_03_standby_to_working_transition()` - - [x] `test_04_working_to_stopping_transition()` - - [x] `test_05_timeout_detection()` - - [x] `test_06_duration_tracking()` - - [x] `test_07_state_recovery_after_restart()` +--- -- [x] **test_mqtt_mocked.py repariert**: - - Problem: Tests verwendeten falsche Service-Methoden-Signaturen - - Fix: `start_connection_with_env(connection_id, env)` statt `start_connection_with_env(env)` - - Fix: `stop_connection(connection_id)` Parameter hinzugefügt - - Fix: `device.topic_pattern` statt nicht-existierendem `device.device_id` - - Fix: IotBridgeService direkt instanziiert (kein Odoo-Model) - - Alle 4 Mock-Tests funktionieren jetzt +#### 🔧 M7: IoT Bridge Docker Container (IN ARBEIT - 0%) -**Test Status (30. Januar 2026 - 16:40 Uhr):** -``` -✅ ALLE TESTS GRÜN - run-tests.sh erfolgreich: -- 26 Tests total -- 0 failed -- 0 errors +**Ziel:** Separater Container für MQTT Bridge -Test-Suites: -- ✅ TestSessionDetector (7 Tests) - State Machine vollständig getestet -- ✅ TestMQTTConnectionMocked (4 Tests) - Mock-basierte Service Tests -- ✅ TestDeviceStatus (4 Tests) - Device Model Tests -- ✅ TestTopicMatching (4 Tests) - MQTT Topic Pattern Tests -- ⏭️ TestMQTTConnection (3 Tests) - Skipped (echte Broker-Tests) -- ⏭️ TestSessionDetection (4 Tests) - Skipped (hängen in TransactionCase) +**Location:** `iot_bridge/` Unterverzeichnis -🎉 Phase 2 SessionDetector ist FERTIG! +**Tasks:** +- [ ] Projekt-Struktur erstellen: + ``` + iot_bridge/ + main.py # Bridge Hauptprogramm + mqtt_client.py # MQTT Client (aus services/ portiert) + session_detector.py # State Machine (aus services/ portiert) + odoo_client.py # REST API Client für Odoo + config.py # Config Management + parsers/ + shelly_parser.py + tasmota_parser.py + requirements.txt + Dockerfile + README.md + ``` +- [ ] Docker Image: `iot_mqtt_bridge_for_odoo` +- [ ] ENV-Variablen: + - `ODOO_URL` (z.B. `http://odoo:8069`) + - `ODOO_TOKEN` (API Token) + - `MQTT_URL` (z.B. `mqtt://mosquitto:1883`) +- [ ] Code-Portierung aus Odoo: + - SessionDetector State Machine + - MQTT Client (ohne Odoo-Abhängigkeiten) + - Shelly Parser +- [ ] Retry-Queue bei Odoo-Ausfall (lokal, in-memory oder SQLite) +- [ ] Config-Refresh alle 5 Min via `GET /ows/iot/config` + +**Test:** +- `docker build -t iot_mqtt_bridge_for_odoo iot_bridge/` +- `docker run -e ODOO_URL=... -e ODOO_TOKEN=... iot_mqtt_bridge_for_odoo` + +--- + +#### 🔧 M8: Odoo REST API Endpoints (IN ARBEIT - 0%) + +**Ziel:** Odoo bietet REST API für Bridge-Kommunikation + +**Tasks:** +- [ ] Controller: `controllers/iot_api.py` + - [ ] `GET /ows/iot/config` - Device-Config für Bridge + - [ ] `POST /ows/iot/event` - Event-Empfang von Bridge +- [ ] Authentifizierung: + - [ ] Token in `ir.config_parameter`: `ows_iot.bridge_token` + - [ ] Bearer Token Validation in Controller +- [ ] Event-Validation: + - [ ] Schema v1 prüfen + - [ ] Unique-Constraint auf `event_uid` + - [ ] 409 Conflict bei Duplikaten +- [ ] Session-Mapping: + - [ ] Events → Session-Updates + - [ ] run_start/run_stop Events + +**Test:** +```bash +# Config abrufen +curl -H "Authorization: Bearer " http://localhost:8069/ows/iot/config + +# Event senden +curl -X POST -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"schema_version":"v1","event_uid":"...","device_id":"..."}' \ + http://localhost:8069/ows/iot/event ``` --- -#### ✅ M8: Test-Infrastruktur repariert (FERTIG - 100%) [war M10] +#### 🔧 M9: Docker Compose Integration (TODO - 0%) -**Problem:** -- Alte Tests mit echtem MQTT Broker hängen in TransactionCase -- `test_mqtt_mocked.py` erstellt, aber nicht aktiv +**Tasks:** +- [ ] `docker-compose.yaml` Update: + ```yaml + services: + iot_bridge: + image: iot_mqtt_bridge_for_odoo + build: + context: ./extra-addons/open_workshop/open_workshop_mqtt/iot_bridge + environment: + ODOO_URL: http://odoo:8069 + ODOO_TOKEN: ${IOT_BRIDGE_TOKEN} + MQTT_URL: mqtt://mosquitto:1883 + depends_on: + - odoo + - mosquitto + restart: unless-stopped + ``` +- [ ] `.env` Template mit `IOT_BRIDGE_TOKEN` +- [ ] `DEPLOYMENT.md` - Setup-Anleitung -**✅ ERLEDIGT:** -1. [x] **Test-Infrastruktur funktioniert** - - `test_session_detector.py`: Alle 7 Tests grün - - `test_mqtt_mocked.py`: Alle 4 Tests grün - - Hängende Tests mit echtem Broker: Skipped (bekanntes TransactionCase Problem) - - `run-tests.sh` läuft fehlerfrei durch - -2. [x] **run-tests.sh verbessert** - - Zeigt klare Zusammenfassung: "✓✓✓ ALL TESTS PASSED ✓✓✓" oder "✗✗✗ TESTS FAILED ✗✗✗" - - Parsed Odoo Test-Output korrekt - - Exit Code korrekt (0 = success, 1 = failure) - - Logs in `test_YYYYMMDD_HHMMSS.log` - -3. [x] **Test-Coverage komplett für SessionDetector** - - State Machine: Alle 5 States getestet (IDLE/STARTING/STANDBY/WORKING/STOPPING) - - Debounce Logic: Start + Stop Timer getestet - - Duration Tracking: Standby/Working Zeiten akkurat - - Timeout Detection: 20s Message Timeout funktioniert - - State Recovery: Nach Restart funktioniert - -**Dokumentation (am Ende):** -- [x] README.md erstellt -- [x] Feature Request aktuell -- [x] TODO.md (dieses Dokument) -- [ ] API Dokumentation (`/docs/API.md`) - - Endpoint Schema (Event v1) - - Authentication - - Error Codes -- [ ] Deployment Guide (`/docs/DEPLOYMENT.md`) - - Production Setup - - Docker Compose Example - - Systemd Service -- [ ] Troubleshooting Guide (`/docs/TROUBLESHOOTING.md`) - ---- - -## ZusCODE IMPLEMENTIERT (manuell getestet) -1. **MQTT Connection** - Broker Verbindung mit TLS, Auto-Reconnect ✅ -2. **Device Management** - Devices anlegen, konfigurieren ✅ -3. **Message Parsing** - Shelly PM Mini G3 Payloads parsen ✅ -4. **Session Detection** - Dual-Threshold State Machine ✅ +**Test:** +- `docker compose up -d` startet Odoo + Bridge +- Bridge logs zeigen MQTT connection +- Events erscheinen in Odoo UI - Standby/Working Thresholds ✅ - Debounce (Start: 3s, Stop: 15s) ✅ - 5-State Machine (IDLE → STARTING → STANDBY/WORKING → STOPPING → IDLE) ✅ diff --git a/open_workshop_mqtt/models/mqtt_connection.py b/open_workshop_mqtt/models/mqtt_connection.py index 6aaa058..ca2c04c 100644 --- a/open_workshop_mqtt/models/mqtt_connection.py +++ b/open_workshop_mqtt/models/mqtt_connection.py @@ -157,75 +157,22 @@ class MqttConnection(models.Model): # ========== Action Methods ========== def action_start(self): - """Start MQTT Bridge Service for this connection""" + """Start MQTT connection - DEPRECATED: Use standalone IoT Bridge container""" self.ensure_one() - - if self.state == 'connected': - raise UserError(_('Connection is already running')) - - try: - from ..services.iot_bridge_service import IotBridgeService - - _logger.info(f"Starting MQTT connection: {self.name}") - - # Get service instance - service = IotBridgeService.get_instance(self.env) - - # Start connection - if service.start_connection(self.id): - # Invalidate cache to force refresh - self.invalidate_recordset(['state', 'last_connected', 'last_error']) - - # Reload current view - return { - 'type': 'ir.actions.client', - 'tag': 'reload', - } - else: - raise UserError(_('Failed to start connection - check logs for details')) - - except Exception as e: - _logger.error(f"Failed to start MQTT connection: {e}", exc_info=True) - self.write({ - 'state': 'error', - 'last_error': str(e), - }) - raise UserError(_('Failed to start connection: %s') % str(e)) + raise UserError(_( + 'The integrated MQTT service has been removed.\n\n' + 'Please use the standalone IoT Bridge container instead.\n' + 'See docker-compose.dev.yaml for setup instructions.' + )) def action_stop(self): - """Stop MQTT Bridge Service for this connection""" + """Stop MQTT connection - DEPRECATED: Use standalone IoT Bridge container""" self.ensure_one() - - if self.state == 'stopped': - raise UserError(_('Connection is already stopped')) - - try: - from ..services.iot_bridge_service import IotBridgeService - - _logger.info(f"Stopping MQTT connection: {self.name}") - - # Get service instance - service = IotBridgeService.get_instance(self.env) - - # Check if connection is actually running in service - is_running = service.is_running(self.id) - - if is_running: - # Stop the actual MQTT connection - service.stop_connection(self.id) - else: - _logger.warning(f"Connection {self.id} not running in service, updating state only") - - # Update state in current transaction (will be committed by Odoo) - self.write({ - 'state': 'stopped', - }) - - return True - - except Exception as e: - _logger.error(f"Failed to stop MQTT connection: {e}", exc_info=True) - raise UserError(_('Failed to stop connection: %s') % str(e)) + raise UserError(_( + 'The integrated MQTT service has been removed.\n\n' + 'Please use the standalone IoT Bridge container instead.\n' + 'See docker-compose.dev.yaml for setup instructions.' + )) def action_test_connection(self): """Test MQTT connection""" @@ -360,59 +307,11 @@ class MqttConnection(models.Model): # ========== Auto-Start on Odoo Restart ========== @api.model def _register_hook(self): - """Auto-start all connected connections when Odoo starts""" + """Auto-start disabled - using standalone IoT Bridge container""" res = super()._register_hook() - # First: Reset any zombie 'connected' states from previous Odoo instance - # (Container restart kills Python process without calling on_disconnect) - from ..services.iot_bridge_service import IotBridgeService - service = IotBridgeService.get_instance(self.env) - service._reset_connection_states_after_restart() - service._cleanup_stale_sessions_after_restart() - service._restore_detector_states() - # Then: Auto-start connections that should be running - self.auto_start_all_connections() + _logger.info("MQTT auto-start disabled - use standalone IoT Bridge container") return res - @api.model - def auto_start_all_connections(self): - """ - Auto-start all connections that were running before Odoo restart - Searches for connections with last_connected timestamp (not state='connected'!) - because state gets reset during restart - """ - try: - # Find connections that were running before restart (have last_connected timestamp) - connections = self.search([ - ('last_connected', '!=', False), - ('state', 'in', ['stopped', 'connecting', 'error']) # Any non-connected state - ]) - - if not connections: - _logger.info("No connections to auto-start (no previously connected connections found)") - return - - _logger.info(f"Auto-starting {len(connections)} MQTT connections that were running before restart...") - - for connection in connections: - try: - _logger.info(f"Auto-starting connection: {connection.name}") - - from ..services.iot_bridge_service import IotBridgeService - service = IotBridgeService.get_instance(self.env) - - if service.start_connection(connection.id): - _logger.info(f"Successfully auto-started: {connection.name}") - else: - _logger.warning(f"Failed to auto-start: {connection.name}") - - except Exception as e: - _logger.error(f"Error auto-starting connection {connection.name}: {e}", exc_info=True) - - _logger.info("Auto-start completed") - - except Exception as e: - _logger.error(f"Error in auto_start_all_connections: {e}", exc_info=True) - # ========== CRUD Overrides ========== @api.ondelete(at_uninstall=False) def _unlink_if_not_connected(self): diff --git a/open_workshop_mqtt/models/mqtt_device.py b/open_workshop_mqtt/models/mqtt_device.py index af72b4e..cf81c9c 100644 --- a/open_workshop_mqtt/models/mqtt_device.py +++ b/open_workshop_mqtt/models/mqtt_device.py @@ -238,25 +238,10 @@ class MqttDevice(models.Model): """ Auto-subscribe when device is added to running connection or when topic_pattern changes + + DEPRECATED: Auto-subscribe disabled - use standalone IoT Bridge container """ - # Track changes that require re-subscription - needs_resubscribe = 'connection_id' in vals or 'topic_pattern' in vals or 'active' in vals - result = super().write(vals) - - if needs_resubscribe: - # Import here to avoid circular dependency - from ..services.iot_bridge_service import IoTBridgeService - - for device in self: - if device.active and device.connection_id.state == 'connected': - # Device is active and connection is running → subscribe - _logger.info(f"Auto-subscribing device {device.id} ({device.name}) to running connection") - IoTBridgeService.get_instance(self.env.registry, device.connection_id.database_name).subscribe_device( - device.connection_id.id, - device.id - ) - return result # ========== Default Values ========== diff --git a/open_workshop_mqtt/services/__init__.py b/open_workshop_mqtt/services/__init__.py deleted file mode 100644 index 0884300..0000000 --- a/open_workshop_mqtt/services/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -*- coding: utf-8 -*- - -from . import iot_bridge_service diff --git a/open_workshop_mqtt/services/iot_bridge_service.py b/open_workshop_mqtt/services/iot_bridge_service.py deleted file mode 100644 index 2f44e3b..0000000 --- a/open_workshop_mqtt/services/iot_bridge_service.py +++ /dev/null @@ -1,1112 +0,0 @@ -# -*- coding: utf-8 -*- -""" -IoT Bridge Service - MQTT Connection & Message Routing Manager -================================================================ - -ZWECK: ------- -Der IotBridgeService ist ein Singleton-Service, der als zentrale Schaltstelle zwischen -Odoo und MQTT-fähigen IoT-Geräten fungiert. Er verwaltet alle MQTT-Verbindungen und -leitet eingehende MQTT-Messages an die entsprechenden Parser und Session-Detektoren weiter. - -WIE FUNKTIONIERT EIN SERVICE IN ODOO 18: ------------------------------------------ -Im Gegensatz zu Odoo-Models (die direkt in der Datenbank gespeichert werden) ist ein -Service eine reine Python-Klasse, die: -- NICHT in der Datenbank persistiert wird -- Im RAM des Odoo-Prozesses läuft -- Als Singleton pro Datenbank existiert (eine Instanz pro DB) -- Nicht über das ORM (self.env['model.name']) zugegriffen wird -- Direkt über IotBridgeService.get_instance(env) instanziiert wird - -WANN WIRD DER SERVICE GESTARTET: ---------------------------------- -1. **Lazy Initialization**: Der Service wird NICHT beim Odoo-Start automatisch geladen, - sondern erst beim ersten Zugriff via get_instance(env). - -2. **Erste Verwendung**: Typischerweise beim ersten Start einer MQTT-Connection: - - User klickt "Start" Button in mqtt.connection Form - - mqtt_connection.start_connection() wird aufgerufen - - Diese Methode ruft IotBridgeService.get_instance(self.env) - - Wenn noch keine Instanz existiert → Service wird jetzt erstellt - -3. **Nach Registry Reload**: Wenn Odoo Module aktualisiert/installiert werden: - - Odoo lädt das Registry neu (alle Models werden neu geladen) - - Der Service erkennt den Registry-Wechsel - - Alte Instanz wird mit cleanup() aufgeräumt - - Neue Instanz wird erstellt - - Alle vorher laufenden Connections werden automatisch neu gestartet - -4. **Nach Container Restart**: - - Python-Prozess startet neu - - Service-Instanz existiert NICHT mehr (war nur im RAM) - - mqtt_connection._register_hook() wird beim Odoo-Start aufgerufen - - Recovery-Methoden werden ausgeführt (siehe unten) - - auto_start_all_connections() startet alle Connections neu - - Beim ersten Connection-Start wird der Service lazy initialisiert - -HAUPTAUFGABEN DES SERVICE: ---------------------------- -1. **MQTT Client Management**: - - Erstellt und verwaltet MqttClient-Instanzen (eine pro mqtt.connection) - - Speichert aktive Clients in self._clients Dict: {connection_id: MqttClient} - - Startet/Stoppt MQTT-Verbindungen auf Anforderung - - Überwacht Verbindungsstatus und Reconnects - -2. **Message Routing**: - - Empfängt MQTT-Messages via _on_message() Callback - - Findet passendes mqtt.device via Topic-Pattern-Matching - - Routet Message an zuständigen Parser (z.B. ShellyParser) - - Speichert geparsete Daten in mqtt.message Tabelle - -3. **Session Detection**: - - Erstellt SessionDetector pro Gerät (bei Bedarf) - - Speichert Detektoren in self._detectors Dict: {device_id: SessionDetector} - - Detektoren analysieren Message-Patterns und erkennen Sessions - - Erstellt/Beendet mqtt.session Records automatisch - -4. **Restart Recovery** (nach Container-Neustart): - - _reset_connection_states_after_restart(): - Setzt "zombie" Connections (state='connected' in DB, aber tot) auf 'stopped' - - _cleanup_stale_sessions_after_restart(): - Schließt offene Sessions mit end_reason='restart' oder 'timeout' - - _restore_detector_states(): - Stellt SessionDetector-States für laufende Sessions wieder her - - Diese Methoden werden von mqtt_connection._register_hook() BEFORE auto_start - aufgerufen, um sauberen Zustand zu garantieren. - -5. **Timeout Worker**: - - Background-Thread der alle 30 Sekunden läuft - - Prüft alle SessionDetector auf Message-Timeouts - - Beendet Sessions automatisch bei Inaktivität - -6. **Connection Callbacks**: - - _on_connect(): Wird bei MQTT-Connect aufgerufen, subscribt Topics - - _on_disconnect(): Wird bei MQTT-Disconnect aufgerufen, aktualisiert DB-State - - _on_message(): Wird bei eingehender MQTT-Message aufgerufen, routet zu Parser - -SINGLETON-PATTERN: ------------------- -Pro Datenbank existiert genau EINE Service-Instanz: -- _instances = {} speichert: {db_name: IotBridgeService} -- get_instance(env) prüft ob Instanz für env.cr.dbname existiert -- Falls nicht → neue Instanz wird erstellt -- Falls ja, aber Registry geändert → alte Instanz cleanup, neue erstellt -- Verhindert mehrfache MQTT-Connections zum gleichen Broker - -THREAD-SAFETY: --------------- -Der Service läuft in einem Multi-Threaded Environment: -- self._running_lock schützt self._clients Dict -- self._lock (class-level) schützt _instances Dict -- Callbacks (_on_message etc.) laufen in MQTT-Client-Threads! -- Jeder DB-Zugriff in Callbacks nutzt new_cursor() für Thread-Safety - -LEBENSZYKLUS-BEISPIEL: ----------------------- -1. Odoo startet → Service existiert noch nicht -2. User öffnet mqtt.connection Form, klickt "Start" -3. start_connection() → get_instance(env) → Service wird erstellt -4. Service erstellt MqttClient, speichert in self._clients -5. MqttClient verbindet zu Broker -6. Broker sendet Messages → _on_message() Callback -7. Service routet zu Parser → speichert in DB -8. User installiert neues Modul → Registry Reload -9. Service erkennt Reload → cleanup() alte Instanz -10. Neue Instanz wird erstellt → Connections neu gestartet -11. Docker Container restart → Python beendet -12. Odoo startet neu → _register_hook() → Recovery → auto_start -13. auto_start → get_instance() → Service lazy erstellt -14. Connections werden neu gestartet -""" - -import logging -import threading -from datetime import datetime -from typing import Dict, Optional -from odoo import api, SUPERUSER_ID - -from .mqtt_client import MqttClient -from .parsers.shelly_parser import ShellyParser -from .session_detector import SessionDetector - -_logger = logging.getLogger(__name__) - - -class IotBridgeService: - """ - Singleton service managing MQTT connections - One instance per Odoo environment - """ - - _instances: Dict[str, 'IotBridgeService'] = {} - _lock = threading.Lock() - - def __init__(self, env): - """ - Initialize IoT Bridge Service - - ACHTUNG: Diese Methode wird NICHT beim Odoo-Start aufgerufen! - Sie wird erst beim ersten get_instance() Aufruf ausgeführt (Lazy Init). - - Args: - env: Odoo environment (enthält DB-Connection, User-Context etc.) - """ - self.env = env - self.registry = env.registry - self.db_name = env.cr.dbname - - # Dictionary: connection_id (int) -> MqttClient instance - # Speichert alle aktiven MQTT-Verbindungen - self._clients: Dict[int, MqttClient] = {} - - # Dictionary: device_id (int) -> SessionDetector instance - # Speichert Session-Detektoren für Geräte mit laufenden Sessions - self._detectors: Dict[int, SessionDetector] = {} - - # Lock für Thread-Safety beim Zugriff auf self._clients - self._running_lock = threading.Lock() - - # MQTT Message Parser (aktuell nur Shelly-Format) - # TODO: Multi-Parser-Support für verschiedene Gerätetypen - self._parser = ShellyParser() - - # Timeout worker thread - self._timeout_worker_running = False - self._timeout_worker_thread = None - - _logger.info(f"IoT Bridge Service initialized for database '{self.db_name}'") - - # Start timeout worker - self._start_timeout_worker() - - # Note: Restart recovery is called from mqtt_connection._register_hook() - # to ensure zombie 'connected' states are cleaned before auto_start - - def cleanup(self): - """ - Cleanup all MQTT connections before registry reload - Called before instance is replaced - """ - _logger.info(f"Cleaning up IoT Bridge Service for '{self.db_name}' ({len(self._clients)} active connections)") - - # Stop timeout worker - self._stop_timeout_worker() - - with self._running_lock: - # Stop all MQTT clients - for connection_id in list(self._clients.keys()): - try: - client = self._clients[connection_id] - _logger.info(f"Stopping connection {connection_id} before reload") - client.disconnect() - del self._clients[connection_id] - except Exception as e: - _logger.error(f"Error stopping connection {connection_id} during cleanup: {e}") - - # Clear all detectors - self._detectors.clear() - - _logger.info(f"IoT Bridge Service cleanup completed for '{self.db_name}'") - - @classmethod - def get_instance(cls, env) -> 'IotBridgeService': - """ - Get singleton instance for this environment - - SINGLETON-PATTERN: Pro Datenbank existiert nur EINE Service-Instanz. - Diese Methode ist der EINZIGE Weg, eine IotBridgeService-Instanz zu erhalten. - - REGISTRY-RELOAD-DETECTION: - Wenn Odoo Module installiert/aktualisiert werden, wird das Registry neu geladen. - Diese Methode erkennt das und: - 1. Räumt die alte Instanz auf (stoppt alle MQTT-Connections) - 2. Erstellt eine neue Instanz mit dem neuen Registry - 3. Startet alle vorher laufenden Connections neu - - Args: - env: Odoo environment (aus self.env in Models) - - Returns: - IotBridgeService instance für diese Datenbank - - Beispiel-Nutzung: - from ..services.iot_bridge_service import IotBridgeService - service = IotBridgeService.get_instance(self.env) - service.start_connection(connection_id) - """ - db_name = env.cr.dbname - - with cls._lock: - # Check if registry has changed (reload happened) - registry_changed = False - if db_name in cls._instances: - old_instance = cls._instances[db_name] - if old_instance.registry != env.registry: - # Registry changed! Cleanup old instance - _logger.warning(f"Registry reload detected for '{db_name}' - cleaning up old instance") - old_instance.cleanup() - del cls._instances[db_name] - registry_changed = True - - if db_name not in cls._instances: - cls._instances[db_name] = cls(env) - - # If registry changed, restart connections - if registry_changed: - _logger.info(f"Restarting MQTT connections after registry reload") - cls._instances[db_name]._restart_connections_after_reload() - - return cls._instances[db_name] - - def _restart_connections_after_reload(self): - """ - Restart all connections that were running before registry reload - Called automatically after registry reload is detected - """ - try: - # Use fresh cursor - with self.registry.cursor() as cr: - env = api.Environment(cr, SUPERUSER_ID, {}) - - # Find connections that should be running - Connection = env['mqtt.connection'] - connections = Connection.search([ - ('last_connected', '!=', False), - ]) - - if not connections: - _logger.info("No connections to restart after reload") - return - - _logger.info(f"Restarting {len(connections)} MQTT connections after reload...") - - for connection in connections: - try: - _logger.info(f"Restarting connection: {connection.name}") - self.start_connection(connection.id) - except Exception as e: - _logger.error(f"Error restarting connection {connection.name}: {e}") - - cr.commit() - _logger.info(f"Successfully restarted {len(connections)} connections after reload") - - except Exception as e: - _logger.error(f"Error in _restart_connections_after_reload: {e}", exc_info=True) - - def start_connection(self, connection_id: int) -> bool: - """ - Start MQTT connection for given connection_id - - Args: - connection_id: ID of mqtt.connection record - - Returns: - bool: True if connection started successfully - """ - with self._running_lock: - # Check if already running - if connection_id in self._clients: - _logger.warning(f"Connection {connection_id} is already running") - return False - - try: - # Use a fresh cursor to read connection data - conn_data = None - with self.registry.cursor() as new_cr: - env = api.Environment(new_cr, SUPERUSER_ID, {}) - return self._start_connection_internal(connection_id, env, use_new_cursor=True) - except Exception as e: - _logger.error(f"Error starting connection {connection_id}: {e}", exc_info=True) - return False - - def start_connection_with_env(self, connection_id: int, env) -> bool: - """ - Start MQTT connection using existing environment (for tests) - - Args: - connection_id: ID of mqtt.connection record - env: Odoo environment to use - - Returns: - bool: True if connection started successfully - """ - with self._running_lock: - # Check if already running - if connection_id in self._clients: - _logger.warning(f"Connection {connection_id} is already running") - return False - - try: - return self._start_connection_internal(connection_id, env, use_new_cursor=False) - except Exception as e: - _logger.error(f"Error starting connection {connection_id}: {e}", exc_info=True) - return False - - def _start_connection_internal(self, connection_id: int, env, use_new_cursor: bool) -> bool: - """ - Internal method to start MQTT connection - - Args: - connection_id: ID of mqtt.connection record - env: Environment to use - use_new_cursor: Whether to use new cursors for updates - - Returns: - bool: True if connection started successfully - """ - # Load connection record - connection = env['mqtt.connection'].browse(connection_id) - if not connection.exists(): - _logger.error(f"Connection {connection_id} not found") - return False - - _logger.info(f"Starting MQTT connection: {connection.name}") - - # Store connection data - conn_data = { - 'id': connection.id, - 'host': connection.host, - 'port': int(connection.port), - 'client_id': connection.client_id, - 'username': connection.username or None, - 'password': connection.password or None, - 'use_tls': connection.use_tls, - 'verify_cert': connection.verify_cert, - 'ca_cert_path': connection.ca_cert_path or None, - 'auto_reconnect': connection.auto_reconnect, - 'reconnect_delay_min': connection.reconnect_delay_min, - 'reconnect_delay_max': connection.reconnect_delay_max, - } - - # Create MQTT client - client = MqttClient( - connection_id=conn_data['id'], - host=conn_data['host'], - port=conn_data['port'], - client_id=conn_data['client_id'], - username=conn_data['username'], - password=conn_data['password'], - use_tls=conn_data['use_tls'], - verify_cert=conn_data['verify_cert'], - ca_cert_path=conn_data['ca_cert_path'], - auto_reconnect=conn_data['auto_reconnect'], - reconnect_delay_min=conn_data['reconnect_delay_min'], - reconnect_delay_max=conn_data['reconnect_delay_max'], - on_message_callback=self._on_message, - on_connect_callback=self._on_connect, - on_disconnect_callback=self._on_disconnect, - ) - - # Connect - if client.connect(): - self._clients[connection_id] = client - - # NOTE: Subscription happens in _on_connect() callback! - # This ensures topics are subscribed both on initial connect AND on reconnects. - # We do NOT subscribe here to avoid duplicate subscriptions. - - # Update connection state - def update_state(): - if use_new_cursor: - with self.registry.cursor() as new_cr: - env = api.Environment(new_cr, SUPERUSER_ID, {}) - conn = env['mqtt.connection'].browse(connection_id) - conn.write({ - 'state': 'connecting', - 'last_error': False, - }) - new_cr.commit() - else: - connection.write({ - 'state': 'connecting', - 'last_error': False, - }) - - update_state() - return True - else: - _logger.error(f"Failed to connect client for connection {connection_id}") - - def update_error(): - if use_new_cursor: - with self.registry.cursor() as new_cr: - env = api.Environment(new_cr, SUPERUSER_ID, {}) - conn = env['mqtt.connection'].browse(connection_id) - conn.write({ - 'state': 'error', - 'last_error': 'Failed to initiate connection', - }) - new_cr.commit() - else: - connection.write({ - 'state': 'error', - 'last_error': 'Failed to initiate connection', - }) - - update_error() - return False - - def _subscribe_device_topics_with_env(self, connection_id, env): - """Subscribe to device topics using existing environment""" - client = self._clients.get(connection_id) - if not client: - return - - # Load devices - devices = env['mqtt.device'].search([ - ('connection_id', '=', connection_id), - ('active', '=', True) - ]) - - for device in devices: - try: - client.subscribe(device.topic_pattern) - _logger.info(f"Subscribed to topic: {device.topic_pattern}") - except Exception as e: - _logger.error(f"Failed to subscribe to {device.topic_pattern}: {e}") - - def stop_connection(self, connection_id: int) -> bool: - """ - Stop MQTT connection - - Args: - connection_id: ID of mqtt.connection record - - Returns: - bool: True if connection stopped successfully - """ - with self._running_lock: - if connection_id not in self._clients: - _logger.warning(f"Connection {connection_id} is not running") - return False - - try: - _logger.info(f"Stopping MQTT connection {connection_id}") - - # Disconnect client - client = self._clients[connection_id] - client.disconnect() - - # Remove from active clients - del self._clients[connection_id] - - # Update connection state - with self.registry.cursor() as new_cr: - env = api.Environment(new_cr, SUPERUSER_ID, {}) - connection = env['mqtt.connection'].browse(connection_id) - connection.write({ - 'state': 'stopped', - }) - new_cr.commit() - - return True - - except Exception as e: - _logger.error(f"Error stopping connection {connection_id}: {e}", exc_info=True) - return False - - def _subscribe_device_topics(self, connection_id): - """ - Subscribe to all active device topics for this connection - - Args: - connection_id: ID of mqtt.connection - """ - client = self._clients.get(connection_id) - if not client: - return - - # Use fresh cursor to load devices - with self.registry.cursor() as new_cr: - env = api.Environment(new_cr, SUPERUSER_ID, {}) - connection = env['mqtt.connection'].browse(connection_id) - - for device in connection.device_ids.filtered('active'): - try: - client.subscribe(device.topic_pattern, qos=0) - _logger.info(f"Subscribed to device topic: {device.topic_pattern}") - except Exception as e: - _logger.error(f"Failed to subscribe to {device.topic_pattern}: {e}") - - new_cr.commit() - - def subscribe_device(self, connection_id, device_id): - """ - Subscribe to a single device's topic (used when device is added to running connection) - - Args: - connection_id: ID of mqtt.connection - device_id: ID of mqtt.device to subscribe - """ - client = self._clients.get(connection_id) - if not client: - _logger.warning(f"Cannot subscribe device {device_id}: Connection {connection_id} not running") - return - - # Use fresh cursor to load device - with self.registry.cursor() as new_cr: - env = api.Environment(new_cr, SUPERUSER_ID, {}) - device = env['mqtt.device'].browse(device_id) - - if not device.exists() or not device.active: - _logger.warning(f"Device {device_id} does not exist or is inactive") - return - - try: - client.subscribe(device.topic_pattern, qos=0) - _logger.info(f"Auto-subscribed device {device_id} ({device.name}) to topic: {device.topic_pattern}") - except Exception as e: - _logger.error(f"Failed to auto-subscribe device {device_id} to {device.topic_pattern}: {e}") - - new_cr.commit() - - # ========== MQTT Callbacks ========== - # Diese Methoden werden vom MqttClient aufgerufen (in separatem Thread!) - # Wichtig: Jeder DB-Zugriff benötigt new_cursor() für Thread-Safety - - def _on_connect(self, connection_id: int): - """ - Callback: Wird aufgerufen wenn MQTT-Verbindung (neu) hergestellt wird - - WICHTIG: Diese Methode wird in folgenden Fällen aufgerufen: - 1. Beim initialen Connect (User klickt "Start") - 2. Bei jedem automatischen Reconnect nach Verbindungsabbruch - 3. Nach Container-Restart beim auto_start - - TOPIC-SUBSCRIPTION: - Dies ist die EINZIGE Stelle wo Topics subscribed werden! - Warum? Um doppelte Subscriptions zu vermeiden, die zu duplizierten Messages führen. - - THREAD-KONTEXT: - Diese Methode läuft im MQTT-Client-Thread, NICHT im Odoo-Main-Thread! - Deshalb: new_cursor() für jeden DB-Zugriff. - - Args: - connection_id: ID des mqtt.connection Records - """ - _logger.info(f"MQTT connection {connection_id} established") - - try: - # Update connection state and subscribe to device topics - from odoo import fields - from odoo.api import Environment - - with self.registry.cursor() as new_cr: - new_env = Environment(new_cr, SUPERUSER_ID, {}) - connection = new_env['mqtt.connection'].browse(connection_id) - - # Subscribe to all active device topics - client = self._clients.get(connection_id) - if client: - for device in connection.device_ids.filtered('active'): - try: - # The subscribe() method in MqttClient checks if already subscribed - client.subscribe(device.topic_pattern, qos=0) - _logger.info(f"Subscribed to device topic: {device.topic_pattern}") - except Exception as e: - _logger.error(f"Failed to subscribe to {device.topic_pattern}: {e}") - - # Update connection state - connection.write({ - 'state': 'connected', - 'last_connected': fields.Datetime.now(), - 'last_error': False, - }) - new_cr.commit() - except Exception as e: - _logger.error(f"Failed to update connection state: {e}", exc_info=True) - - def _on_disconnect(self, connection_id: int, rc: int): - """ - Callback: Wird aufgerufen wenn MQTT-Verbindung getrennt wird - - WANN WIRD DIESE METHODE AUFGERUFEN: - 1. Bei manuellem Disconnect (User klickt "Stop") - 2. Bei Netzwerk-/Broker-Problemen (automatischer Disconnect) - 3. Bei Broker-Shutdown - - WICHTIG: Wird NICHT aufgerufen bei: - - Container-Restart (Python-Prozess wird abrupt beendet) - - Odoo-Shutdown (keine Zeit für sauberen Disconnect) - - Deshalb: Recovery nach Restart in _reset_connection_states_after_restart() - - RECONNECT: - Wenn auto_reconnect=True, versucht MqttClient automatisch neu zu verbinden. - Dann wird _on_connect() wieder aufgerufen. - - THREAD-KONTEXT: - Läuft im MQTT-Client-Thread → new_cursor() für DB-Zugriff - - Args: - connection_id: ID des mqtt.connection Records - rc: Return code (0 = clean disconnect, >0 = unexpected disconnect) - """ - _logger.warning(f"MQTT connection {connection_id} disconnected (rc={rc})") - - try: - # Update connection state in database - from odoo.api import Environment - - with self.registry.cursor() as new_cr: - new_env = Environment(new_cr, SUPERUSER_ID, {}) - connection = new_env['mqtt.connection'].browse(connection_id) - - if rc == 0: - # Clean disconnect - connection.write({'state': 'stopped'}) - else: - # Unexpected disconnect - connection.write({ - 'state': 'connecting', # Will reconnect automatically - 'last_error': f'Unexpected disconnect (code {rc})', - }) - - new_cr.commit() - except Exception as e: - _logger.error(f"Failed to update connection state: {e}", exc_info=True) - - def _on_message(self, connection_id: int, topic: str, payload: str, qos: int, retain: bool): - """ - Callback: Wird aufgerufen wenn MQTT-Message empfangen wird - - WICHTIGSTE METHODE! Hier passiert die gesamte Message-Verarbeitung: - - ABLAUF: - 1. Message in mqtt.message Tabelle speichern (für Logging/Debugging) - 2. Passende mqtt.device finden via Topic-Pattern-Matching - 3. Wenn Device gefunden: - a) Payload parsen (via ShellyParser oder anderen Parser) - b) Session Detection: Message an SessionDetector weiterleiten - c) SessionDetector analysiert ob Session läuft/startet/endet - - THREAD-KONTEXT: - Diese Methode wird vom MQTT-Client-Thread aufgerufen, NICHT vom Odoo-Thread! - Jede Sekunde können dutzende Messages eintreffen → Performance wichtig! - - WARUM new_cursor(): - - Odoo-Cursor sind nicht Thread-Safe - - Jeder DB-Zugriff aus diesem Thread braucht eigenen Cursor - - Cursor wird nach commit() automatisch geschlossen - - Args: - connection_id: ID des mqtt.connection Records - topic: MQTT Topic (z.B. "shaperorigin/status/pm1:0") - payload: Message-Inhalt (meist JSON-String) - qos: Quality of Service (0, 1 oder 2) - retain: Retained-Flag vom Broker - """ - _logger.debug(f"Message received on connection {connection_id}, topic {topic}") - - try: - # Store message in database - from odoo.api import Environment - - with self.registry.cursor() as new_cr: - new_env = Environment(new_cr, SUPERUSER_ID, {}) - - # Find matching device - device = self._find_device_for_topic(new_env, connection_id, topic) - - # Create message record - new_env['mqtt.message'].create({ - 'connection_id': connection_id, - 'device_id': device.id if device else False, - 'topic': topic, - 'payload': payload, - 'qos': qos, - 'retain': retain, - 'direction': 'inbound', - }) - - new_cr.commit() - - # Process message for session detection - if device: - self._process_session(new_env, device, topic, payload) - new_cr.commit() - - except Exception as e: - _logger.error(f"Error processing message: {e}", exc_info=True) - - def _find_device_for_topic(self, env, connection_id: int, topic: str): - """ - Find device matching the MQTT topic - - Implements MQTT topic pattern matching: - - # wildcard: matches multiple levels (a/# matches a/b, a/b/c) - - + wildcard: matches single level (a/+/c matches a/b/c but not a/b/d/c) - - Exact match: no wildcards - - Args: - env: Odoo environment - connection_id: Connection ID - topic: MQTT topic from incoming message - - Returns: - mqtt.device record or False - """ - devices = env['mqtt.device'].search([ - ('connection_id', '=', connection_id), - ('active', '=', True), - ]) - - for device in devices: - if self._mqtt_topic_matches(device.topic_pattern, topic): - return device - - return False - - def _mqtt_topic_matches(self, pattern: str, topic: str) -> bool: - """ - Check if MQTT topic matches pattern - - MQTT Wildcard rules: - - # must be last character and matches any number of levels - - + matches exactly one level - - Exact match otherwise - - Examples: - pattern='sensor/#' topic='sensor/temp' -> True - pattern='sensor/#' topic='sensor/temp/1' -> True - pattern='sensor/+/temp' topic='sensor/1/temp' -> True - pattern='sensor/+/temp' topic='sensor/1/2/temp' -> False - pattern='sensor/temp' topic='sensor/temp' -> True - - Args: - pattern: MQTT topic pattern (can contain # or +) - topic: Actual MQTT topic from message - - Returns: - bool: True if topic matches pattern - """ - # Exact match (no wildcards) - if pattern == topic: - return True - - # Multi-level wildcard # (must be at end) - if pattern.endswith('/#'): - prefix = pattern[:-2] # Remove /# - # Topic must start with prefix and have / after it - return topic.startswith(prefix + '/') or topic == prefix - - if pattern.endswith('#') and '/' not in pattern: - # Pattern is just '#' - matches everything - return True - - # Single-level wildcard + - if '+' in pattern: - pattern_parts = pattern.split('/') - topic_parts = topic.split('/') - - # Must have same number of levels - if len(pattern_parts) != len(topic_parts): - return False - - # Check each level - for p, t in zip(pattern_parts, topic_parts): - if p == '+': - # + matches any single level - continue - elif p == '#': - # # can only be at end, but we check for + here - # This shouldn't happen if pattern is valid - return False - elif p != t: - # Exact match required - return False - - return True - - # No match - return False - - def get_client(self, connection_id: int) -> Optional[MqttClient]: - """ - Get MQTT client for connection - - Args: - connection_id: Connection ID - - Returns: - MqttClient or None - """ - return self._clients.get(connection_id) - - def is_running(self, connection_id: int) -> bool: - """ - Check if connection is running - - Args: - connection_id: Connection ID - - Returns: - bool: True if running - """ - return connection_id in self._clients - - def _process_session(self, env, device, topic: str, payload: str): - """Process session using SessionDetector state machine""" - try: - _logger.info(f"🔧 _process_session called for device {device.name}, topic: {topic}") - - # Parse message - parsed = self._parser.parse_message(topic, payload) - if not parsed: - _logger.warning(f"⚠️ Parser returned None for topic {topic}, payload: {payload[:100]}") - return - - _logger.debug(f"✓ Parsed: {parsed}") - - power = self._parser.get_power_value(parsed) - if power is None: - _logger.warning(f"⚠️ No power value in parsed data: {parsed}") - return - - _logger.info(f"✓ Extracted power: {power}W from device {device.name}") - - # Update device status - timestamp = datetime.now() - _logger.debug(f"About to write device status: last_message_time={timestamp}, last_power_w={power}") - try: - device.write({ - 'last_message_time': timestamp, - 'last_power_w': power, - }) - _logger.debug(f"✓ Device status updated") - except Exception as write_error: - _logger.error(f"❌ device.write() FAILED: {write_error}", exc_info=True) - - # Get or create detector for this device - detector = self._get_or_create_detector(device) - _logger.info(f"✓ Got detector for device {device.name}, calling process_power_event({power}, {timestamp})") - - # Process power event through state machine (pass env!) - detector.process_power_event(env, power, timestamp) - _logger.info(f"✓ process_power_event completed") - - except Exception as e: - _logger.error(f"Session processing error for device {device.name}: {e}", exc_info=True) - - def _get_or_create_detector(self, device) -> SessionDetector: - """Get or create SessionDetector for device""" - if device.id not in self._detectors: - self._detectors[device.id] = SessionDetector(device.id, device.name) - return self._detectors[device.id] - - def _start_timeout_worker(self): - """Start background thread to check for session timeouts""" - if self._timeout_worker_running: - return - - self._timeout_worker_running = True - self._timeout_worker_thread = threading.Thread( - target=self._timeout_worker_loop, - daemon=True, - name=f"SessionTimeout-{self.db_name}" - ) - self._timeout_worker_thread.start() - _logger.info(f"Session timeout worker started for database '{self.db_name}'") - - def _stop_timeout_worker(self): - """Stop timeout worker thread""" - if not self._timeout_worker_running: - return - - _logger.info(f"Stopping session timeout worker for '{self.db_name}'") - self._timeout_worker_running = False - - if self._timeout_worker_thread: - self._timeout_worker_thread.join(timeout=2.0) - self._timeout_worker_thread = None - - def _timeout_worker_loop(self): - """Worker loop to check for session timeouts every second""" - import time - - while self._timeout_worker_running: - try: - # Check timeouts with new cursor - with self.registry.cursor() as new_cr: - new_env = api.Environment(new_cr, SUPERUSER_ID, {}) - current_time = datetime.now() - - # Check timeout for each detector - with self._running_lock: - for device_id, detector in list(self._detectors.items()): - try: - # Refresh device in new environment - device = new_env['mqtt.device'].browse(device_id) - if not device.exists(): - # Device was deleted - del self._detectors[device_id] - continue - - # Check timeout (pass env) - detector.check_timeout(new_env, current_time) - except Exception as e: - _logger.error(f"Error checking timeout for device {device_id}: {e}") - - new_cr.commit() - - # Sleep 1 second - time.sleep(1.0) - - except Exception as e: - _logger.error(f"Error in timeout worker loop: {e}", exc_info=True) - time.sleep(1.0) - - _logger.info(f"Session timeout worker stopped for '{self.db_name}'") - - def _restore_detector_states(self): - """Restore detector states from running sessions in DB""" - try: - with self.registry.cursor() as new_cr: - new_env = api.Environment(new_cr, SUPERUSER_ID, {}) - - # Find all devices with running sessions - running_sessions = new_env['mqtt.session'].search([ - ('status', '=', 'running') - ]) - - if not running_sessions: - _logger.info(f"No running sessions to restore for '{self.db_name}'") - return - - _logger.info(f"Restoring {len(running_sessions)} running session(s) for '{self.db_name}'") - - # Group sessions by device to avoid creating multiple detectors per device - devices_restored = set() - - for session in running_sessions: - try: - device = session.device_id - if not device or not device.exists(): - continue - - # Skip if we already restored this device - if device.id in devices_restored: - _logger.debug(f"Detector already restored for device {device.name}, skipping") - continue - - # Create detector and restore state - detector = SessionDetector(device.id, device.name) - detector.restore_state_from_db(new_env) - self._detectors[device.id] = detector - devices_restored.add(device.id) - - _logger.info(f"Restored detector for device {device.name} (state={detector.state})") - except Exception as e: - _logger.error(f"Error restoring detector for session {session.id}: {e}") - - new_cr.commit() - - except Exception as e: - _logger.error(f"Error in _restore_detector_states: {e}", exc_info=True) - - def _reset_connection_states_after_restart(self): - """ - Reset connection states after container restart. - After restart, connections show 'connected' in DB but real connections are gone. - """ - try: - with self.registry.cursor() as new_cr: - new_env = api.Environment(new_cr, SUPERUSER_ID, {}) - Connection = new_env['mqtt.connection'] - - # Find connections with state='connected' - connected = Connection.search([('state', '=', 'connected')]) - if connected: - _logger.info(f"Resetting {len(connected)} connection states after Odoo restart") - connected.write({'state': 'stopped'}) - new_cr.commit() - - except Exception as e: - _logger.error(f"Error in _reset_connection_states_after_restart: {e}", exc_info=True) - - def _cleanup_stale_sessions_after_restart(self): - """ - Close running sessions after container restart. - - Sessions are closed with end_reason='restart' and end_time=last_message_time - because the container crash interrupted them at that point. - - Very old sessions (>session_cleanup_hours) are marked as 'timeout' instead, - as they were likely already dead before the restart. - """ - try: - from datetime import datetime, timedelta - - with self.registry.cursor() as new_cr: - new_env = api.Environment(new_cr, SUPERUSER_ID, {}) - Session = new_env['mqtt.session'] - - # Find all running sessions - running_sessions = Session.search([('status', '=', 'running')]) - - if not running_sessions: - return - - cleaned_count = 0 - timeout_count = 0 - current_time = datetime.now() - - for session in running_sessions: - # Get cleanup timeout from connection (default 24h if not set) - cleanup_hours = 24 - if session.device_id and session.device_id.connection_id: - cleanup_hours = session.device_id.connection_id.session_cleanup_hours or 24 - - if session.last_message_time: - timeout_threshold = current_time - timedelta(hours=cleanup_hours) - time_diff = (current_time - session.last_message_time).total_seconds() / 3600 - - if session.last_message_time < timeout_threshold: - # Very old session - was likely dead before restart - _logger.warning( - f"Closing stale session {session.session_id} as timeout " - f"(last message {time_diff:.1f}h ago, threshold {cleanup_hours}h)" - ) - session.write({ - 'status': 'completed', - 'end_reason': 'timeout', - 'end_time': session.last_message_time # End at last known activity - }) - timeout_count += 1 - else: - # Recent session - was active when container crashed - _logger.info( - f"Closing session {session.session_id} due to restart " - f"(last message {time_diff:.1f}h ago)" - ) - session.write({ - 'status': 'completed', - 'end_reason': 'restart', # True reason is container restart - 'end_time': session.last_message_time # End at last known activity - }) - cleaned_count += 1 - else: - # No last_message_time - very old session - _logger.warning( - f"Closing session {session.session_id} as timeout " - f"(no last_message_time recorded)" - ) - session.write({ - 'status': 'completed', - 'end_reason': 'timeout', - 'end_time': current_time # No better timestamp available - }) - timeout_count += 1 - - if cleaned_count > 0 or timeout_count > 0: - _logger.info( - f"Closed {cleaned_count} session(s) due to restart, " - f"{timeout_count} stale session(s) as timeout" - ) - - new_cr.commit() - - except Exception as e: - _logger.error(f"Error in _cleanup_stale_sessions_after_restart: {e}", exc_info=True) diff --git a/open_workshop_mqtt/services/mqtt_client.py b/open_workshop_mqtt/services/mqtt_client.py deleted file mode 100644 index ba7bb54..0000000 --- a/open_workshop_mqtt/services/mqtt_client.py +++ /dev/null @@ -1,389 +0,0 @@ -# -*- coding: utf-8 -*- -""" -MQTT Client mit Auto-Reconnect und State Recovery -Migriert von python_prototype/mqtt_client.py (M5) -""" - -import paho.mqtt.client as mqtt -import ssl -import time -import logging -import threading -from typing import Optional, Callable, Dict, Any - -_logger = logging.getLogger(__name__) - - -class MqttClient: - """Enhanced MQTT Client with auto-reconnect and state recovery""" - - def __init__( - self, - connection_id: int, - host: str, - port: int, - client_id: str, - username: Optional[str] = None, - password: Optional[str] = None, - use_tls: bool = True, - verify_cert: bool = False, - ca_cert_path: Optional[str] = None, - auto_reconnect: bool = True, - reconnect_delay_min: int = 1, - reconnect_delay_max: int = 60, - on_message_callback: Optional[Callable] = None, - on_connect_callback: Optional[Callable] = None, - on_disconnect_callback: Optional[Callable] = None, - ): - """ - Initialize MQTT Client - - Args: - connection_id: Database ID of mqtt.connection record - host: MQTT broker hostname - port: MQTT broker port - client_id: MQTT client identifier - username: Authentication username - password: Authentication password - use_tls: Enable TLS/SSL encryption - verify_cert: Verify SSL certificate - ca_cert_path: Path to custom CA certificate - auto_reconnect: Enable automatic reconnection - reconnect_delay_min: Minimum reconnect delay (seconds) - reconnect_delay_max: Maximum reconnect delay (seconds) - on_message_callback: Callback for received messages - on_connect_callback: Callback when connected - on_disconnect_callback: Callback when disconnected - """ - self.connection_id = connection_id - self.host = host - self.port = port - self.client_id = client_id - self.username = username - self.password = password - self.use_tls = use_tls - self.verify_cert = verify_cert - self.ca_cert_path = ca_cert_path - self.auto_reconnect = auto_reconnect - self.reconnect_delay_min = reconnect_delay_min - self.reconnect_delay_max = reconnect_delay_max - - # Callbacks - self.on_message_callback = on_message_callback - self.on_connect_callback = on_connect_callback - self.on_disconnect_callback = on_disconnect_callback - - # State - self._client: Optional[mqtt.Client] = None - self._running = False - self._connected = False - self._reconnect_thread: Optional[threading.Thread] = None - self._subscriptions: Dict[str, int] = {} # topic -> qos - - # Reconnect state - self._reconnect_delay = reconnect_delay_min - self._reconnect_attempt = 0 - - _logger.info(f"MqttClient initialized for connection {connection_id}: {host}:{port}") - - def connect(self) -> bool: - """ - Connect to MQTT broker - - Returns: - bool: True if connection initiated successfully - """ - if self._running: - _logger.warning(f"Client already running for connection {self.connection_id}") - return False - - try: - _logger.info(f"Connecting to MQTT broker: {self.host}:{self.port}") - - # Create MQTT client - self._client = mqtt.Client( - client_id=self.client_id, - protocol=mqtt.MQTTv5 - ) - - # Set callbacks - self._client.on_connect = self._on_connect - self._client.on_disconnect = self._on_disconnect - self._client.on_message = self._on_message - - # Configure authentication - if self.username: - self._client.username_pw_set(self.username, self.password or '') - - # Configure TLS/SSL - if self.use_tls: - tls_context = ssl.create_default_context() - - if not self.verify_cert: - tls_context.check_hostname = False - tls_context.verify_mode = ssl.CERT_NONE - _logger.warning(f"SSL certificate verification disabled") - - if self.ca_cert_path: - tls_context.load_verify_locations(cafile=self.ca_cert_path) - _logger.info(f"Loaded CA certificate from {self.ca_cert_path}") - - self._client.tls_set_context(tls_context) - - # Connect - self._client.connect(self.host, self.port, keepalive=60) - - # Start network loop - self._client.loop_start() - self._running = True - - _logger.info(f"MQTT connection initiated for {self.connection_id}") - return True - - except Exception as e: - _logger.error(f"Failed to connect: {e}", exc_info=True) - self._running = False - return False - - def disconnect(self): - """Disconnect from MQTT broker""" - _logger.info(f"Disconnecting MQTT client {self.connection_id}") - - self._running = False - self._connected = False - - if self._client: - try: - # Stop loop first (non-blocking) - self._client.loop_stop() - - # Disconnect with short timeout to avoid hanging - self._client.disconnect() - - # Give it a moment but don't wait forever - import time - time.sleep(0.1) - - except Exception as e: - _logger.error(f"Error during disconnect: {e}") - finally: - self._client = None - - _logger.info(f"MQTT client {self.connection_id} disconnected") - - def subscribe(self, topic: str, qos: int = 0) -> bool: - """ - Subscribe to MQTT topic - - Args: - topic: MQTT topic pattern - qos: Quality of Service (0, 1, or 2) - - Returns: - bool: True if subscription successful - """ - # Check if already subscribed to avoid duplicate subscriptions - if topic in self._subscriptions: - _logger.debug(f"Already subscribed to {topic}, skipping") - return True - - if not self._connected: - _logger.warning(f"Cannot subscribe - not connected") - # Store subscription for later (will be restored on reconnect) - self._subscriptions[topic] = qos - return False - - try: - result, mid = self._client.subscribe(topic, qos) - if result == mqtt.MQTT_ERR_SUCCESS: - self._subscriptions[topic] = qos - _logger.info(f"Subscribed to topic: {topic} (QoS {qos})") - return True - else: - _logger.error(f"Failed to subscribe to {topic}: {result}") - return False - except Exception as e: - _logger.error(f"Error subscribing to {topic}: {e}") - return False - - def unsubscribe(self, topic: str) -> bool: - """ - Unsubscribe from MQTT topic - - Args: - topic: MQTT topic pattern - - Returns: - bool: True if unsubscription successful - """ - if topic in self._subscriptions: - del self._subscriptions[topic] - - if not self._connected: - return True - - try: - result, mid = self._client.unsubscribe(topic) - if result == mqtt.MQTT_ERR_SUCCESS: - _logger.info(f"Unsubscribed from topic: {topic}") - return True - else: - _logger.error(f"Failed to unsubscribe from {topic}: {result}") - return False - except Exception as e: - _logger.error(f"Error unsubscribing from {topic}: {e}") - return False - - def publish(self, topic: str, payload: str, qos: int = 0, retain: bool = False) -> bool: - """ - Publish message to MQTT topic - - Args: - topic: MQTT topic - payload: Message payload - qos: Quality of Service - retain: Retain message on broker - - Returns: - bool: True if publish successful - """ - if not self._connected: - _logger.warning(f"Cannot publish - not connected") - return False - - try: - result = self._client.publish(topic, payload, qos, retain) - if result.rc == mqtt.MQTT_ERR_SUCCESS: - _logger.debug(f"Published to {topic}: {payload[:100]}") - return True - else: - _logger.error(f"Failed to publish to {topic}: {result.rc}") - return False - except Exception as e: - _logger.error(f"Error publishing to {topic}: {e}") - return False - - # ========== Internal Callbacks ========== - - def _on_connect(self, client, userdata, flags, rc, properties=None): - """Callback when connection is established""" - if rc == 0: - self._connected = True - self._reconnect_delay = self.reconnect_delay_min - self._reconnect_attempt = 0 - - _logger.info(f"Connected to MQTT broker: {self.host}:{self.port}") - - # NOTE: We do NOT restore subscriptions here! - # Subscriptions are handled by the on_connect_callback in IotBridgeService._on_connect() - # which is called below. This avoids duplicate subscriptions. - # The _subscriptions dict is only used for tracking, not for restore. - - # Call external callback (this will subscribe to topics) - if self.on_connect_callback: - try: - self.on_connect_callback(self.connection_id) - except Exception as e: - _logger.error(f"Error in connect callback: {e}") - else: - error_messages = { - 1: 'Connection refused - incorrect protocol version', - 2: 'Connection refused - invalid client identifier', - 3: 'Connection refused - bad username or password', - 4: 'Connection refused - not authorized', - 5: 'Connection refused - not authorized', - } - error_msg = error_messages.get(rc, f'Connection refused - code {rc}') - _logger.error(f"Connection failed: {error_msg}") - - # Trigger reconnect if auto-reconnect is enabled - if self.auto_reconnect and self._running: - self._schedule_reconnect() - - def _on_disconnect(self, client, userdata, rc, properties=None): - """Callback when disconnected from broker""" - self._connected = False - - if rc == 0: - _logger.info(f"Disconnected from MQTT broker (clean)") - else: - _logger.warning(f"Unexpected disconnect from MQTT broker (rc={rc})") - - # Call external callback - if self.on_disconnect_callback: - try: - self.on_disconnect_callback(self.connection_id, rc) - except Exception as e: - _logger.error(f"Error in disconnect callback: {e}") - - # Trigger reconnect if auto-reconnect is enabled and disconnect was unexpected - if rc != 0 and self.auto_reconnect and self._running: - self._schedule_reconnect() - - def _on_message(self, client, userdata, msg): - """Callback when message is received""" - try: - _logger.info(f"📨 MQTT Message received on {msg.topic}: {msg.payload[:100]}") - - # Call external callback - if self.on_message_callback: - self.on_message_callback( - connection_id=self.connection_id, - topic=msg.topic, - payload=msg.payload.decode('utf-8'), - qos=msg.qos, - retain=msg.retain - ) - except Exception as e: - _logger.error(f"Error processing message: {e}", exc_info=True) - - def _schedule_reconnect(self): - """Schedule reconnection attempt with exponential backoff""" - if not self.auto_reconnect or not self._running: - return - - # Don't schedule if reconnect thread is already running - if self._reconnect_thread and self._reconnect_thread.is_alive(): - return - - self._reconnect_attempt += 1 - delay = min(self._reconnect_delay, self.reconnect_delay_max) - - _logger.info(f"Scheduling reconnect attempt {self._reconnect_attempt} in {delay}s...") - - self._reconnect_thread = threading.Thread(target=self._reconnect_worker, args=(delay,)) - self._reconnect_thread.daemon = True - self._reconnect_thread.start() - - def _reconnect_worker(self, delay: int): - """Worker thread for reconnection""" - time.sleep(delay) - - if not self._running: - return - - _logger.info(f"Attempting to reconnect (attempt {self._reconnect_attempt})...") - - try: - if self._client: - self._client.reconnect() - - # Exponential backoff - self._reconnect_delay = min(self._reconnect_delay * 2, self.reconnect_delay_max) - - except Exception as e: - _logger.error(f"Reconnect failed: {e}") - - # Schedule next attempt - if self.auto_reconnect and self._running: - self._schedule_reconnect() - - @property - def is_connected(self) -> bool: - """Check if client is connected""" - return self._connected - - @property - def is_running(self) -> bool: - """Check if client is running""" - return self._running diff --git a/open_workshop_mqtt/services/parsers/__init__.py b/open_workshop_mqtt/services/parsers/__init__.py deleted file mode 100644 index 9f402e7..0000000 --- a/open_workshop_mqtt/services/parsers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -*- coding: utf-8 -*- - -from . import shelly_parser diff --git a/open_workshop_mqtt/services/parsers/shelly_parser.py b/open_workshop_mqtt/services/parsers/shelly_parser.py deleted file mode 100644 index d9aa124..0000000 --- a/open_workshop_mqtt/services/parsers/shelly_parser.py +++ /dev/null @@ -1,185 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Shelly PM Mini G3 Parser -Parses MQTT Messages from Shelly PM Mini G3 -""" - -import json -import logging -from typing import Dict, Optional -from datetime import datetime - -_logger = logging.getLogger(__name__) - - -class ShellyParser: - """Parser for Shelly PM Mini G3 MQTT Messages""" - - def parse_message(self, topic: str, payload: str) -> Optional[Dict]: - """ - Parse Shelly MQTT message - - Args: - topic: MQTT topic - payload: Message payload (JSON string) - - Returns: - Dict with parsed data or None - """ - try: - # Parse JSON - data = json.loads(payload) - - # Ignore debug logs - if 'debug/log' in topic: - return None - - # Parse different message types - if '/status/pm1:0' in topic: - return self._parse_status_message(topic, data) - elif '/events/rpc' in topic: - return self._parse_rpc_event(topic, data) - elif '/telemetry' in topic: - return self._parse_telemetry(topic, data) - elif '/online' in topic: - return {'online': data == 'true' or data is True} - - return None - - except Exception as e: - _logger.debug(f"Error parsing message from {topic}: {e}") - return None - - def _parse_status_message(self, topic: str, data: dict) -> Optional[Dict]: - """ - Parse Shelly PM status message - Topic: shaperorigin/status/pm1:0 - - Payload format: - { - "id": 0, - "voltage": 230.0, - "current": 0.217, - "apower": 50.0, - "freq": 50.0, - "aenergy": {"total": 12345.6}, - "temperature": {"tC": 35.2} - } - """ - try: - # Extract device ID from topic - device_id = self._extract_device_id_from_topic(topic) - - result = { - 'message_type': 'status', - 'device_id': device_id, - 'timestamp': datetime.utcnow().isoformat() + 'Z', - 'voltage': data.get('voltage'), - 'current': data.get('current'), - 'apower': data.get('apower', 0), # Active Power in Watts - 'frequency': data.get('freq'), - 'total_energy': data.get('aenergy', {}).get('total'), - 'temperature': data.get('temperature', {}).get('tC'), - } - - _logger.debug(f"Parsed status: {result['apower']}W") - return result - - except Exception as e: - _logger.error(f"Error parsing status message: {e}") - return None - - def _parse_rpc_event(self, topic: str, payload: dict) -> Optional[Dict]: - """ - Parse RPC NotifyStatus event - Topic: shellypmminig3/events/rpc - """ - try: - if payload.get('method') != 'NotifyStatus': - return None - - device_id = payload.get('src', '').replace('shellypmminig3-', '') - params = payload.get('params', {}) - pm_data = params.get('pm1:0', {}) - - # Get timestamp - ts = params.get('ts') - if ts: - timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' - else: - timestamp = datetime.utcnow().isoformat() + 'Z' - - data = { - 'message_type': 'event', - 'device_id': device_id, - 'timestamp': timestamp, - 'apower': pm_data.get('apower'), - 'current': pm_data.get('current'), - 'voltage': pm_data.get('voltage'), - } - - # Only return if we have actual data - if data['apower'] is not None or data['current'] is not None: - _logger.debug(f"Parsed RPC event: {pm_data}") - return data - - return None - - except Exception as e: - _logger.error(f"Error parsing RPC event: {e}") - return None - - def _parse_telemetry(self, topic: str, payload: dict) -> Optional[Dict]: - """ - Parse telemetry message - Topic: shelly/pmmini/shellypmminig3-xxx/telemetry - """ - try: - # Extract device ID from topic - parts = topic.split('/') - device_id = parts[2] if len(parts) > 2 else 'unknown' - device_id = device_id.replace('shellypmminig3-', '') - - # Get timestamp - ts = payload.get('ts') - if ts: - timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z' - else: - timestamp = datetime.utcnow().isoformat() + 'Z' - - data = { - 'message_type': 'telemetry', - 'device_id': device_id, - 'timestamp': timestamp, - 'voltage': payload.get('voltage_v'), - 'current': payload.get('current_a'), - 'apower': payload.get('power_w'), - 'frequency': payload.get('freq_hz'), - 'total_energy': payload.get('energy_wh'), - } - - _logger.debug(f"Parsed telemetry: apower={data['apower']}W") - return data - - except Exception as e: - _logger.error(f"Error parsing telemetry: {e}") - return None - - def _extract_device_id_from_topic(self, topic: str) -> str: - """ - Extract device ID from topic - Topic format: shaperorigin/status/pm1:0 - Returns: shaperorigin (the topic prefix) - """ - parts = topic.split('/') - if len(parts) > 0: - return parts[0] - return 'unknown' - - def get_power_value(self, parsed_data: Dict) -> Optional[float]: - """Extract power value from parsed data""" - return parsed_data.get('apower') - - def get_device_id(self, parsed_data: Dict) -> str: - """Get device ID from parsed data""" - return parsed_data.get('device_id', 'unknown') diff --git a/open_workshop_mqtt/services/session_detector.py b/open_workshop_mqtt/services/session_detector.py deleted file mode 100644 index e4a60ff..0000000 --- a/open_workshop_mqtt/services/session_detector.py +++ /dev/null @@ -1,340 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Session Detector - State Machine for Session Detection - -State Machine: -IDLE → STARTING → STANDBY → WORKING → STOPPING → IDLE - ↓ ↓ ↓ - IDLE (STANDBY ↔ WORKING) - -Thresholds: -- standby_threshold_w: Power above this triggers session start (e.g., 20W) -- working_threshold_w: Power above this = working state (e.g., 100W) - -Debounce: -- start_debounce_s: Wait time before confirming session start (e.g., 3s) -- stop_debounce_s: Wait time before confirming session end (e.g., 15s) -- message_timeout_s: Max time without messages before timeout (e.g., 20s) -""" - -import logging -import json -import uuid -from datetime import datetime, timedelta - -_logger = logging.getLogger(__name__) - - -class SessionDetector: - """ - State Machine for Power-Based Session Detection - - Attributes: - state: Current state (idle/starting/standby/working/stopping) - current_session_id: UUID of current running session (None if idle) - last_session_id: UUID of last completed session - device: mqtt.device record - env: Odoo environment - - # Timing - state_entered_at: Timestamp when current state was entered - last_message_time: Timestamp of last processed message - - # Durations - standby_duration_s: Time spent in STANDBY state (seconds) - working_duration_s: Time spent in WORKING state (seconds) - - # Config (loaded from device.strategy_config) - standby_threshold_w: Power threshold for session start - working_threshold_w: Power threshold for working state - start_debounce_s: Debounce time for session start - stop_debounce_s: Debounce time for session stop - message_timeout_s: Timeout for no messages - """ - - def __init__(self, device_id, device_name): - """ - Initialize Session Detector for a device - - Args: - device_id: ID of mqtt.device record - device_name: Name of device (for logging) - """ - self.device_id = device_id - self.device_name = device_name - - # State - self.state = 'idle' - self.current_session_id = None - self.last_session_id = None - - # Timing - self.state_entered_at = None - self.last_message_time = None - self.session_start_time = None - - # Durations - self.standby_duration_s = 0 - self.working_duration_s = 0 - - # Load config with default values (will be loaded properly on first call with env) - self.standby_threshold_w = 20 - self.working_threshold_w = 100 - self.start_debounce_s = 3 - self.stop_debounce_s = 15 - self.message_timeout_s = 20 - - _logger.info(f"SessionDetector initialized for device {device_name} (ID={device_id}) " - f"(standby={self.standby_threshold_w}W, working={self.working_threshold_w}W)") - - def _load_config(self): - """Load strategy config from device.strategy_config JSON""" - try: - config = json.loads(self.device.strategy_config or '{}') - self.standby_threshold_w = config.get('standby_threshold_w', 20) - self.working_threshold_w = config.get('working_threshold_w', 100) - self.start_debounce_s = config.get('start_debounce_s', 3) - self.stop_debounce_s = config.get('stop_debounce_s', 15) - self.message_timeout_s = config.get('message_timeout_s', 20) - except json.JSONDecodeError: - _logger.error(f"Invalid strategy_config JSON for device {self.device.name}, using defaults") - self.standby_threshold_w = 20 - self.working_threshold_w = 100 - self.start_debounce_s = 3 - self.stop_debounce_s = 15 - self.message_timeout_s = 20 - - def process_power_event(self, env, power_w, timestamp): - """ - Process a power measurement event - - Args: - env: Odoo environment (with active cursor) - power_w: Power in watts - timestamp: datetime of the measurement - """ - _logger.info(f"🔍 SessionDetector.process_power_event called: device={self.device_name}, power={power_w}W, state={self.state}") - - self.last_message_time = timestamp - - if self.state == 'idle': - _logger.debug(f"Calling _handle_idle({power_w}, {timestamp})") - self._handle_idle(env, power_w, timestamp) - elif self.state == 'starting': - _logger.debug(f"Calling _handle_starting({power_w}, {timestamp})") - self._handle_starting(env, power_w, timestamp) - elif self.state == 'standby': - _logger.debug(f"Calling _handle_standby({power_w}, {timestamp})") - self._handle_standby(env, power_w, timestamp) - elif self.state == 'working': - _logger.debug(f"Calling _handle_working({power_w}, {timestamp})") - self._handle_working(env, power_w, timestamp) - elif self.state == 'stopping': - _logger.debug(f"Calling _handle_stopping({power_w}, {timestamp})") - self._handle_stopping(env, power_w, timestamp) - - # Update session in DB with live data - if self.current_session_id: - self._update_session_live(env, power_w, timestamp) - - def _handle_idle(self, env, power_w, timestamp): - """IDLE State: Wait for power above standby threshold""" - if power_w > self.standby_threshold_w: - self._transition_to(env, 'starting', timestamp) - - def _handle_starting(self, env, power_w, timestamp): - """STARTING State: Debounce period before confirming session start""" - if power_w < self.standby_threshold_w: - # Power dropped → Abort start, back to IDLE - _logger.info(f"Device {self.device_name}: Power dropped during STARTING, back to IDLE") - self._transition_to(env, 'idle', timestamp) - return - - # Check if debounce time elapsed - time_in_state = (timestamp - self.state_entered_at).total_seconds() - if time_in_state >= self.start_debounce_s: - # Debounce complete → Start session - self._start_session(env, power_w, timestamp) - self._transition_to(env, 'standby', timestamp) - - def _handle_standby(self, env, power_w, timestamp): - """STANDBY State: Session running at low power""" - if power_w < self.standby_threshold_w: - # Power dropped → Accumulate duration then start stop sequence - time_in_state = (timestamp - self.state_entered_at).total_seconds() - self.standby_duration_s += time_in_state - self._transition_to(env, 'stopping', timestamp) - elif power_w > self.working_threshold_w: - # Power increased → Accumulate duration then transition to WORKING - time_in_state = (timestamp - self.state_entered_at).total_seconds() - self.standby_duration_s += time_in_state - self._transition_to(env, 'working', timestamp) - - def _handle_working(self, env, power_w, timestamp): - """WORKING State: Session running at high power""" - if power_w < self.standby_threshold_w: - # Power dropped → Accumulate duration then start stop sequence - time_in_state = (timestamp - self.state_entered_at).total_seconds() - self.working_duration_s += time_in_state - self._transition_to(env, 'stopping', timestamp) - elif power_w < self.working_threshold_w: - # Power decreased → Accumulate duration then back to STANDBY - time_in_state = (timestamp - self.state_entered_at).total_seconds() - self.working_duration_s += time_in_state - self._transition_to(env, 'standby', timestamp) - - def _handle_stopping(self, env, power_w, timestamp): - """STOPPING State: Debounce period before ending session""" - if power_w > self.standby_threshold_w: - # Power came back → Resume session - if power_w > self.working_threshold_w: - _logger.info(f"Device {self.device_name}: Power resumed during STOPPING, back to WORKING") - self._transition_to(env, 'working', timestamp) - else: - _logger.info(f"Device {self.device_name}: Power resumed during STOPPING, back to STANDBY") - self._transition_to(env, 'standby', timestamp) - return - - # Check if debounce time elapsed - time_in_state = (timestamp - self.state_entered_at).total_seconds() - if time_in_state >= self.stop_debounce_s: - # Debounce complete → End session - self._end_session(env, 'power_drop', timestamp) - self._transition_to(env, 'idle', timestamp) - - def _transition_to(self, env, new_state, timestamp): - """Transition to a new state""" - old_state = self.state - self.state = new_state - self.state_entered_at = timestamp - - _logger.info(f"Device {self.device_name}: {old_state.upper()} → {new_state.upper()}") - - def _start_session(self, env, power_w, timestamp): - """Start a new session""" - session_id = str(uuid.uuid4()) - self.current_session_id = session_id - self.session_start_time = timestamp - self.standby_duration_s = 0 - self.working_duration_s = 0 - - session = env['mqtt.session'].create({ - 'device_id': self.device_id, - 'session_id': session_id, - 'start_time': timestamp, - 'start_power_w': power_w, - 'status': 'running', - 'current_state': 'standby', - 'current_power_w': power_w, - 'last_message_time': timestamp, - }) - - _logger.info(f"Device {self.device_name}: Session STARTED (ID={session_id[:8]}..., Power={power_w:.1f}W)") - - def _end_session(self, env, reason, timestamp): - """End the current session""" - if not self.current_session_id: - _logger.warning(f"Device {self.device_name}: Tried to end session but no session running") - return - - session = env['mqtt.session'].search([ - ('session_id', '=', self.current_session_id) - ], limit=1) - - if not session: - _logger.error(f"Device {self.device_name}: Session {self.current_session_id} not found in DB!") - self.current_session_id = None - return - - total_duration = (timestamp - self.session_start_time).total_seconds() - - session.write({ - 'end_time': timestamp, - 'end_power_w': 0.0, - 'end_reason': reason, - 'status': 'completed', - 'total_duration_s': total_duration, - 'standby_duration_s': self.standby_duration_s, - 'working_duration_s': self.working_duration_s, - 'current_state': 'idle', - 'current_power_w': 0.0, - }) - - _logger.info(f"Device {self.device_name}: Session ENDED (ID={self.current_session_id[:8]}..., " - f"Reason={reason}, Duration={total_duration:.0f}s, " - f"Standby={self.standby_duration_s:.0f}s, Working={self.working_duration_s:.0f}s)") - - self.last_session_id = self.current_session_id - self.current_session_id = None - - def _update_session_live(self, env, power_w, timestamp): - """Update running session with live data""" - session = env['mqtt.session'].search([ - ('session_id', '=', self.current_session_id) - ], limit=1) - - if not session: - _logger.error(f"Device {self.device_name}: Session {self.current_session_id} not found!") - return - - # Calculate current total duration - if self.session_start_time: - total_duration = (timestamp - self.session_start_time).total_seconds() - else: - total_duration = session.total_duration_s - - session.write({ - 'current_power_w': power_w, - 'current_state': self.state, - 'last_message_time': timestamp, - 'total_duration_s': total_duration, - 'standby_duration_s': self.standby_duration_s, - 'working_duration_s': self.working_duration_s, - }) - - def check_timeout(self, env, current_time): - """ - Check if session timed out (no messages for too long) - - Args: - env: Odoo environment - current_time: Current datetime - """ - if not self.current_session_id or not self.last_message_time: - return - - time_since_last_message = (current_time - self.last_message_time).total_seconds() - - if time_since_last_message > self.message_timeout_s: - _logger.warning(f"Device {self.device_name}: Session TIMEOUT " - f"(no messages for {time_since_last_message:.0f}s)") - self._end_session(env, 'timeout', current_time) - self._transition_to(env, 'idle', current_time) - - def restore_state_from_db(self, env): - """ - Restore detector state from running session in DB (after restart) - - Args: - env: Odoo environment - """ - session = env['mqtt.session'].search([ - ('device_id', '=', self.device_id), - ('status', '=', 'running'), - ], limit=1, order='start_time desc') - - if not session: - _logger.info(f"Device {self.device_name}: No running session to restore") - return - - self.current_session_id = session.session_id - self.state = session.current_state or 'standby' - self.last_message_time = session.last_message_time - self.session_start_time = session.start_time - self.standby_duration_s = session.standby_duration_s or 0 - self.working_duration_s = session.working_duration_s or 0 - self.state_entered_at = session.last_message_time # Best guess - - _logger.info(f"Device {self.device_name}: State RESTORED from DB " - f"(State={self.state}, Session={self.current_session_id[:8]}..., " - f"Standby={self.standby_duration_s:.0f}s, Working={self.working_duration_s:.0f}s)") diff --git a/open_workshop_mqtt/tests/__init__.py b/open_workshop_mqtt/tests/__init__.py index 28d6a4f..19b1fdb 100644 --- a/open_workshop_mqtt/tests/__init__.py +++ b/open_workshop_mqtt/tests/__init__.py @@ -1,9 +1,4 @@ # -*- coding: utf-8 -*- -from . import test_mqtt_connection -from . import test_session_detection -from . import test_device_status -from . import test_mqtt_mocked # Mock-basierte Tests -from . import test_topic_matching # Topic Pattern Matching Tests -from . import test_session_detector # Session Detector Unit Tests -from . import test_no_duplicate_messages # Duplicate Message Detection Tests +# Old service-based tests removed - using new REST API architecture +# See iot_bridge/tests/ for standalone bridge tests diff --git a/open_workshop_mqtt/tests/common.py b/open_workshop_mqtt/tests/common.py index b5baa92..99249ce 100644 --- a/open_workshop_mqtt/tests/common.py +++ b/open_workshop_mqtt/tests/common.py @@ -1,163 +1,22 @@ # -*- coding: utf-8 -*- """ -Common test utilities and base classes -Uses REAL MQTT Broker (like python_prototype tests) +Common test infrastructure for MQTT module + +DEPRECATED: Old service-based tests removed. +Use iot_bridge/tests/ for standalone bridge tests. +Use REST API endpoints for integration tests. """ from odoo.tests import TransactionCase import logging -import yaml -from pathlib import Path _logger = logging.getLogger(__name__) -class MQTTTestCase(TransactionCase): - """ - Base test case for MQTT module - Uses REAL MQTT connection to test.mosquitto.org or configured broker - """ +class MqttTestCase(TransactionCase): + """Base test case for MQTT module - legacy support only""" @classmethod def setUpClass(cls): super().setUpClass() - - # Load MQTT config from python_prototype/config.yaml - config_path = Path(__file__).parent.parent / 'python_prototype' / 'config.yaml' - if config_path.exists(): - with open(config_path) as f: - config = yaml.safe_load(f) - mqtt_conf = config.get('mqtt', {}) - else: - # Fallback: public test broker - mqtt_conf = { - 'host': 'test.mosquitto.org', - 'port': 1883, - 'username': None, - 'password': None, - } - - # Create test connection with REAL broker - cls.connection = cls.env['mqtt.connection'].create({ - 'name': 'Test MQTT Broker (Real)', - 'host': mqtt_conf.get('host', 'test.mosquitto.org'), - 'port': mqtt_conf.get('port', 1883), - 'client_id': 'odoo_test_client', - 'username': mqtt_conf.get('username', False), - 'password': mqtt_conf.get('password', False), - 'use_tls': mqtt_conf.get('port') == 8883, - }) - - # Create test device with unique topic - import time - test_topic = f'odootest/{int(time.time())}' - - cls.device = cls.env['mqtt.device'].create({ - 'name': 'Test Device (Real)', - 'connection_id': cls.connection.id, - 'topic_pattern': f'{test_topic}/#', - 'parser_type': 'shelly_pm', - 'session_strategy': 'power_threshold', - 'strategy_config': '{"standby_threshold_w": 10, "working_threshold_w": 30}', - }) - - cls.test_topic = test_topic - - _logger.info(f"Test setup complete. Using broker: {mqtt_conf.get('host')}:{mqtt_conf.get('port')}") - _logger.info(f"Test topic: {test_topic}") - - def tearDown(self): - """Cleanup after each test - ensure all connections are stopped""" - super().tearDown() - - # Force stop any running connections - from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService - try: - service = IotBridgeService.get_instance(self.env) - if self.connection.id in service._clients: - service.stop_connection(self.connection.id) - _logger.info(f"Cleaned up connection {self.connection.id} in tearDown") - except Exception as e: - _logger.warning(f"Error in tearDown cleanup: {e}") - - def start_connection(self): - """Helper to start MQTT connection""" - # Bypass the ORM's action to directly start via service - from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService - service = IotBridgeService.get_instance(self.env) - - # Start with existing env (not new cursor) - success = service.start_connection_with_env(self.connection.id, self.env) - self.assertTrue(success, "Failed to start connection") - - # Wait for MQTT client to actually connect (check client state, not DB) - import time - client = service._clients.get(self.connection.id) - self.assertIsNotNone(client, "Client not found in service") - - for i in range(10): - if client.is_connected: - break - time.sleep(0.5) - - self.assertTrue(client.is_connected, "Client failed to connect within timeout") - - def stop_connection(self): - """Helper to stop MQTT connection""" - from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService - service = IotBridgeService.get_instance(self.env) - - # Stop connection - success = service.stop_connection(self.connection.id) - self.assertTrue(success, "Failed to stop connection") - - # Verify client is removed - import time - time.sleep(0.5) - client = service._clients.get(self.connection.id) - self.assertIsNone(client, "Client still in service after stop") - - def publish_test_message(self, subtopic, payload): - """ - Publish message to test topic using paho-mqtt - - Args: - subtopic: Subtopic (e.g., 'status/pm1:0') - payload: Message payload (dict or string) - """ - import paho.mqtt.publish as publish - import json - - topic = f'{self.test_topic}/{subtopic}' - payload_str = json.dumps(payload) if isinstance(payload, dict) else payload - - # Get connection config - auth = None - if self.connection.username: - auth = { - 'username': self.connection.username, - 'password': self.connection.password or '', - } - - # TLS config - tls = None - if self.connection.use_tls: - import ssl - tls = { - 'cert_reqs': ssl.CERT_REQUIRED if self.connection.verify_cert else ssl.CERT_NONE - } - - publish.single( - topic, - payload=payload_str, - hostname=self.connection.host, - port=int(self.connection.port), - auth=auth, - tls=tls, - ) - - _logger.info(f"Published test message to {topic}") - - def simulate_mqtt_message(self, subtopic, payload): - """Alias for publish_test_message for compatibility""" - self.publish_test_message(subtopic, payload) + _logger.warning("MqttTestCase is deprecated - use REST API tests instead") diff --git a/open_workshop_mqtt/tests/test_mqtt_connection.py b/open_workshop_mqtt/tests/test_mqtt_connection.py deleted file mode 100644 index 1adbe62..0000000 --- a/open_workshop_mqtt/tests/test_mqtt_connection.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Test MQTT Connection Lifecycle with REAL broker -""" - -import unittest -from odoo.tests import tagged -from .common import MQTTTestCase -import time - - -@unittest.skip("HANGS: Real MQTT broker + TransactionCase incompatible - see TODO.md M8") -@tagged('post_install', '-at_install', 'mqtt') -class TestMQTTConnection(MQTTTestCase): - """Test MQTT connection start/stop/restart with REAL broker""" - - def test_01_connection_start_real_broker(self): - """Test starting connection to REAL MQTT broker""" - # Start connection (internally checks client.is_connected) - self.start_connection() - - # Connection is established - tearDown will clean up - - def test_02_connection_stop_real_broker(self): - """Test stopping active MQTT connection""" - # Start first - self.start_connection() - - # Explicitly stop (test the stop function) - self.stop_connection() - - # Verify stopped - from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService - service = IotBridgeService.get_instance(self.env) - client = service._clients.get(self.connection.id) - self.assertIsNone(client, "Client should be removed after stop") - - def test_03_publish_and_receive_message(self): - """Test publishing message and receiving it in Odoo""" - # Start connection - self.start_connection() - - # Wait for subscription - time.sleep(2) - - # Publish test message - test_payload = { - "id": 0, - "voltage": 230.0, - "current": 0.5, - "apower": 50.0, - "freq": 50.0, - } - self.publish_test_message('status/pm1:0', test_payload) - - # Wait for message to arrive - time.sleep(3) - - # Check if message was received - messages = self.env['mqtt.message'].search([ - ('device_id', '=', self.device.id), - ('topic', '=', f'{self.test_topic}/status/pm1:0'), - ]) - - self.assertGreater(len(messages), 0, "No messages received from broker!") - - # tearDown will clean up connection diff --git a/open_workshop_mqtt/tests/test_mqtt_mocked.py b/open_workshop_mqtt/tests/test_mqtt_mocked.py deleted file mode 100644 index 8817c63..0000000 --- a/open_workshop_mqtt/tests/test_mqtt_mocked.py +++ /dev/null @@ -1,125 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Test suite mit gemocktem MQTT Client (Unit Tests) -Folgt Odoo Best Practices - siehe microsoft_outlook, payment_mercado_pago -""" - -from unittest.mock import Mock, patch, call -from odoo.tests.common import TransactionCase -from odoo.tests import tagged - - -@tagged('post_install', '-at_install') -class TestMQTTConnectionMocked(TransactionCase): - """Unit Tests mit gemocktem MQTT Client - kein echter Broker nötig""" - - @classmethod - def setUpClass(cls): - super().setUpClass() - - # Create test connection first - cls.connection = cls.env['mqtt.connection'].create({ - 'name': 'Test Broker Mocked', - 'host': 'test.broker.local', - 'port': '1883', - 'client_id': 'test_client_mocked', - }) - - # Create test device - cls.device = cls.env['mqtt.device'].create({ - 'name': 'Test Device Mocked', - 'connection_id': cls.connection.id, - 'topic_pattern': 'test/#', - 'parser_type': 'shelly_pm', - }) - - # Setup mock MQTT client - cls.mqtt_patcher = patch('odoo.addons.open_workshop.open_workshop_mqtt.services.mqtt_client.mqtt.Client') - cls.MockClient = cls.mqtt_patcher.start() - - # Create mock instance - cls.mqtt_client_mock = Mock() - cls.MockClient.return_value = cls.mqtt_client_mock - - # Setup successful responses - cls.mqtt_client_mock.connect.return_value = 0 # MQTT_ERR_SUCCESS - cls.mqtt_client_mock.loop_start.return_value = None - cls.mqtt_client_mock.subscribe.return_value = (0, 1) - cls.mqtt_client_mock.publish.return_value = Mock(rc=0, mid=1) - cls.mqtt_client_mock.is_connected.return_value = True - cls.mqtt_client_mock.disconnect.return_value = 0 - cls.mqtt_client_mock.loop_stop.return_value = None - - # Import service (it's NOT an Odoo model - just a Python class!) - from ..services.iot_bridge_service import IotBridgeService - cls.service = IotBridgeService(cls.env) - - @classmethod - def tearDownClass(cls): - cls.mqtt_patcher.stop() - super().tearDownClass() - - def setUp(self): - super().setUp() - self.mqtt_client_mock.reset_mock() - - def test_01_start_connection_calls_mqtt_methods(self): - """Test dass start_connection die richtigen MQTT Methoden aufruft""" - # Start connection - result = self.service.start_connection_with_env(self.connection.id, self.env) - - self.assertTrue(result, "Connection should start") - - # Verify calls - self.mqtt_client_mock.connect.assert_called_once() - self.mqtt_client_mock.loop_start.assert_called_once() - - # Check connect args - connect_call = self.mqtt_client_mock.connect.call_args - host, port = connect_call[0][0], connect_call[0][1] - self.assertEqual(host, 'test.broker.local') - self.assertEqual(port, 1883) - - def test_02_stop_connection_calls_disconnect(self): - """Test dass stop_connection disconnect/loop_stop aufruft""" - # Start - self.service.start_connection_with_env(self.connection.id, self.env) - self.mqtt_client_mock.reset_mock() - - # Stop - self.service.stop_connection(self.connection.id) - - # Verify - self.mqtt_client_mock.loop_stop.assert_called_once() - self.mqtt_client_mock.disconnect.assert_called_once() - - def test_03_reconnect_after_disconnect(self): - """Test Reconnect nach Disconnect""" - # Connect -> Disconnect -> Connect - self.service.start_connection_with_env(self.connection.id, self.env) - self.service.stop_connection(self.connection.id) - - self.mqtt_client_mock.reset_mock() - result = self.service.start_connection_with_env(self.connection.id, self.env) - - self.assertTrue(result) - self.mqtt_client_mock.connect.assert_called_once() - - def test_04_on_connect_subscribes_topics(self): - """Test dass on_connect callback Topics subscribed""" - # Start - self.service.start_connection_with_env(self.connection.id, self.env) - - # Get the mqtt_client (MqttClient wrapper) and trigger its _on_connect callback - mqtt_client = self.service.get_client(self.connection.id) - mqtt_client._on_connect(self.mqtt_client_mock, None, None, 0) - - # Check subscribes were called - self.assertTrue(self.mqtt_client_mock.subscribe.called) - - # Get all subscribed topics - subscribe_calls = self.mqtt_client_mock.subscribe.call_args_list - topics = [c[0][0] for c in subscribe_calls] - - # Should have subscribed to device's topic_pattern - self.assertIn(self.device.topic_pattern, topics) diff --git a/open_workshop_mqtt/tests/test_no_duplicate_messages.py b/open_workshop_mqtt/tests/test_no_duplicate_messages.py deleted file mode 100644 index 753ad18..0000000 --- a/open_workshop_mqtt/tests/test_no_duplicate_messages.py +++ /dev/null @@ -1,185 +0,0 @@ -# -*- coding: utf-8 -*- - -from odoo.tests import TransactionCase, tagged -from unittest.mock import MagicMock, patch -import logging - -_logger = logging.getLogger(__name__) - - -@tagged('post_install', '-at_install', 'open_workshop_mqtt') -class TestNoDuplicateMessages(TransactionCase): - """Test that MQTT messages are not stored twice in the database""" - - def setUp(self): - super().setUp() - - # Create test connection - self.connection = self.env['mqtt.connection'].create({ - 'name': 'Test Duplicate Check', - 'host': 'test.mosquitto.org', - 'port': 1883, - 'client_id': 'test_duplicate_client', - }) - - # Create test device - self.device = self.env['mqtt.device'].create({ - 'name': 'Test Device Duplicate', - 'connection_id': self.connection.id, - 'topic_pattern': 'test/duplicate/#', - 'parser_type': 'shelly_pm', - 'session_strategy': 'power_threshold', - }) - - def test_no_duplicate_messages_in_database(self): - """ - Test that there are no duplicate messages in the database. - - Duplicates are defined as: - - Same device_id - - Same topic - - Same payload - - create_date within 100ms of each other - - This test queries the actual database to find duplicates. - """ - _logger.info("Checking for duplicate messages in mqtt.message table...") - - # SQL query to find duplicates within 100ms time window - self.env.cr.execute(""" - WITH message_groups AS ( - SELECT - id, - device_id, - topic, - payload, - create_date, - LAG(create_date) OVER ( - PARTITION BY device_id, topic, payload - ORDER BY create_date - ) as prev_create_date - FROM mqtt_message - WHERE device_id IS NOT NULL - ) - SELECT - id, - device_id, - topic, - LEFT(payload, 80) as payload_preview, - create_date, - prev_create_date, - EXTRACT(EPOCH FROM (create_date - prev_create_date)) * 1000 as diff_ms - FROM message_groups - WHERE prev_create_date IS NOT NULL - AND create_date - prev_create_date < INTERVAL '100 milliseconds' - ORDER BY create_date DESC - LIMIT 20; - """) - - duplicates = self.env.cr.fetchall() - - if duplicates: - _logger.error(f"Found {len(duplicates)} duplicate message(s):") - for dup in duplicates: - msg_id, device_id, topic, payload_preview, create_date, prev_create_date, diff_ms = dup - _logger.error( - f" ID {msg_id}: device={device_id}, topic={topic}, " - f"time_diff={diff_ms:.1f}ms, payload={payload_preview}" - ) - - self.fail( - f"Found {len(duplicates)} duplicate message(s) in database! " - f"Messages with identical device/topic/payload within 100ms. " - f"This indicates the MQTT callback is being called multiple times." - ) - else: - _logger.info("✓ No duplicate messages found in database") - - def test_no_duplicate_messages_same_second(self): - """ - Test that there are no messages with identical content in the same second. - This is a stricter check that groups by second instead of milliseconds. - """ - _logger.info("Checking for duplicate messages within same second...") - - self.env.cr.execute(""" - SELECT - device_id, - topic, - LEFT(payload, 50) as payload_preview, - DATE_TRUNC('second', create_date) as second_bucket, - COUNT(*) as count, - MIN(create_date) as first_msg, - MAX(create_date) as last_msg - FROM mqtt_message - WHERE device_id IS NOT NULL - GROUP BY device_id, topic, payload, DATE_TRUNC('second', create_date) - HAVING COUNT(*) > 1 - ORDER BY COUNT(*) DESC, second_bucket DESC - LIMIT 10; - """) - - duplicates = self.env.cr.fetchall() - - if duplicates: - _logger.error(f"Found {len(duplicates)} duplicate message group(s) in same second:") - for dup in duplicates: - device_id, topic, payload_preview, second_bucket, count, first_msg, last_msg = dup - time_diff = (last_msg - first_msg).total_seconds() * 1000 - _logger.error( - f" Device {device_id}, topic={topic}, count={count}, " - f"time_spread={time_diff:.1f}ms, payload={payload_preview}" - ) - - self.fail( - f"Found {len(duplicates)} duplicate message group(s)! " - f"Multiple messages with identical device/topic/payload in same second." - ) - else: - _logger.info("✓ No duplicate message groups found") - - def test_subscription_not_duplicated(self): - """ - Test that MQTT topics are not subscribed multiple times. - - This mocks the MQTT client to verify that subscribe() is called - exactly once per topic, not multiple times. - """ - _logger.info("Testing that subscriptions are not duplicated...") - - from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService - from odoo.addons.open_workshop_mqtt.services.mqtt_client import MqttClient - - # Track subscribe calls - subscribe_calls = [] - - # Mock MqttClient - with patch.object(MqttClient, 'connect', return_value=True): - with patch.object(MqttClient, 'subscribe') as mock_subscribe: - # Track all subscribe calls - def track_subscribe(topic, qos=0): - subscribe_calls.append({'topic': topic, 'qos': qos}) - return True - - mock_subscribe.side_effect = track_subscribe - - # Start connection (this should trigger subscription) - service = IotBridgeService.get_instance(self.env) - service.start_connection_with_env(self.connection.id, self.env) - - # Check that each topic was subscribed exactly once - topic_counts = {} - for call in subscribe_calls: - topic = call['topic'] - topic_counts[topic] = topic_counts.get(topic, 0) + 1 - - duplicated_topics = {t: c for t, c in topic_counts.items() if c > 1} - - if duplicated_topics: - _logger.error(f"Found duplicated subscriptions: {duplicated_topics}") - self.fail( - f"Topics subscribed multiple times: {duplicated_topics}. " - f"Each topic should only be subscribed once!" - ) - else: - _logger.info(f"✓ All {len(topic_counts)} topic(s) subscribed exactly once") diff --git a/open_workshop_mqtt/tests/test_session_detector.py b/open_workshop_mqtt/tests/test_session_detector.py deleted file mode 100644 index 457bed3..0000000 --- a/open_workshop_mqtt/tests/test_session_detector.py +++ /dev/null @@ -1,281 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Test Session Detector - Unit Tests (Mock-based, NO real MQTT!) - -Tests the State Machine logic for session detection: -IDLE → STARTING → STANDBY/WORKING → STOPPING → IDLE - -Following TDD: Tests FIRST, implementation SECOND! -""" - -from odoo.tests.common import TransactionCase -from odoo.tests import tagged -from datetime import datetime, timedelta -from unittest.mock import Mock, patch -import logging - -_logger = logging.getLogger(__name__) - - -@tagged('post_install', '-at_install') -class TestSessionDetector(TransactionCase): - """Unit Tests for Session Detector State Machine""" - - @classmethod - def setUpClass(cls): - super().setUpClass() - - # Create test connection - cls.connection = cls.env['mqtt.connection'].create({ - 'name': 'Test Broker', - 'host': 'test.broker.local', - 'port': '1883', - 'client_id': 'test_client', - }) - - # Create test device with strategy config - cls.device = cls.env['mqtt.device'].create({ - 'name': 'Test Device', - 'connection_id': cls.connection.id, - 'topic_pattern': 'test/#', - 'parser_type': 'shelly_pm', - 'session_strategy': 'power_threshold', - 'strategy_config': '''{ - "standby_threshold_w": 20, - "working_threshold_w": 100, - "start_debounce_s": 3, - "stop_debounce_s": 15, - "message_timeout_s": 20 - }''', - }) - - def setUp(self): - super().setUp() - # Import SessionDetector (will be created) - from ..services.session_detector import SessionDetector - - # Create detector instance with new signature (device_id, device_name) - self.detector = SessionDetector(self.device.id, self.device.name) - self.now = datetime.now() - - def test_01_idle_to_starting_on_power_above_threshold(self): - """ - Power rises above standby_threshold → State = STARTING - After debounce time → State = STANDBY - """ - # Initially IDLE - self.assertEqual(self.detector.state, 'idle') - - # Power 25W (> standby_threshold 20W) - self.detector.process_power_event(self.env, 25.0, self.now) - - # Should transition to STARTING - self.assertEqual(self.detector.state, 'starting') - self.assertIsNone(self.detector.current_session_id) # No session yet! - - # Wait 2 seconds (< start_debounce 3s) - self.detector.process_power_event(self.env, 25.0, self.now + timedelta(seconds=2)) - self.assertEqual(self.detector.state, 'starting') # Still starting - - # Wait 3 seconds total (>= start_debounce) - self.detector.process_power_event(self.env, 25.0, self.now + timedelta(seconds=3)) - - # Should transition to STANDBY and CREATE session - self.assertEqual(self.detector.state, 'standby') - self.assertIsNotNone(self.detector.current_session_id) - - def test_02_starting_to_idle_on_power_drop(self): - """ - STARTING → Power drops below threshold → Back to IDLE (debounce aborted) - """ - # Start with power above threshold - self.detector.process_power_event(self.env, 25.0, self.now) - self.assertEqual(self.detector.state, 'starting') - - # Power drops to 10W (< standby_threshold 20W) after 1s - self.detector.process_power_event(self.env, 10.0, self.now + timedelta(seconds=1)) - - # Should abort STARTING and go back to IDLE - self.assertEqual(self.detector.state, 'idle') - self.assertIsNone(self.detector.current_session_id) - - def test_03_standby_to_working_transition(self): - """ - Session in STANDBY (20W-100W) → Power rises above working_threshold → WORKING - """ - # Create session in STANDBY - self.detector.process_power_event(self.env, 25.0, self.now) - self.detector.process_power_event(self.env, 25.0, self.now + timedelta(seconds=3)) - self.assertEqual(self.detector.state, 'standby') - - # Stay in STANDBY for 5 seconds - self.detector.process_power_event(self.env, 50.0, self.now + timedelta(seconds=8)) - - # Power rises to 120W (> working_threshold 100W) - self.detector.process_power_event(self.env, 120.0, self.now + timedelta(seconds=10)) - - # Should transition to WORKING - self.assertEqual(self.detector.state, 'working') - - # Session should have accumulated ~5s standby time (from t3 to t10 = 7s, but only 5s was at 50W) - # Actually from state_entered_at (t3) to transition (t10) = 7s - self.assertGreater(self.detector.standby_duration_s, 5) - - # Working duration should still be 0 (just transitioned) - self.assertEqual(self.detector.working_duration_s, 0) - - # Stay in WORKING for a bit - self.detector.process_power_event(self.env, 120.0, self.now + timedelta(seconds=15)) - - # Now working duration should be > 0 (we stayed for 5s) - # But detector doesn't accumulate until we leave WORKING! - # So we need to transition out of WORKING to test duration - self.detector.process_power_event(self.env, 50.0, self.now + timedelta(seconds=20)) # Back to STANDBY - - # Now working_duration should be ~5s (from t10 to t20 = 10s) - self.assertGreater(self.detector.working_duration_s, 8) - - def test_04_working_to_stopping_transition(self): - """ - Session in WORKING → Power drops below standby_threshold → STOPPING - After stop_debounce → Session ENDED - """ - # Create session in WORKING - self.detector.process_power_event(self.env, 25.0, self.now) - self.detector.process_power_event(self.env, 25.0, self.now + timedelta(seconds=3)) - self.detector.process_power_event(self.env, 120.0, self.now + timedelta(seconds=5)) - self.assertEqual(self.detector.state, 'working') - session_id = self.detector.current_session_id - - # Power drops to 10W (< standby_threshold 20W) - self.detector.process_power_event(self.env, 10.0, self.now + timedelta(seconds=10)) - - # Should transition to STOPPING - self.assertEqual(self.detector.state, 'stopping') - self.assertEqual(self.detector.current_session_id, session_id) # Session still active - - # Wait 14 seconds (< stop_debounce 15s) - self.detector.process_power_event(self.env, 10.0, self.now + timedelta(seconds=24)) - self.assertEqual(self.detector.state, 'stopping') # Still stopping - - # Wait 15 seconds total (>= stop_debounce) - self.detector.process_power_event(self.env, 10.0, self.now + timedelta(seconds=25)) - - # Should END session and go to IDLE - self.assertEqual(self.detector.state, 'idle') - self.assertIsNone(self.detector.current_session_id) - - # Session should be marked as completed - session = self.env['mqtt.session'].search([('session_id', '=', session_id)]) - self.assertEqual(session.status, 'completed') - self.assertEqual(session.end_reason, 'power_drop') - - def test_05_timeout_detection(self): - """ - Session RUNNING → No messages for > message_timeout_s → Session ENDED (timeout) - """ - # Create running session - self.detector.process_power_event(self.env, 25.0, self.now) - self.detector.process_power_event(self.env, 25.0, self.now + timedelta(seconds=3)) - self.assertEqual(self.detector.state, 'standby') - session_id = self.detector.current_session_id - - # Simulate timeout check 21 seconds later (> message_timeout 20s) - # No new power events! - self.detector.check_timeout(self.env, self.now + timedelta(seconds=24)) - - # Should END session due to timeout - self.assertEqual(self.detector.state, 'idle') - self.assertIsNone(self.detector.current_session_id) - - # Session should be marked as completed with timeout reason - session = self.env['mqtt.session'].search([('session_id', '=', session_id)]) - self.assertEqual(session.status, 'completed') - self.assertEqual(session.end_reason, 'timeout') - - def test_06_duration_tracking(self): - """ - Test that standby_duration_s and working_duration_s are tracked correctly - """ - # Start session - t0 = self.now - self.detector.process_power_event(self.env, 25.0, t0) # STARTING - - # After 3s → STANDBY - t1 = t0 + timedelta(seconds=3) - self.detector.process_power_event(self.env, 25.0, t1) - self.assertEqual(self.detector.state, 'standby') - session_start_time = t1 # Session starts when STANDBY is entered - - # Stay in STANDBY for 20 seconds - t2 = t1 + timedelta(seconds=20) - self.detector.process_power_event(self.env, 50.0, t2) - self.assertEqual(self.detector.state, 'standby') - - # Transition to WORKING (duration accumulated: t2-t1 = 20s standby) - t3 = t2 + timedelta(seconds=1) - self.detector.process_power_event(self.env, 120.0, t3) - self.assertEqual(self.detector.state, 'working') - - # Stay in WORKING for 40 seconds - t4 = t3 + timedelta(seconds=40) - self.detector.process_power_event(self.env, 120.0, t4) - - # End session (duration accumulated: t4-t3 = 40s working) - t5 = t4 + timedelta(seconds=1) - self.detector.process_power_event(self.env, 0.0, t5) - self.assertEqual(self.detector.state, 'stopping') - - # Wait for stop debounce (15s) - t6 = t5 + timedelta(seconds=15) - self.detector.process_power_event(self.env, 0.0, t6) - self.assertEqual(self.detector.state, 'idle') - - # Check durations - session_id = self.detector.last_session_id - session = self.env['mqtt.session'].search([('session_id', '=', session_id)]) - - # standby_duration: t2-t1 = 20s (might be 21s due to t3-t1) - # Actually: transition happens at t3, so standby_duration = t3-t1 = 21s - self.assertAlmostEqual(session.standby_duration_s, 21, delta=2) - - # working_duration: t4-t3 = 40s (might be 41s due to t5-t3) - # Actually: transition happens at t5, so working_duration = t5-t3 = 41s - self.assertAlmostEqual(session.working_duration_s, 41, delta=2) - - # total_duration: t6-t1 = 77s (STARTING 3s + STANDBY 21s + WORKING 41s + STOPPING 15s) - # Actually: session starts at t1, ends at t6 = t5+15 = (t1+21)+41+15 = 77s - self.assertAlmostEqual(session.total_duration_s, 77, delta=5) - - def test_07_state_recovery_after_restart(self): - """ - Test that detector can restore state from existing running session in DB - """ - # Create a running session manually (simulating previous detector instance) - session = self.env['mqtt.session'].create({ - 'device_id': self.device.id, - 'start_time': self.now, - 'status': 'running', - 'current_state': 'working', - 'current_power_w': 120.0, - 'last_message_time': self.now, - 'start_power_w': 25.0, - 'standby_duration_s': 20, - 'working_duration_s': 40, - }) - - # Create NEW detector instance (simulating restart) - from ..services.session_detector import SessionDetector - new_detector = SessionDetector(self.device.id, self.device.name) - - # Should restore state from DB - new_detector.restore_state_from_db(self.env) - - self.assertEqual(new_detector.state, 'working') - self.assertEqual(new_detector.current_session_id, session.session_id) - self.assertEqual(new_detector.standby_duration_s, 20) - self.assertEqual(new_detector.working_duration_s, 40) - - # Should be able to continue session - new_detector.process_power_event(self.env, 120.0, self.now + timedelta(seconds=100)) - self.assertEqual(new_detector.state, 'working') diff --git a/open_workshop_mqtt/tests/test_topic_matching.py b/open_workshop_mqtt/tests/test_topic_matching.py deleted file mode 100644 index b49bf97..0000000 --- a/open_workshop_mqtt/tests/test_topic_matching.py +++ /dev/null @@ -1,152 +0,0 @@ -# -*- coding: utf-8 -*- -"""Test MQTT Topic Pattern Matching""" - -from odoo.tests.common import TransactionCase -import unittest -import logging - -_logger = logging.getLogger(__name__) - - -@unittest.skip("TODO: Rewrite without real MQTT service - use mocks") -class TestTopicMatching(TransactionCase): - """Test that devices correctly match MQTT topic patterns""" - - def setUp(self): - super().setUp() - - # Create MQTT connection - self.connection = self.env['mqtt.connection'].create({ - 'name': 'Test Broker', - 'host': 'test.broker.local', - 'port': '1883', - 'client_id': 'test_client', - }) - - # Create Test Device (shaperorigin/test/#) - self.test_device = self.env['mqtt.device'].create({ - 'name': 'Test Shaper', - 'connection_id': self.connection.id, - 'topic_pattern': 'shaperorigin/test/#', - 'parser_type': 'shelly_pm', - }) - - # Create Real Device (shaperorigin/real/#) - self.real_device = self.env['mqtt.device'].create({ - 'name': 'Real Shaper', - 'connection_id': self.connection.id, - 'topic_pattern': 'shaperorigin/real/#', - 'parser_type': 'shelly_pm', - }) - - # Get service instance - from ..services.iot_bridge_service import IotBridgeService - self.service = IotBridgeService.get_instance(self.env) - - def test_wildcard_topic_matching(self): - """Test that # wildcard matches correctly""" - - # Test: Message to shaperorigin/test/info should match test_device - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'shaperorigin/test/info' - ) - - self.assertEqual(matched.id, self.test_device.id, - "Topic 'shaperorigin/test/info' should match 'shaperorigin/test/#'") - - # Test: Message to shaperorigin/real/status should match real_device - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'shaperorigin/real/status' - ) - - self.assertEqual(matched.id, self.real_device.id, - "Topic 'shaperorigin/real/status' should match 'shaperorigin/real/#'") - - def test_no_cross_matching(self): - """Test that devices don't match other device's topics""" - - # Test: shaperorigin/test/info should NOT match real_device - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'shaperorigin/test/info' - ) - - self.assertNotEqual(matched.id, self.real_device.id, - "Test device topics should not match real device") - - def test_exact_matching(self): - """Test exact topic matching (no wildcards)""" - - # Create device with exact topic (no wildcard) - exact_device = self.env['mqtt.device'].create({ - 'name': 'Exact Device', - 'connection_id': self.connection.id, - 'topic_pattern': 'device/status', - 'parser_type': 'generic', - }) - - # Should match exact topic - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'device/status' - ) - - self.assertEqual(matched.id, exact_device.id, - "Exact topic 'device/status' should match pattern 'device/status'") - - # Should NOT match different topic - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'device/info' - ) - - self.assertNotEqual(matched.id, exact_device.id, - "Topic 'device/info' should not match exact pattern 'device/status'") - - def test_single_level_wildcard(self): - """Test + wildcard (single level)""" - - # Create device with + wildcard - plus_device = self.env['mqtt.device'].create({ - 'name': 'Multi Device', - 'connection_id': self.connection.id, - 'topic_pattern': 'device/+/status', - 'parser_type': 'generic', - }) - - # Should match device/abc/status - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'device/abc/status' - ) - - self.assertEqual(matched.id, plus_device.id, - "Topic 'device/abc/status' should match 'device/+/status'") - - # Should match device/xyz/status - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'device/xyz/status' - ) - - self.assertEqual(matched.id, plus_device.id, - "Topic 'device/xyz/status' should match 'device/+/status'") - - # Should NOT match device/abc/extra/status (+ is single level only) - matched = self.service._find_device_for_topic( - self.env, - self.connection.id, - 'device/abc/extra/status' - ) - - self.assertNotEqual(matched.id, plus_device.id, - "Topic 'device/abc/extra/status' should NOT match 'device/+/status' (+ is single level)")