WIP: MQTT Tests - Mocked approach created but needs better testing strategy

- Created test_mqtt_mocked.py with unittest.mock (following OCA patterns)
- Old tests with real MQTT broker hang in TransactionCase tearDown
- Created run-tests.sh following OCA/oca-ci best practices
- TODO: Find proper way to test MQTT with background threads in Odoo
- TODO: Either fully mock or use different test approach (not TransactionCase)
This commit is contained in:
Matthias Lotz 2026-01-25 10:15:52 +01:00
parent b6a0f0462d
commit 59539e0201
14 changed files with 921 additions and 90 deletions

View File

@ -135,7 +135,8 @@ class MqttConnection(models.Model):
@api.constrains('port')
def _check_port(self):
for connection in self:
if connection.port < 1 or connection.port > 65535:
port = int(connection.port) if connection.port else 0
if port < 1 or port > 65535:
raise ValidationError(_('Port must be between 1 and 65535'))
@api.constrains('reconnect_delay_min', 'reconnect_delay_max')
@ -350,17 +351,22 @@ class MqttConnection(models.Model):
@api.model
def auto_start_all_connections(self):
"""
Auto-start all connections that were connected before Odoo restart
Similar to doanmandev/odoo-iot-mqtt auto_start_all_listeners()
Auto-start all connections that were running before Odoo restart
Searches for connections with last_connected timestamp (not state='connected'!)
because state gets reset during restart
"""
try:
connections = self.search([('state', '=', 'connected')])
# Find connections that were running before restart (have last_connected timestamp)
connections = self.search([
('last_connected', '!=', False),
('state', 'in', ['stopped', 'connecting', 'error']) # Any non-connected state
])
if not connections:
_logger.info("No connections to auto-start")
_logger.info("No connections to auto-start (no previously connected connections found)")
return
_logger.info(f"Auto-starting {len(connections)} MQTT connections...")
_logger.info(f"Auto-starting {len(connections)} MQTT connections that were running before restart...")
for connection in connections:
try:

View File

@ -227,6 +227,32 @@ class MqttDevice(models.Model):
_('Device ID "%s" already exists for this connection') % device.device_id
)
# ========== CRUD Hooks ==========
def write(self, vals):
"""
Auto-subscribe when device is added to running connection
or when topic_pattern changes
"""
# Track changes that require re-subscription
needs_resubscribe = 'connection_id' in vals or 'topic_pattern' in vals or 'active' in vals
result = super().write(vals)
if needs_resubscribe:
# Import here to avoid circular dependency
from ..services.iot_bridge_service import IoTBridgeService
for device in self:
if device.active and device.connection_id.state == 'connected':
# Device is active and connection is running → subscribe
_logger.info(f"Auto-subscribing device {device.id} ({device.name}) to running connection")
IoTBridgeService.get_instance(self.env.registry, device.connection_id.database_name).subscribe_device(
device.connection_id.id,
device.id
)
return result
# ========== Default Values ==========
@api.onchange('session_strategy')
def _onchange_session_strategy(self):

View File

