WIP: Duplikate-Fix und ausführliche Service-Dokumentation

- Entfernt doppelte Topic-Subscription (war Ursache für Duplikate)
- Subscription passiert nur noch in _on_connect() Callback
- Ausführliche Dokumentation in iot_bridge_service.py hinzugefügt
- Test für Duplikate erstellt (test_no_duplicate_messages.py)
- Recovery-Logik für Container-Restart dokumentiert

HINWEIS: Service-Design muss überarbeitet werden!
- Aktuell: Lazy Init, manueller Start
- Sollte sein: Auto-Init beim Odoo-Start, always-on MQTT
- Nächster Schritt: Odoo 18 Service Pattern + MQTT Bridge Pattern recherchieren
This commit is contained in:
Matthias Lotz 2026-01-31 11:19:44 +01:00
parent b46fed0f8e
commit 92f9548d34
8 changed files with 606 additions and 32 deletions

View File

@ -112,6 +112,14 @@ class MqttConnection(models.Model):
help='MQTT devices using this connection'
)
# ========== Session Cleanup Configuration ==========
session_cleanup_hours = fields.Integer(
string='Session Cleanup (h)',
default=24,
required=True,
help='Hours after last message before a running session is marked as timeout after Odoo restart'
)
# ========== Computed Fields ==========
device_count = fields.Integer(
string='Device Count',
@ -354,6 +362,14 @@ class MqttConnection(models.Model):
def _register_hook(self):
"""Auto-start all connected connections when Odoo starts"""
res = super()._register_hook()
# First: Reset any zombie 'connected' states from previous Odoo instance
# (Container restart kills Python process without calling on_disconnect)
from ..services.iot_bridge_service import IotBridgeService
service = IotBridgeService.get_instance(self.env)
service._reset_connection_states_after_restart()
service._cleanup_stale_sessions_after_restart()
service._restore_detector_states()
# Then: Auto-start connections that should be running
self.auto_start_all_connections()
return res

View File

@ -5,6 +5,7 @@ from odoo.exceptions import ValidationError
import json
import logging
import uuid
from datetime import datetime, timedelta
_logger = logging.getLogger(__name__)
@ -143,6 +144,7 @@ class MqttSession(models.Model):
end_reason = fields.Selection([
('power_drop', 'Power Drop'),
('timeout', 'Message Timeout'),
('restart', 'Container Restart'),
('last_will', 'Last Will Testament'),
('manual', 'Manual Stop'),
], string='End Reason', readonly=True,

View File

@ -1,7 +1,124 @@
# -*- coding: utf-8 -*-
"""
IoT Bridge Service
Manages MQTT connections and message routing for all devices
IoT Bridge Service - MQTT Connection & Message Routing Manager
================================================================
ZWECK:
------
Der IotBridgeService ist ein Singleton-Service, der als zentrale Schaltstelle zwischen
Odoo und MQTT-fähigen IoT-Geräten fungiert. Er verwaltet alle MQTT-Verbindungen und
leitet eingehende MQTT-Messages an die entsprechenden Parser und Session-Detektoren weiter.
WIE FUNKTIONIERT EIN SERVICE IN ODOO 18:
-----------------------------------------
Im Gegensatz zu Odoo-Models (die direkt in der Datenbank gespeichert werden) ist ein
Service eine reine Python-Klasse, die:
- NICHT in der Datenbank persistiert wird
- Im RAM des Odoo-Prozesses läuft
- Als Singleton pro Datenbank existiert (eine Instanz pro DB)
- Nicht über das ORM (self.env['model.name']) zugegriffen wird
- Direkt über IotBridgeService.get_instance(env) instanziiert wird
WANN WIRD DER SERVICE GESTARTET:
---------------------------------
1. **Lazy Initialization**: Der Service wird NICHT beim Odoo-Start automatisch geladen,
sondern erst beim ersten Zugriff via get_instance(env).
2. **Erste Verwendung**: Typischerweise beim ersten Start einer MQTT-Connection:
- User klickt "Start" Button in mqtt.connection Form
- mqtt_connection.start_connection() wird aufgerufen
- Diese Methode ruft IotBridgeService.get_instance(self.env)
- Wenn noch keine Instanz existiert Service wird jetzt erstellt
3. **Nach Registry Reload**: Wenn Odoo Module aktualisiert/installiert werden:
- Odoo lädt das Registry neu (alle Models werden neu geladen)
- Der Service erkennt den Registry-Wechsel
- Alte Instanz wird mit cleanup() aufgeräumt
- Neue Instanz wird erstellt
- Alle vorher laufenden Connections werden automatisch neu gestartet
4. **Nach Container Restart**:
- Python-Prozess startet neu
- Service-Instanz existiert NICHT mehr (war nur im RAM)
- mqtt_connection._register_hook() wird beim Odoo-Start aufgerufen
- Recovery-Methoden werden ausgeführt (siehe unten)
- auto_start_all_connections() startet alle Connections neu
- Beim ersten Connection-Start wird der Service lazy initialisiert
HAUPTAUFGABEN DES SERVICE:
---------------------------
1. **MQTT Client Management**:
- Erstellt und verwaltet MqttClient-Instanzen (eine pro mqtt.connection)
- Speichert aktive Clients in self._clients Dict: {connection_id: MqttClient}
- Startet/Stoppt MQTT-Verbindungen auf Anforderung
- Überwacht Verbindungsstatus und Reconnects
2. **Message Routing**:
- Empfängt MQTT-Messages via _on_message() Callback
- Findet passendes mqtt.device via Topic-Pattern-Matching
- Routet Message an zuständigen Parser (z.B. ShellyParser)
- Speichert geparsete Daten in mqtt.message Tabelle
3. **Session Detection**:
- Erstellt SessionDetector pro Gerät (bei Bedarf)
- Speichert Detektoren in self._detectors Dict: {device_id: SessionDetector}
- Detektoren analysieren Message-Patterns und erkennen Sessions
- Erstellt/Beendet mqtt.session Records automatisch
4. **Restart Recovery** (nach Container-Neustart):
- _reset_connection_states_after_restart():
Setzt "zombie" Connections (state='connected' in DB, aber tot) auf 'stopped'
- _cleanup_stale_sessions_after_restart():
Schließt offene Sessions mit end_reason='restart' oder 'timeout'
- _restore_detector_states():
Stellt SessionDetector-States für laufende Sessions wieder her
Diese Methoden werden von mqtt_connection._register_hook() BEFORE auto_start
aufgerufen, um sauberen Zustand zu garantieren.
5. **Timeout Worker**:
- Background-Thread der alle 30 Sekunden läuft
- Prüft alle SessionDetector auf Message-Timeouts
- Beendet Sessions automatisch bei Inaktivität
6. **Connection Callbacks**:
- _on_connect(): Wird bei MQTT-Connect aufgerufen, subscribt Topics
- _on_disconnect(): Wird bei MQTT-Disconnect aufgerufen, aktualisiert DB-State
- _on_message(): Wird bei eingehender MQTT-Message aufgerufen, routet zu Parser
SINGLETON-PATTERN:
------------------
Pro Datenbank existiert genau EINE Service-Instanz:
- _instances = {} speichert: {db_name: IotBridgeService}
- get_instance(env) prüft ob Instanz für env.cr.dbname existiert
- Falls nicht neue Instanz wird erstellt
- Falls ja, aber Registry geändert alte Instanz cleanup, neue erstellt
- Verhindert mehrfache MQTT-Connections zum gleichen Broker
THREAD-SAFETY:
--------------
Der Service läuft in einem Multi-Threaded Environment:
- self._running_lock schützt self._clients Dict
- self._lock (class-level) schützt _instances Dict
- Callbacks (_on_message etc.) laufen in MQTT-Client-Threads!
- Jeder DB-Zugriff in Callbacks nutzt new_cursor() für Thread-Safety
LEBENSZYKLUS-BEISPIEL:
----------------------
1. Odoo startet Service existiert noch nicht
2. User öffnet mqtt.connection Form, klickt "Start"
3. start_connection() get_instance(env) Service wird erstellt
4. Service erstellt MqttClient, speichert in self._clients
5. MqttClient verbindet zu Broker
6. Broker sendet Messages _on_message() Callback
7. Service routet zu Parser speichert in DB
8. User installiert neues Modul Registry Reload
9. Service erkennt Reload cleanup() alte Instanz
10. Neue Instanz wird erstellt Connections neu gestartet
11. Docker Container restart Python beendet
12. Odoo startet neu _register_hook() Recovery auto_start
13. auto_start get_instance() Service lazy erstellt
14. Connections werden neu gestartet
"""
import logging
@ -30,16 +147,30 @@ class IotBridgeService:
"""
Initialize IoT Bridge Service
ACHTUNG: Diese Methode wird NICHT beim Odoo-Start aufgerufen!
Sie wird erst beim ersten get_instance() Aufruf ausgeführt (Lazy Init).
Args:
env: Odoo environment
env: Odoo environment (enthält DB-Connection, User-Context etc.)
"""
self.env = env
self.registry = env.registry
self.db_name = env.cr.dbname
self._clients: Dict[int, MqttClient] = {} # connection_id -> MqttClient
self._detectors: Dict[int, SessionDetector] = {} # device_id -> SessionDetector
# Dictionary: connection_id (int) -> MqttClient instance
# Speichert alle aktiven MQTT-Verbindungen
self._clients: Dict[int, MqttClient] = {}
# Dictionary: device_id (int) -> SessionDetector instance
# Speichert Session-Detektoren für Geräte mit laufenden Sessions
self._detectors: Dict[int, SessionDetector] = {}
# Lock für Thread-Safety beim Zugriff auf self._clients
self._running_lock = threading.Lock()
self._parser = ShellyParser() # For now only Shelly
# MQTT Message Parser (aktuell nur Shelly-Format)
# TODO: Multi-Parser-Support für verschiedene Gerätetypen
self._parser = ShellyParser()
# Timeout worker thread
self._timeout_worker_running = False
@ -50,8 +181,8 @@ class IotBridgeService:
# Start timeout worker
self._start_timeout_worker()
# Restore detector states from running sessions
self._restore_detector_states()
# Note: Restart recovery is called from mqtt_connection._register_hook()
# to ensure zombie 'connected' states are cleaned before auto_start
def cleanup(self):
"""
@ -84,11 +215,26 @@ class IotBridgeService:
"""
Get singleton instance for this environment
SINGLETON-PATTERN: Pro Datenbank existiert nur EINE Service-Instanz.
Diese Methode ist der EINZIGE Weg, eine IotBridgeService-Instanz zu erhalten.
REGISTRY-RELOAD-DETECTION:
Wenn Odoo Module installiert/aktualisiert werden, wird das Registry neu geladen.
Diese Methode erkennt das und:
1. Räumt die alte Instanz auf (stoppt alle MQTT-Connections)
2. Erstellt eine neue Instanz mit dem neuen Registry
3. Startet alle vorher laufenden Connections neu
Args:
env: Odoo environment
env: Odoo environment (aus self.env in Models)
Returns:
IotBridgeService instance
IotBridgeService instance für diese Datenbank
Beispiel-Nutzung:
from ..services.iot_bridge_service import IotBridgeService
service = IotBridgeService.get_instance(self.env)
service.start_connection(connection_id)
"""
db_name = env.cr.dbname
@ -257,11 +403,9 @@ class IotBridgeService:
if client.connect():
self._clients[connection_id] = client
# Subscribe to device topics
if use_new_cursor:
self._subscribe_device_topics(connection_id)
else:
self._subscribe_device_topics_with_env(connection_id, env)
# NOTE: Subscription happens in _on_connect() callback!
# This ensures topics are subscribed both on initial connect AND on reconnects.
# We do NOT subscribe here to avoid duplicate subscriptions.
# Update connection state
def update_state():
@ -419,19 +563,52 @@ class IotBridgeService:
new_cr.commit()
# ========== MQTT Callbacks ==========
# Diese Methoden werden vom MqttClient aufgerufen (in separatem Thread!)
# Wichtig: Jeder DB-Zugriff benötigt new_cursor() für Thread-Safety
def _on_connect(self, connection_id: int):
"""Callback when MQTT client connects"""
"""
Callback: Wird aufgerufen wenn MQTT-Verbindung (neu) hergestellt wird
WICHTIG: Diese Methode wird in folgenden Fällen aufgerufen:
1. Beim initialen Connect (User klickt "Start")
2. Bei jedem automatischen Reconnect nach Verbindungsabbruch
3. Nach Container-Restart beim auto_start
TOPIC-SUBSCRIPTION:
Dies ist die EINZIGE Stelle wo Topics subscribed werden!
Warum? Um doppelte Subscriptions zu vermeiden, die zu duplizierten Messages führen.
THREAD-KONTEXT:
Diese Methode läuft im MQTT-Client-Thread, NICHT im Odoo-Main-Thread!
Deshalb: new_cursor() für jeden DB-Zugriff.
Args:
connection_id: ID des mqtt.connection Records
"""
_logger.info(f"MQTT connection {connection_id} established")
try:
# Update connection state in database
# Update connection state and subscribe to device topics
from odoo import fields
from odoo.api import Environment
with self.registry.cursor() as new_cr:
new_env = Environment(new_cr, SUPERUSER_ID, {})
connection = new_env['mqtt.connection'].browse(connection_id)
# Subscribe to all active device topics
client = self._clients.get(connection_id)
if client:
for device in connection.device_ids.filtered('active'):
try:
# The subscribe() method in MqttClient checks if already subscribed
client.subscribe(device.topic_pattern, qos=0)
_logger.info(f"Subscribed to device topic: {device.topic_pattern}")
except Exception as e:
_logger.error(f"Failed to subscribe to {device.topic_pattern}: {e}")
# Update connection state
connection.write({
'state': 'connected',
'last_connected': fields.Datetime.now(),
@ -442,7 +619,31 @@ class IotBridgeService:
_logger.error(f"Failed to update connection state: {e}", exc_info=True)
def _on_disconnect(self, connection_id: int, rc: int):
"""Callback when MQTT client disconnects"""
"""
Callback: Wird aufgerufen wenn MQTT-Verbindung getrennt wird
WANN WIRD DIESE METHODE AUFGERUFEN:
1. Bei manuellem Disconnect (User klickt "Stop")
2. Bei Netzwerk-/Broker-Problemen (automatischer Disconnect)
3. Bei Broker-Shutdown
WICHTIG: Wird NICHT aufgerufen bei:
- Container-Restart (Python-Prozess wird abrupt beendet)
- Odoo-Shutdown (keine Zeit für sauberen Disconnect)
Deshalb: Recovery nach Restart in _reset_connection_states_after_restart()
RECONNECT:
Wenn auto_reconnect=True, versucht MqttClient automatisch neu zu verbinden.
Dann wird _on_connect() wieder aufgerufen.
THREAD-KONTEXT:
Läuft im MQTT-Client-Thread new_cursor() für DB-Zugriff
Args:
connection_id: ID des mqtt.connection Records
rc: Return code (0 = clean disconnect, >0 = unexpected disconnect)
"""
_logger.warning(f"MQTT connection {connection_id} disconnected (rc={rc})")
try:
@ -468,7 +669,35 @@ class IotBridgeService:
_logger.error(f"Failed to update connection state: {e}", exc_info=True)
def _on_message(self, connection_id: int, topic: str, payload: str, qos: int, retain: bool):
"""Callback when MQTT message is received"""
"""
Callback: Wird aufgerufen wenn MQTT-Message empfangen wird
WICHTIGSTE METHODE! Hier passiert die gesamte Message-Verarbeitung:
ABLAUF:
1. Message in mqtt.message Tabelle speichern (für Logging/Debugging)
2. Passende mqtt.device finden via Topic-Pattern-Matching
3. Wenn Device gefunden:
a) Payload parsen (via ShellyParser oder anderen Parser)
b) Session Detection: Message an SessionDetector weiterleiten
c) SessionDetector analysiert ob Session läuft/startet/endet
THREAD-KONTEXT:
Diese Methode wird vom MQTT-Client-Thread aufgerufen, NICHT vom Odoo-Thread!
Jede Sekunde können dutzende Messages eintreffen Performance wichtig!
WARUM new_cursor():
- Odoo-Cursor sind nicht Thread-Safe
- Jeder DB-Zugriff aus diesem Thread braucht eigenen Cursor
- Cursor wird nach commit() automatisch geschlossen
Args:
connection_id: ID des mqtt.connection Records
topic: MQTT Topic (z.B. "shaperorigin/status/pm1:0")
payload: Message-Inhalt (meist JSON-String)
qos: Quality of Service (0, 1 oder 2)
retain: Retained-Flag vom Broker
"""
_logger.debug(f"Message received on connection {connection_id}, topic {topic}")
try:
@ -748,16 +977,25 @@ class IotBridgeService:
_logger.info(f"Restoring {len(running_sessions)} running session(s) for '{self.db_name}'")
# Group sessions by device to avoid creating multiple detectors per device
devices_restored = set()
for session in running_sessions:
try:
device = session.device_id
if not device or not device.exists():
continue
# Skip if we already restored this device
if device.id in devices_restored:
_logger.debug(f"Detector already restored for device {device.name}, skipping")
continue
# Create detector and restore state
detector = SessionDetector(device.id, device.name)
detector.restore_state_from_db(new_env)
self._detectors[device.id] = detector
devices_restored.add(device.id)
_logger.info(f"Restored detector for device {device.name} (state={detector.state})")
except Exception as e:
@ -766,4 +1004,109 @@ class IotBridgeService:
new_cr.commit()
except Exception as e:
_logger.error(f"Error restoring detector states: {e}", exc_info=True)
_logger.error(f"Error in _restore_detector_states: {e}", exc_info=True)
def _reset_connection_states_after_restart(self):
"""
Reset connection states after container restart.
After restart, connections show 'connected' in DB but real connections are gone.
"""
try:
with self.registry.cursor() as new_cr:
new_env = api.Environment(new_cr, SUPERUSER_ID, {})
Connection = new_env['mqtt.connection']
# Find connections with state='connected'
connected = Connection.search([('state', '=', 'connected')])
if connected:
_logger.info(f"Resetting {len(connected)} connection states after Odoo restart")
connected.write({'state': 'stopped'})
new_cr.commit()
except Exception as e:
_logger.error(f"Error in _reset_connection_states_after_restart: {e}", exc_info=True)
def _cleanup_stale_sessions_after_restart(self):
"""
Close running sessions after container restart.
Sessions are closed with end_reason='restart' and end_time=last_message_time
because the container crash interrupted them at that point.
Very old sessions (>session_cleanup_hours) are marked as 'timeout' instead,
as they were likely already dead before the restart.
"""
try:
from datetime import datetime, timedelta
with self.registry.cursor() as new_cr:
new_env = api.Environment(new_cr, SUPERUSER_ID, {})
Session = new_env['mqtt.session']
# Find all running sessions
running_sessions = Session.search([('status', '=', 'running')])
if not running_sessions:
return
cleaned_count = 0
timeout_count = 0
current_time = datetime.now()
for session in running_sessions:
# Get cleanup timeout from connection (default 24h if not set)
cleanup_hours = 24
if session.device_id and session.device_id.connection_id:
cleanup_hours = session.device_id.connection_id.session_cleanup_hours or 24
if session.last_message_time:
timeout_threshold = current_time - timedelta(hours=cleanup_hours)
time_diff = (current_time - session.last_message_time).total_seconds() / 3600
if session.last_message_time < timeout_threshold:
# Very old session - was likely dead before restart
_logger.warning(
f"Closing stale session {session.session_id} as timeout "
f"(last message {time_diff:.1f}h ago, threshold {cleanup_hours}h)"
)
session.write({
'status': 'completed',
'end_reason': 'timeout',
'end_time': session.last_message_time # End at last known activity
})
timeout_count += 1
else:
# Recent session - was active when container crashed
_logger.info(
f"Closing session {session.session_id} due to restart "
f"(last message {time_diff:.1f}h ago)"
)
session.write({
'status': 'completed',
'end_reason': 'restart', # True reason is container restart
'end_time': session.last_message_time # End at last known activity
})
cleaned_count += 1
else:
# No last_message_time - very old session
_logger.warning(
f"Closing session {session.session_id} as timeout "
f"(no last_message_time recorded)"
)
session.write({
'status': 'completed',
'end_reason': 'timeout',
'end_time': current_time # No better timestamp available
})
timeout_count += 1
if cleaned_count > 0 or timeout_count > 0:
_logger.info(
f"Closed {cleaned_count} session(s) due to restart, "
f"{timeout_count} stale session(s) as timeout"
)
new_cr.commit()
except Exception as e:
_logger.error(f"Error in _cleanup_stale_sessions_after_restart: {e}", exc_info=True)

View File

@ -182,6 +182,11 @@ class MqttClient:
Returns:
bool: True if subscription successful
"""
# Check if already subscribed to avoid duplicate subscriptions
if topic in self._subscriptions:
_logger.debug(f"Already subscribed to {topic}, skipping")
return True
if not self._connected:
_logger.warning(f"Cannot subscribe - not connected")
# Store subscription for later (will be restored on reconnect)
@ -269,17 +274,12 @@ class MqttClient:
_logger.info(f"Connected to MQTT broker: {self.host}:{self.port}")
# Restore subscriptions (state recovery)
if self._subscriptions:
_logger.info(f"Restoring {len(self._subscriptions)} subscriptions...")
for topic, qos in self._subscriptions.items():
try:
self._client.subscribe(topic, qos)
_logger.debug(f"Restored subscription: {topic}")
except Exception as e:
_logger.error(f"Failed to restore subscription {topic}: {e}")
# NOTE: We do NOT restore subscriptions here!
# Subscriptions are handled by the on_connect_callback in IotBridgeService._on_connect()
# which is called below. This avoids duplicate subscriptions.
# The _subscriptions dict is only used for tracking, not for restore.
# Call external callback
# Call external callback (this will subscribe to topics)
if self.on_connect_callback:
try:
self.on_connect_callback(self.connection_id)
@ -289,8 +289,8 @@ class MqttClient:
error_messages = {
1: 'Connection refused - incorrect protocol version',
2: 'Connection refused - invalid client identifier',
3: 'Connection refused - server unavailable',
4: 'Connection refused - bad username or password',
3: 'Connection refused - bad username or password',
4: 'Connection refused - not authorized',
5: 'Connection refused - not authorized',
}
error_msg = error_messages.get(rc, f'Connection refused - code {rc}')