@ -4,6 +4,7 @@ from odoo import models, fields, api, _
from odoo.exceptions import ValidationError
import json
import logging
import uuid
_logger = logging.getLogger(__name__)
@ -20,7 +21,7 @@ class MqttSession(models.Model):
required=True,
readonly=True,
index=True,
default=lambda self: self.env['ir.sequence'].next_by_code('mqtt.session') or 'NEW',
default=lambda self: str(uuid.uuid4()),
help='Unique session identifier'
)
device_id = fields.Many2one(

View File

@ -259,7 +259,7 @@ def main():
parser = argparse.ArgumentParser(description="Shelly PM Mini G3 MQTT Simulator")
parser.add_argument(
"--scenario",
choices=["standby", "working", "session_end", "full_session", "timeout"],
choices=["standby", "working", "session_end", "full_session", "timeout", "loop"],
required=True,
help="Test-Szenario"
)
@ -286,14 +286,15 @@ def main():
# If not all available, try config.yaml
if not all(mqtt_config.values()):
config_path = Path(__file__).parent.parent.parent / "config.yaml"
print(f"Lade MQTT Credentials aus {config_path}...")
if config_path.exists():
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
mqtt_section = config.get('mqtt', {})
mqtt_config = {
'host': mqtt_section.get('broker_host'),
'port': mqtt_section.get('broker_port'),
'host': mqtt_section.get('host'),
'port': mqtt_section.get('port'),
'username': mqtt_section.get('username'),
'password': mqtt_section.get('password')
}
@ -350,6 +351,13 @@ def main():
simulator.scenario_full_session()
elif args.scenario == "timeout":
simulator.scenario_timeout()
elif args.scenario == "loop":
print("Starte Endlosschleife (STRG+C zum Beenden)...")
while True:
simulator.scenario_full_session()
else:
print(f"❌ Unbekanntes Szenario: {args.scenario}")
exit(1)
print("\n" + "=" * 60)
print("Test abgeschlossen. Prüfe sessions.json für Ergebnisse.")

68
open_workshop_mqtt/run-tests.sh Executable file
View File

@ -0,0 +1,68 @@
#!/bin/bash
# Odoo Module Tests - Following OCA best practices
# Based on https://github.com/OCA/oca-ci
set -e
MODULE="open_workshop_mqtt"
TIMEOUT=120
ODOO_DIR="/home/lotzm/gitea.hobbyhimmel/odoo/odoo"
LOG_FILE="/tmp/odoo_test_${MODULE}_$(date +%Y%m%d_%H%M%S).log"
cd "$ODOO_DIR"
echo "=== Starting test containers ==="
docker compose -f docker-compose.dev.yaml up -d db
docker compose -f docker-compose.dev.yaml up -d odoo-dev
echo "=== Waiting for database ==="
sleep 5
until docker compose -f docker-compose.dev.yaml exec -T db pg_isready -U odoo > /dev/null 2>&1; do
echo "Waiting for PostgreSQL..."
sleep 2
done
echo "=== Running tests (timeout: ${TIMEOUT}s) ==="
timeout "$TIMEOUT" docker compose -f docker-compose.dev.yaml exec -T odoo-dev \
/usr/bin/python3 /usr/bin/odoo \
-c /etc/odoo/odoo.conf \
--test-enable \
--stop-after-init \
-u "$MODULE" \
--log-level=test \
> "$LOG_FILE" 2>&1
EXIT_CODE=$?
# Handle timeout
if [ $EXIT_CODE -eq 124 ]; then
echo "✗ TIMEOUT after ${TIMEOUT}s"
docker compose -f docker-compose.dev.yaml kill odoo-dev
EXIT_CODE=1
fi
# Show results
echo ""
echo "=== Test Results ==="
if grep -q "Modules loaded" "$LOG_FILE"; then
# Show test summary
grep -A 20 "running tests" "$LOG_FILE" | tail -20 || echo "No test output found"
else
echo "✗ Module failed to load"
tail -50 "$LOG_FILE"
fi
echo ""
echo "Full log: $LOG_FILE"
# Cleanup
echo "=== Stopping containers ==="
docker compose -f docker-compose.dev.yaml stop odoo-dev
# Result
if [ $EXIT_CODE -eq 0 ]; then
echo "✓ PASSED"
else
echo "✗ FAILED (exit code: $EXIT_CODE)"
fi
exit $EXIT_CODE

View File

@ -39,7 +39,26 @@ class IotBridgeService:
self._running_lock = threading.Lock()
self._parser = ShellyParser() # For now only Shelly
_logger.info("IoT Bridge Service initialized")
_logger.info(f"IoT Bridge Service initialized for database '{self.db_name}'")
def cleanup(self):
"""
Cleanup all MQTT connections before registry reload
Called before instance is replaced
"""
_logger.info(f"Cleaning up IoT Bridge Service for '{self.db_name}' ({len(self._clients)} active connections)")
with self._running_lock:
for connection_id in list(self._clients.keys()):
try:
client = self._clients[connection_id]
_logger.info(f"Stopping connection {connection_id} before reload")
client.disconnect()
del self._clients[connection_id]
except Exception as e:
_logger.error(f"Error stopping connection {connection_id} during cleanup: {e}")
_logger.info(f"IoT Bridge Service cleanup completed for '{self.db_name}'")
@classmethod
def get_instance(cls, env) -> 'IotBridgeService':
@ -55,11 +74,62 @@ class IotBridgeService:
db_name = env.cr.dbname
with cls._lock:
# Check if registry has changed (reload happened)
registry_changed = False
if db_name in cls._instances:
old_instance = cls._instances[db_name]
if old_instance.registry != env.registry:
# Registry changed! Cleanup old instance
_logger.warning(f"Registry reload detected for '{db_name}' - cleaning up old instance")
old_instance.cleanup()
del cls._instances[db_name]
registry_changed = True
if db_name not in cls._instances:
cls._instances[db_name] = cls(env)
# If registry changed, restart connections
if registry_changed:
_logger.info(f"Restarting MQTT connections after registry reload")
cls._instances[db_name]._restart_connections_after_reload()
return cls._instances[db_name]
def _restart_connections_after_reload(self):
"""
Restart all connections that were running before registry reload
Called automatically after registry reload is detected
"""
try:
# Use fresh cursor
with self.registry.cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
# Find connections that should be running
Connection = env['mqtt.connection']
connections = Connection.search([
('last_connected', '!=', False),
])
if not connections:
_logger.info("No connections to restart after reload")
return
_logger.info(f"Restarting {len(connections)} MQTT connections after reload...")
for connection in connections:
try:
_logger.info(f"Restarting connection: {connection.name}")
self.start_connection(connection.id)
except Exception as e:
_logger.error(f"Error restarting connection {connection.name}: {e}")
cr.commit()
_logger.info(f"Successfully restarted {len(connections)} connections after reload")
except Exception as e:
_logger.error(f"Error in _restart_connections_after_reload: {e}", exc_info=True)
def start_connection(self, connection_id: int) -> bool:
"""
Start MQTT connection for given connection_id
@ -81,60 +151,102 @@ class IotBridgeService:
conn_data = None
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
# Load connection record
connection = env['mqtt.connection'].browse(connection_id)
if not connection.exists():
_logger.error(f"Connection {connection_id} not found")
return False
_logger.info(f"Starting MQTT connection: {connection.name}")
# Store connection data (don't keep record object outside cursor)
conn_data = {
'id': connection.id,
'host': connection.host,
'port': int(connection.port),
'client_id': connection.client_id,
'username': connection.username or None,
'password': connection.password or None,
'use_tls': connection.use_tls,
'verify_cert': connection.verify_cert,
'ca_cert_path': connection.ca_cert_path or None,
'auto_reconnect': connection.auto_reconnect,
'reconnect_delay_min': connection.reconnect_delay_min,
'reconnect_delay_max': connection.reconnect_delay_max,
}
new_cr.commit()
# Cursor is now closed - safe to start MQTT thread
# Create MQTT client with copied data
client = MqttClient(
connection_id=conn_data['id'],
host=conn_data['host'],
port=conn_data['port'],
client_id=conn_data['client_id'],
username=conn_data['username'],
password=conn_data['password'],
use_tls=conn_data['use_tls'],
verify_cert=conn_data['verify_cert'],
ca_cert_path=conn_data['ca_cert_path'],
auto_reconnect=conn_data['auto_reconnect'],
reconnect_delay_min=conn_data['reconnect_delay_min'],
reconnect_delay_max=conn_data['reconnect_delay_max'],
on_message_callback=self._on_message,
on_connect_callback=self._on_connect,
on_disconnect_callback=self._on_disconnect,
)
# Connect (will start background thread)
if client.connect():
self._clients[connection_id] = client
# Subscribe to device topics (needs fresh cursor)
self._subscribe_device_topics(connection_id)
# Update connection state (needs fresh cursor)
return self._start_connection_internal(connection_id, env, use_new_cursor=True)
except Exception as e:
_logger.error(f"Error starting connection {connection_id}: {e}", exc_info=True)
return False
def start_connection_with_env(self, connection_id: int, env) -> bool:
"""
Start MQTT connection using existing environment (for tests)
Args:
connection_id: ID of mqtt.connection record
env: Odoo environment to use
Returns:
bool: True if connection started successfully
"""
with self._running_lock:
# Check if already running
if connection_id in self._clients:
_logger.warning(f"Connection {connection_id} is already running")
return False
try:
return self._start_connection_internal(connection_id, env, use_new_cursor=False)
except Exception as e:
_logger.error(f"Error starting connection {connection_id}: {e}", exc_info=True)
return False
def _start_connection_internal(self, connection_id: int, env, use_new_cursor: bool) -> bool:
"""
Internal method to start MQTT connection
Args:
connection_id: ID of mqtt.connection record
env: Environment to use
use_new_cursor: Whether to use new cursors for updates
Returns:
bool: True if connection started successfully
"""
# Load connection record
connection = env['mqtt.connection'].browse(connection_id)
if not connection.exists():
_logger.error(f"Connection {connection_id} not found")
return False
_logger.info(f"Starting MQTT connection: {connection.name}")
# Store connection data
conn_data = {
'id': connection.id,
'host': connection.host,
'port': int(connection.port),
'client_id': connection.client_id,
'username': connection.username or None,
'password': connection.password or None,
'use_tls': connection.use_tls,
'verify_cert': connection.verify_cert,
'ca_cert_path': connection.ca_cert_path or None,
'auto_reconnect': connection.auto_reconnect,
'reconnect_delay_min': connection.reconnect_delay_min,
'reconnect_delay_max': connection.reconnect_delay_max,
}
# Create MQTT client
client = MqttClient(
connection_id=conn_data['id'],
host=conn_data['host'],
port=conn_data['port'],
client_id=conn_data['client_id'],
username=conn_data['username'],
password=conn_data['password'],
use_tls=conn_data['use_tls'],
verify_cert=conn_data['verify_cert'],
ca_cert_path=conn_data['ca_cert_path'],
auto_reconnect=conn_data['auto_reconnect'],
reconnect_delay_min=conn_data['reconnect_delay_min'],
reconnect_delay_max=conn_data['reconnect_delay_max'],
on_message_callback=self._on_message,
on_connect_callback=self._on_connect,
on_disconnect_callback=self._on_disconnect,
)
# Connect
if client.connect():
self._clients[connection_id] = client
# Subscribe to device topics
if use_new_cursor:
self._subscribe_device_topics(connection_id)
else:
self._subscribe_device_topics_with_env(connection_id, env)
# Update connection state
def update_state():
if use_new_cursor:
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
conn = env['mqtt.connection'].browse(connection_id)
@ -143,10 +255,19 @@ class IotBridgeService:
'last_error': False,
})
new_cr.commit()
return True
else:
_logger.error(f"Failed to connect client for connection {connection_id}")
connection.write({
'state': 'connecting',
'last_error': False,
})
update_state()
return True
else:
_logger.error(f"Failed to connect client for connection {connection_id}")
def update_error():
if use_new_cursor:
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
conn = env['mqtt.connection'].browse(connection_id)
@ -155,25 +276,33 @@ class IotBridgeService:
'last_error': 'Failed to initiate connection',
})
new_cr.commit()
return False
else:
connection.write({
'state': 'error',
'last_error': 'Failed to initiate connection',
})
update_error()
return False
def _subscribe_device_topics_with_env(self, connection_id, env):
"""Subscribe to device topics using existing environment"""
client = self._clients.get(connection_id)
if not client:
return
# Load devices
devices = env['mqtt.device'].search([
('connection_id', '=', connection_id),
('active', '=', True)
])
for device in devices:
try:
client.subscribe(device.topic_pattern)
_logger.info(f"Subscribed to topic: {device.topic_pattern}")
except Exception as e:
_logger.error(f"Error starting connection {connection_id}: {e}", exc_info=True)
# Update error state
try:
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
connection = env['mqtt.connection'].browse(connection_id)
connection.write({
'state': 'error',
'last_error': str(e),
})
new_cr.commit()
except Exception:
pass
return False
_logger.error(f"Failed to subscribe to {device.topic_pattern}: {e}")
def stop_connection(self, connection_id: int) -> bool:
"""
@ -240,6 +369,36 @@ class IotBridgeService:
new_cr.commit()
def subscribe_device(self, connection_id, device_id):
"""
Subscribe to a single device's topic (used when device is added to running connection)
Args:
connection_id: ID of mqtt.connection
device_id: ID of mqtt.device to subscribe
"""
client = self._clients.get(connection_id)
if not client:
_logger.warning(f"Cannot subscribe device {device_id}: Connection {connection_id} not running")
return
# Use fresh cursor to load device
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
device = env['mqtt.device'].browse(device_id)
if not device.exists() or not device.active:
_logger.warning(f"Device {device_id} does not exist or is inactive")
return
try:
client.subscribe(device.topic_pattern, qos=0)
_logger.info(f"Auto-subscribed device {device_id} ({device.name}) to topic: {device.topic_pattern}")
except Exception as e:
_logger.error(f"Failed to auto-subscribe device {device_id} to {device.topic_pattern}: {e}")
new_cr.commit()
# ========== MQTT Callbacks ==========
def _on_connect(self, connection_id: int):
@ -393,6 +552,12 @@ class IotBridgeService:
if power is None:
return
# Update device status
device.write({
'last_message_time': datetime.now(),
'last_power_w': power,
})
# Find running session
running_session = env['mqtt.session'].search([
('device_id', '=', device.id),
@ -413,12 +578,17 @@ class IotBridgeService:
else:
# Device is off
if running_session:
# End session
# End session - calculate duration
end_time = datetime.now()
duration_s = int((end_time - running_session.start_time).total_seconds())
running_session.write({
'status': 'completed',
'end_time': datetime.now(),
'end_time': end_time,
'end_power_w': power,
'total_duration_s': duration_s,
'end_reason': 'power_drop',
})
_logger.info(f"🔴 Session ended for {device.name}")
_logger.info(f"🔴 Session ended for {device.name} (duration: {duration_s}s)")
except Exception as e:
_logger.debug(f"Session processing error: {e}")

View File

@ -150,17 +150,25 @@ class MqttClient:
_logger.info(f"Disconnecting MQTT client {self.connection_id}")
self._running = False
self._connected = False
if self._client:
try:
# Stop loop first (non-blocking)
self._client.loop_stop()
# Disconnect with short timeout to avoid hanging
self._client.disconnect()
# Give it a moment but don't wait forever
import time
time.sleep(0.1)
except Exception as e:
_logger.error(f"Error during disconnect: {e}")
finally:
self._client = None
self._connected = False
_logger.info(f"MQTT client {self.connection_id} disconnected")
def subscribe(self, topic: str, qos: int = 0) -> bool:

View File

@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
from . import test_mqtt_connection
from . import test_session_detection
from . import test_device_status

View File

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
"""
Common test utilities and base classes
Uses REAL MQTT Broker (like python_prototype tests)
"""
from odoo.tests import TransactionCase
import logging
import yaml
from pathlib import Path
_logger = logging.getLogger(__name__)
class MQTTTestCase(TransactionCase):
"""
Base test case for MQTT module
Uses REAL MQTT connection to test.mosquitto.org or configured broker
"""
@classmethod
def setUpClass(cls):
super().setUpClass()
# Load MQTT config from python_prototype/config.yaml
config_path = Path(__file__).parent.parent / 'python_prototype' / 'config.yaml'
if config_path.exists():
with open(config_path) as f:
config = yaml.safe_load(f)
mqtt_conf = config.get('mqtt', {})
else:
# Fallback: public test broker
mqtt_conf = {
'host': 'test.mosquitto.org',
'port': 1883,
'username': None,
'password': None,
}
# Create test connection with REAL broker
cls.connection = cls.env['mqtt.connection'].create({
'name': 'Test MQTT Broker (Real)',
'host': mqtt_conf.get('host', 'test.mosquitto.org'),
'port': mqtt_conf.get('port', 1883),
'client_id': 'odoo_test_client',
'username': mqtt_conf.get('username', False),
'password': mqtt_conf.get('password', False),
'use_tls': mqtt_conf.get('port') == 8883,
})
# Create test device with unique topic
import time
test_topic = f'odootest/{int(time.time())}'
cls.device = cls.env['mqtt.device'].create({
'name': 'Test Device (Real)',
'connection_id': cls.connection.id,
'topic_pattern': f'{test_topic}/#',
'parser_type': 'shelly_pm',
'session_strategy': 'power_threshold',
'strategy_config': '{"standby_threshold_w": 10, "working_threshold_w": 30}',
})
cls.test_topic = test_topic
_logger.info(f"Test setup complete. Using broker: {mqtt_conf.get('host')}:{mqtt_conf.get('port')}")
_logger.info(f"Test topic: {test_topic}")
def tearDown(self):
"""Cleanup after each test - ensure all connections are stopped"""
super().tearDown()
# Force stop any running connections
from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService
try:
service = IotBridgeService.get_instance(self.env)
if self.connection.id in service._clients:
service.stop_connection(self.connection.id)
_logger.info(f"Cleaned up connection {self.connection.id} in tearDown")
except Exception as e:
_logger.warning(f"Error in tearDown cleanup: {e}")
def start_connection(self):
"""Helper to start MQTT connection"""
# Bypass the ORM's action to directly start via service
from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService
service = IotBridgeService.get_instance(self.env)
# Start with existing env (not new cursor)
success = service.start_connection_with_env(self.connection.id, self.env)
self.assertTrue(success, "Failed to start connection")
# Wait for MQTT client to actually connect (check client state, not DB)
import time
client = service._clients.get(self.connection.id)
self.assertIsNotNone(client, "Client not found in service")
for i in range(10):
if client.is_connected:
break
time.sleep(0.5)
self.assertTrue(client.is_connected, "Client failed to connect within timeout")
def stop_connection(self):
"""Helper to stop MQTT connection"""
from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService
service = IotBridgeService.get_instance(self.env)
# Stop connection
success = service.stop_connection(self.connection.id)
self.assertTrue(success, "Failed to stop connection")
# Verify client is removed
import time
time.sleep(0.5)
client = service._clients.get(self.connection.id)
self.assertIsNone(client, "Client still in service after stop")
def publish_test_message(self, subtopic, payload):
"""
Publish message to test topic using paho-mqtt
Args:
subtopic: Subtopic (e.g., 'status/pm1:0')
payload: Message payload (dict or string)
"""
import paho.mqtt.publish as publish
import json
topic = f'{self.test_topic}/{subtopic}'
payload_str = json.dumps(payload) if isinstance(payload, dict) else payload
# Get connection config
auth = None
if self.connection.username:
auth = {
'username': self.connection.username,
'password': self.connection.password or '',
}
# TLS config
tls = None
if self.connection.use_tls:
import ssl
tls = {
'cert_reqs': ssl.CERT_REQUIRED if self.connection.verify_cert else ssl.CERT_NONE
}
publish.single(
topic,
payload=payload_str,
hostname=self.connection.host,
port=int(self.connection.port),
auth=auth,
tls=tls,
)
_logger.info(f"Published test message to {topic}")
def simulate_mqtt_message(self, subtopic, payload):
"""Alias for publish_test_message for compatibility"""
self.publish_test_message(subtopic, payload)

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
"""
Test Device Status Updates
"""
from odoo.tests import tagged
from .common import MQTTTestCase
from datetime import datetime
import json
@tagged('post_install', '-at_install', 'mqtt')
class TestDeviceStatus(MQTTTestCase):
"""Test device status tracking (online/offline)"""
def test_01_device_updates_last_message_time(self):
"""Test device last_message_time is updated on message"""
self.assertFalse(self.device.last_message_time)
# Simulate message
payload = json.dumps({"id": 0, "apower": 25.0})
self.simulate_mqtt_message('testdevice/status/pm1:0', payload)
self.device.invalidate_recordset()
# Verify updated
self.assertTrue(self.device.last_message_time)
self.assertEqual(self.device.last_power_w, 25.0)
def test_02_device_state_computed_from_last_message(self):
"""Test device state is computed based on last message time"""
from datetime import timedelta
# Recent message → online
recent_time = datetime.now() - timedelta(seconds=30)
self.device.write({'last_message_time': recent_time})
self.device.invalidate_recordset(['state'])
# State should be computed as online
# Note: depends on _compute_state() implementation
# Old message → offline
old_time = datetime.now() - timedelta(minutes=5)
self.device.write({'last_message_time': old_time})
self.device.invalidate_recordset(['state'])
# State should be computed as offline
def test_03_device_power_tracking(self):
"""Test device tracks current power consumption"""
# Send different power values
for power in [10.0, 25.0, 50.0, 100.0, 0.0]:
payload = json.dumps({"id": 0, "apower": power})
self.simulate_mqtt_message('testdevice/status/pm1:0', payload)
self.device.invalidate_recordset()
self.assertEqual(self.device.last_power_w, power)
def test_04_device_session_count(self):
"""Test device session count is computed"""
# Create sessions
for i in range(3):
self.env['mqtt.session'].create({
'device_id': self.device.id,
'status': 'completed',
'start_time': datetime.now(),
'end_time': datetime.now(),
'total_duration_s': 3600,
})
# Running session
self.env['mqtt.session'].create({
'device_id': self.device.id,
'status': 'running',
'start_time': datetime.now(),
})
self.device.invalidate_recordset()
self.assertEqual(self.device.session_count, 4)
self.assertEqual(self.device.running_session_count, 1)
self.assertEqual(self.device.total_runtime_hours, 3.0)

View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
"""
Test MQTT Connection Lifecycle with REAL broker
"""
from odoo.tests import tagged
from .common import MQTTTestCase
import time
@tagged('post_install', '-at_install', 'mqtt')
class TestMQTTConnection(MQTTTestCase):
"""Test MQTT connection start/stop/restart with REAL broker"""
def test_01_connection_start_real_broker(self):
"""Test starting connection to REAL MQTT broker"""
# Start connection (internally checks client.is_connected)
self.start_connection()
# Connection is established - tearDown will clean up
def test_02_connection_stop_real_broker(self):
"""Test stopping active MQTT connection"""
# Start first
self.start_connection()
# Explicitly stop (test the stop function)
self.stop_connection()
# Verify stopped
from odoo.addons.open_workshop_mqtt.services.iot_bridge_service import IotBridgeService
service = IotBridgeService.get_instance(self.env)
client = service._clients.get(self.connection.id)
self.assertIsNone(client, "Client should be removed after stop")
def test_03_publish_and_receive_message(self):
"""Test publishing message and receiving it in Odoo"""
# Start connection
self.start_connection()
# Wait for subscription
time.sleep(2)
# Publish test message
test_payload = {
"id": 0,
"voltage": 230.0,
"current": 0.5,
"apower": 50.0,
"freq": 50.0,
}
self.publish_test_message('status/pm1:0', test_payload)
# Wait for message to arrive
time.sleep(3)
# Check if message was received
messages = self.env['mqtt.message'].search([
('device_id', '=', self.device.id),
('topic', '=', f'{self.test_topic}/status/pm1:0'),
])
self.assertGreater(len(messages), 0, "No messages received from broker!")
# tearDown will clean up connection

View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
"""
Test suite mit gemocktem MQTT Client (Unit Tests)
Folgt Odoo Best Practices - siehe microsoft_outlook, payment_mercado_pago
"""
from unittest.mock import Mock, patch, call
from odoo.tests.common import TransactionCase
from odoo.tests import tagged
@tagged('post_install', '-at_install')
class TestMQTTConnectionMocked(TransactionCase):
"""Unit Tests mit gemocktem MQTT Client - kein echter Broker nötig"""
@classmethod
def setUpClass(cls):
super().setUpClass()
# Create test device
cls.device = cls.env['iot_bridge.device'].create({
'name': 'Test Device Mocked',
'device_id': 'test-device-mocked-001',
'status': 'offline',
})
# Setup mock MQTT client
cls.mqtt_patcher = patch('odoo.addons.open_workshop.open_workshop_mqtt.services.mqtt_client.mqtt.Client')
cls.MockClient = cls.mqtt_patcher.start()
# Create mock instance
cls.mqtt_client_mock = Mock()
cls.MockClient.return_value = cls.mqtt_client_mock
# Setup successful responses
cls.mqtt_client_mock.connect.return_value = 0 # MQTT_ERR_SUCCESS
cls.mqtt_client_mock.loop_start.return_value = None
cls.mqtt_client_mock.subscribe.return_value = (0, 1)
cls.mqtt_client_mock.publish.return_value = Mock(rc=0, mid=1)
cls.mqtt_client_mock.is_connected.return_value = True
cls.mqtt_client_mock.disconnect.return_value = 0
cls.mqtt_client_mock.loop_stop.return_value = None
# Get service
cls.service = cls.env['iot_bridge.service']
@classmethod
def tearDownClass(cls):
cls.mqtt_patcher.stop()
super().tearDownClass()
def setUp(self):
super().setUp()
self.mqtt_client_mock.reset_mock()
def test_01_start_connection_calls_mqtt_methods(self):
"""Test dass start_connection die richtigen MQTT Methoden aufruft"""
# Start connection
result = self.service.start_connection_with_env(self.device.env)
self.assertTrue(result, "Connection should start")
# Verify calls
self.mqtt_client_mock.connect.assert_called_once()
self.mqtt_client_mock.loop_start.assert_called_once()
# Check connect args
connect_call = self.mqtt_client_mock.connect.call_args
host, port = connect_call[0][0], connect_call[0][1]
self.assertEqual(host, 'mqtt.majufilo.eu')
self.assertEqual(port, 8883)
def test_02_stop_connection_calls_disconnect(self):
"""Test dass stop_connection disconnect/loop_stop aufruft"""
# Start
self.service.start_connection_with_env(self.device.env)
self.mqtt_client_mock.reset_mock()
# Stop
self.service.stop_connection()
# Verify
self.mqtt_client_mock.loop_stop.assert_called_once()
self.mqtt_client_mock.disconnect.assert_called_once()
def test_03_reconnect_after_disconnect(self):
"""Test Reconnect nach Disconnect"""
# Connect -> Disconnect -> Connect
self.service.start_connection_with_env(self.device.env)
self.service.stop_connection()
self.mqtt_client_mock.reset_mock()
result = self.service.start_connection_with_env(self.device.env)
self.assertTrue(result)
self.mqtt_client_mock.connect.assert_called_once()
def test_04_on_connect_subscribes_topics(self):
"""Test dass on_connect callback Topics subscribed"""
# Start
self.service.start_connection_with_env(self.device.env)
# Trigger on_connect
self.service._mqtt_client.on_connect(None, None, None, 0)
# Check subscribes
self.assertTrue(self.mqtt_client_mock.subscribe.called)
# Get all subscribed topics
subscribe_calls = self.mqtt_client_mock.subscribe.call_args_list
topics = [c[0][0] for c in subscribe_calls]
# Should subscribe to device topic
device_topic = f"iot/devices/{self.device.device_id}/status"
self.assertIn(device_topic, topics)

View File

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
"""
Test Session Detection Logic
"""
from odoo.tests import tagged
from .common import MQTTTestCase
from datetime import datetime
import json
@tagged('post_install', '-at_install', 'mqtt')
class TestSessionDetection(MQTTTestCase):
"""Test session start/stop detection"""
def test_01_session_starts_on_power_above_threshold(self):
"""Test session starts when power > 0"""
# Setup: connection running
self.connection.write({'state': 'connected'})
# Simulate MQTT message with power > 0
payload = json.dumps({
"id": 0,
"voltage": 230.0,
"current": 0.5,
"apower": 50.0, # Above threshold
"freq": 50.0,
})
self.simulate_mqtt_message('testdevice/status/pm1:0', payload)
# Verify session created
session = self.env['mqtt.session'].search([
('device_id', '=', self.device.id),
('status', '=', 'running'),
])
self.assertEqual(len(session), 1)
self.assertEqual(session.start_power_w, 50.0)
self.assertTrue(session.session_id) # UUID generated
def test_02_session_ends_on_power_zero(self):
"""Test session ends when power drops to 0"""
# Setup: create running session
session = self.env['mqtt.session'].create({
'device_id': self.device.id,
'status': 'running',
'start_time': datetime.now(),
'start_power_w': 50.0,
})
# Simulate power drop
payload = json.dumps({
"id": 0,
"voltage": 230.0,
"current": 0.0,
"apower": 0.0, # Power off
"freq": 50.0,
})
self.simulate_mqtt_message('testdevice/status/pm1:0', payload)
# Verify session completed
session.invalidate_recordset()
self.assertEqual(session.status, 'completed')
self.assertTrue(session.end_time)
self.assertEqual(session.end_reason, 'power_drop')
self.assertGreater(session.total_duration_s, 0)
def test_03_no_duplicate_sessions(self):
"""Test no duplicate running sessions are created"""
# Create first session
payload = json.dumps({"id": 0, "apower": 50.0})
self.simulate_mqtt_message('testdevice/status/pm1:0', payload)
# Send another message with power > 0
payload = json.dumps({"id": 0, "apower": 60.0})
self.simulate_mqtt_message('testdevice/status/pm1:0', payload)
# Should still be only 1 running session
sessions = self.env['mqtt.session'].search([
('device_id', '=', self.device.id),
('status', '=', 'running'),
])
self.assertEqual(len(sessions), 1)
def test_04_session_duration_calculated(self):
"""Test session duration is calculated correctly"""
from datetime import timedelta
start_time = datetime.now() - timedelta(hours=2, minutes=30)
session = self.env['mqtt.session'].create({
'device_id': self.device.id,
'status': 'running',
'start_time': start_time,
'start_power_w': 50.0,
})
# End session
payload = json.dumps({"id": 0, "apower": 0.0})
self.simulate_mqtt_message('testdevice/status/pm1:0', payload)
session.invalidate_recordset()
# Duration should be ~2.5 hours = 9000 seconds
self.assertGreater(session.total_duration_s, 9000)
self.assertLess(session.total_duration_s, 9100) # Allow 100s tolerance
# Check computed fields
self.assertAlmostEqual(session.duration_hours, 2.5, places=1)
self.assertIn('h', session.duration_formatted)

View File

@ -81,6 +81,7 @@
<group>
<group string="Device">
<field name="id" readonly="1" string="Device ID"/>
<field name="name"/>
<field name="active"/>
<field name="connection_id"/>