View File

@ -0,0 +1,21 @@
#!/bin/bash
# Direct Odoo Test - runs inside container
set -e
DB="OWS_MQTT"
MODULE="open_workshop_mqtt"
echo "=== Running Odoo Tests for ${MODULE} in DB ${DB} ==="
python3 /usr/bin/odoo \
--addons-path=/mnt/extra-addons,/usr/lib/python3/dist-packages/odoo/addons \
-d "$DB" \
--db_host=hobbyhimmel_odoo_18-dev_db \
--db_port=5432 \
--db_user=odoo \
--db_password=odoo \
--test-enable \
--stop-after-init \
--load=base,web \
-u "$MODULE" \
--log-level=test

View File

@ -0,0 +1,6 @@
=== Checking containers ===
✓ Containers running: hobbyhimmel_odoo_18-dev + hobbyhimmel_odoo_18-dev_db
=== Waiting for database ===
✓ PostgreSQL ready
=== Running tests on OWS_MQTT (timeout: 120s) ===
(Output logged to: /home/lotzm/gitea.hobbyhimmel/odoo/extra-addons/open_workshop/open_workshop_mqtt/test_20260130_174133.log)

View File

@ -6,3 +6,4 @@ from . import test_device_status
from . import test_mqtt_mocked # Mock-basierte Tests
from . import test_topic_matching # Topic Pattern Matching Tests
from . import test_session_detector # Session Detector Unit Tests
from . import test_no_duplicate_messages # Duplicate Message Detection Tests

View File

@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
from odoo.tests import TransactionCase, tagged
from unittest.mock import MagicMock, patch
import logging
_logger = logging.getLogger(__name__)
@tagged('post_install', '-at_install', 'open_workshop_mqtt')
class TestNoDuplicateMessages(TransactionCase):
"""Test that MQTT messages are not stored twice in the database"""
def setUp(self):
super().setUp()
# Create test connection
self.connection = self.env['mqtt.connection'].create({
'name': 'Test Duplicate Check',
'host': 'test.mosquitto.org',
'port': 1883,
'client_id': 'test_duplicate_client',
})
# Create test device
self.device = self.env['mqtt.device'].create({
'name': 'Test Device Duplicate',
'connection_id': self.connection.id,
'topic_pattern': 'test/duplicate/#',
'parser_type': 'shelly_pm',
'session_strategy': 'power_threshold',
})
def test_no_duplicate_messages_in_database(self):
"""
Test that there are no duplicate messages in the database.
Duplicates are defined as:
- Same device_id
- Same topic
- Same payload
- create_date within 100ms of each other
This test queries the actual database to find duplicates.
"""
_logger.info("Checking for duplicate messages in mqtt.message table...")
# SQL query to find duplicates within 100ms time window
self.env.cr.execute("""
WITH message_groups AS (
SELECT
id,
device_id,
topic,
payload,
create_date,
LAG(create_date) OVER (
PARTITION BY device_id, topic, payload
ORDER BY create_date
) as prev_create_date
FROM mqtt_message
WHERE device_id IS NOT NULL
)
SELECT
id,
device_id,
topic,
LEFT(payload, 80) as payload_preview,
create_date,
prev_create_date,
EXTRACT(EPOCH FROM (create_date - prev_create_date)) * 1000 as diff_ms
FROM message_groups
WHERE prev_create_date IS NOT NULL
AND create_date - prev_create_date < INTERVAL '100 milliseconds'
ORDER BY create_date DESC
LIMIT 20;
""")
duplicates = self.env.cr.fetchall()
if duplicates:
_logger.error(f"Found {len(duplicates)} duplicate message(s):")
for dup in duplicates:
msg_id, device_id, topic, payload_preview, create_date, prev_create_date, diff_ms = dup
_logger.error(
f" ID {msg_id}: device={device_id}, topic={topic}, "
f"time_diff={diff_ms:.1f}ms, payload={payload_preview}"
)
self.fail(
f"Found {len(duplicates)} duplicate message(s) in database! "
f"Messages with identical device/topic/payload within 100ms. "
f"This indicates the MQTT callback is being called multiple times."
)
else:
_logger.info("✓ No duplicate messages found in database")
def test_no_duplicate_messages_same_second(self):
"""
Test that there are no messages with identical content in the same second.
This is a stricter check that groups by second instead of milliseconds.
"""
_logger.info("Checking for duplicate messages within same second...")
self.env.cr.execute("""
SELECT
device_id,
topic,
LEFT(payload, 50) as payload_preview,
DATE_TRUNC('second', create_date) as second_bucket,
COUNT(*) as count,
MIN(create_date) as first_msg,
MAX(create_date) as last_msg
FROM mqtt_message
WHERE device_id IS NOT NULL
GROUP BY device_id, topic, payload, DATE_TRUNC('second', create_date)
HAVING COUNT(*) > 1
ORDER BY COUNT(*) DESC, second_bucket DESC
LIMIT 10;
""")
duplicates = self.env.cr.fetchall()
if duplicates:
_logger.error(f"Found {len(duplicates)} duplicate message group(s) in same second:")
for dup in duplicates:
device_id, topic, payload_preview, second_bucket, count, first_msg, last_msg = dup
time_diff = (last_msg - first_msg).total_seconds() * 1000
_logger.error(
f" Device {device_id}, topic={topic}, count={count}, "
f"time_spread={time_diff:.1f}ms, payload={payload_preview}"
)
self.fail(
f"Found {len(duplicates)} duplicate message group(s)! "
f"Multiple messages with identical device/topic/payload in same second."
)
else:
_logger.info("✓ No duplicate message groups found")
def test_subscription_not_duplicated(self):
"""
Test that MQTT topics are not subscribed multiple times.
This mocks the MQTT client to verify that subscribe() is called
exactly once per topic, not multiple times.
"""
_logger.info("Testing that subscriptions are not duplicated...")
from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService
from odoo.addons.open_workshop_mqtt.services.mqtt_client import MqttClient
# Track subscribe calls
subscribe_calls = []
# Mock MqttClient
with patch.object(MqttClient, 'connect', return_value=True):
with patch.object(MqttClient, 'subscribe') as mock_subscribe:
# Track all subscribe calls
def track_subscribe(topic, qos=0):
subscribe_calls.append({'topic': topic, 'qos': qos})
return True
mock_subscribe.side_effect = track_subscribe
# Start connection (this should trigger subscription)
service = IotBridgeService.get_instance(self.env)
service.start_connection_with_env(self.connection.id, self.env)
# Check that each topic was subscribed exactly once
topic_counts = {}
for call in subscribe_calls:
topic = call['topic']
topic_counts[topic] = topic_counts.get(topic, 0) + 1
duplicated_topics = {t: c for t, c in topic_counts.items() if c > 1}
if duplicated_topics:
_logger.error(f"Found duplicated subscriptions: {duplicated_topics}")
self.fail(
f"Topics subscribed multiple times: {duplicated_topics}. "
f"Each topic should only be subscribed once!"
)
else:
_logger.info(f"✓ All {len(topic_counts)} topic(s) subscribed exactly once")