feat: MQTT integration - auto-start, session detection, UI fixes

- Auto-start connections after Odoo restart via _register_hook()
- Start/Stop buttons with invalidate_recordset() and reload action
- Session detection with ShellyParser (RPC, telemetry, status)
- Fixed imports: datetime, ShellyParser
- Parser matches prototype with device_id, timestamp extraction
- Sessions created/ended based on power > 0 threshold
This commit is contained in:
Matthias Lotz 2026-01-24 23:40:03 +01:00
parent 5fcaef0336
commit b6a0f0462d
20 changed files with 3191 additions and 0 deletions

View File

@ -0,0 +1,94 @@
# Open Workshop MQTT
MQTT IoT Device Integration for Odoo 18
## Features
- ✅ **MQTT Broker Connection** - Connect to external MQTT brokers (Mosquitto, etc.)
- ✅ **Device Management** - Configure and monitor IoT devices
- ✅ **Session Tracking** - Automatic runtime session detection
- ✅ **Flexible Parsers** - Support for Shelly PM Mini G3, Tasmota, Generic JSON
- ✅ **Session Strategies** - Power threshold, Last Will Testament, Manual control
- ✅ **Analytics** - Pivot tables and graphs for runtime analysis
- ✅ **Auto-Reconnect** - Exponential backoff on connection loss
- ✅ **Message Logging** - Debug log for MQTT messages
## Installation
1. Install Python dependencies:
```bash
pip install paho-mqtt
```
2. Install the module in Odoo:
- Apps → Update Apps List
- Search for "Open Workshop MQTT"
- Click Install
## Quick Start
1. **Create MQTT Connection**
- MQTT → Connections → Create
- Enter broker details (host, port, credentials)
- Click "Test Connection" then "Start"
2. **Add Device**
- MQTT → Devices → Create
- Select connection
- Configure device ID and topic pattern
- Choose parser type (Shelly, Tasmota, etc.)
- Set session detection strategy
3. **Monitor Sessions**
- MQTT → Sessions
- View runtime analytics in Pivot/Graph views
## Session Detection Strategies
### Power Threshold (Recommended)
- Dual threshold detection (standby/working)
- Configurable debounce timers
- Timeout detection
- Reference: `python_prototype/session_detector.py`
### Last Will Testament
- Uses MQTT LWT for offline detection
- Immediate session end on device disconnect
### Manual
- Start/stop sessions via buttons or MQTT commands
## Configuration Example
**Shelly PM Mini G3:**
```json
{
"standby_threshold_w": 20,
"working_threshold_w": 100,
"start_debounce_s": 3,
"stop_debounce_s": 15,
"message_timeout_s": 20
}
```
## Optional: Maintenance Integration
Install `open_workshop_mqtt_maintenance` (separate module) to link MQTT devices to `maintenance.equipment`.
## Technical Reference
See `python_prototype/` directory for:
- Implementation reference
- Test suite (pytest)
- Session detection logic
- MQTT client implementation
## Support
- Documentation: See `python_prototype/README.md`
- Issues: GitHub Issues
- Tests: `pytest python_prototype/tests/ -v`
## License
LGPL-3

View File

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from . import models

View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
{
'name': 'Open Workshop MQTT',
'version': '18.0.1.0.0',
'category': 'IoT',
'summary': 'MQTT IoT Device Integration for Workshop Equipment Tracking',
'description': """
Open Workshop MQTT Module
=========================
Connect MQTT IoT devices (Shelly, Tasmota, etc.) to Odoo and track:
- Device runtime sessions
- Power consumption
- Machine usage analytics
Features:
- Flexible device parsers (Shelly PM Mini G3, Tasmota, Generic)
- Multiple session detection strategies (Power threshold, Last Will, Manual)
- Auto-reconnect with exponential backoff
- State recovery after restart
- Real-time device monitoring
Optional Integration:
- Connect devices to maintenance.equipment (via open_workshop_mqtt_maintenance)
""",
'author': 'Open Workshop',
'website': 'https://github.com/your-repo/open_workshop',
'license': 'LGPL-3',
'depends': [
'base',
],
'external_dependencies': {
'python': [
'paho-mqtt', # pip install paho-mqtt
],
},
'data': [
'security/ir.model.access.csv',
# Actions first (so other views can reference them)
'views/mqtt_device_views.xml',
'views/mqtt_session_views.xml',
'views/mqtt_message_views.xml',
# Then views that reference actions
'views/mqtt_connection_views.xml',
# Menus last (need all actions to exist)
'views/mqtt_menus.xml',
],
'demo': [],
'installable': True,
'application': True,
'auto_install': False,
}

View File

@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
from . import mqtt_connection
from . import mqtt_device
from . import mqtt_session
from . import mqtt_message

View File

@ -0,0 +1,390 @@
# -*- coding: utf-8 -*-
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
import logging
_logger = logging.getLogger(__name__)
class MqttConnection(models.Model):
_name = 'mqtt.connection'
_description = 'MQTT Broker Connection'
_rec_name = 'name'
_order = 'name'
# ========== Basic Fields ==========
name = fields.Char(
string='Name',
required=True,
default='MQTT Broker',
help='Connection name for identification'
)
active = fields.Boolean(
default=True,
help='Deactivate to disable this connection'
)
# ========== Connection Configuration ==========
host = fields.Char(
string='Broker Host',
required=True,
default='localhost',
help='MQTT Broker hostname or IP address (without protocol prefix like mqtt://)'
)
port = fields.Char(
string='Broker Port',
required=True,
default='8883',
help='MQTT Broker port (1883 for non-TLS, 8883 for TLS)'
)
username = fields.Char(
string='Username',
help='MQTT authentication username'
)
password = fields.Char(
string='Password',
help='MQTT authentication password'
)
use_tls = fields.Boolean(
string='Use TLS/SSL',
default=True,
help='Enable TLS/SSL encryption (recommended)'
)
verify_cert = fields.Boolean(
string='Verify Certificate',
default=False,
help='Verify SSL certificate (disable for self-signed certificates)'
)
ca_cert_path = fields.Char(
string='CA Certificate Path',
help='Optional: Path to custom CA certificate file'
)
client_id = fields.Char(
string='Client ID',
default='odoo_mqtt',
required=True,
help='Unique MQTT client identifier'
)
# ========== Auto-Reconnect Configuration ==========
auto_reconnect = fields.Boolean(
string='Auto Reconnect',
default=True,
help='Automatically reconnect on connection loss'
)
reconnect_delay_min = fields.Integer(
string='Min Reconnect Delay (s)',
default=1,
help='Initial reconnect delay in seconds'
)
reconnect_delay_max = fields.Integer(
string='Max Reconnect Delay (s)',
default=60,
help='Maximum reconnect delay (exponential backoff)'
)
# ========== Connection Status (Runtime) ==========
state = fields.Selection([
('stopped', 'Stopped'),
('connecting', 'Connecting...'),
('connected', 'Connected'),
('error', 'Error')
], string='Status', default='stopped', readonly=True,
help='Current connection status')
last_connected = fields.Datetime(
string='Last Connected',
readonly=True,
help='Timestamp of last successful connection'
)
last_error = fields.Text(
string='Last Error',
readonly=True,
help='Last connection error message'
)
# ========== Relations ==========
device_ids = fields.One2many(
'mqtt.device',
'connection_id',
string='Devices',
help='MQTT devices using this connection'
)
# ========== Computed Fields ==========
device_count = fields.Integer(
string='Device Count',
compute='_compute_counts',
store=True
)
active_device_count = fields.Integer(
string='Active Devices',
compute='_compute_counts',
store=True
)
# ========== Compute Methods ==========
@api.depends('device_ids', 'device_ids.active')
def _compute_counts(self):
for connection in self:
connection.device_count = len(connection.device_ids)
connection.active_device_count = len(connection.device_ids.filtered('active'))
# ========== Constraints ==========
@api.constrains('port')
def _check_port(self):
for connection in self:
if connection.port < 1 or connection.port > 65535:
raise ValidationError(_('Port must be between 1 and 65535'))
@api.constrains('reconnect_delay_min', 'reconnect_delay_max')
def _check_reconnect_delays(self):
for connection in self:
if connection.reconnect_delay_min < 1:
raise ValidationError(_('Minimum reconnect delay must be at least 1 second'))
if connection.reconnect_delay_max < connection.reconnect_delay_min:
raise ValidationError(_('Maximum delay must be greater than minimum delay'))
# ========== Action Methods ==========
def action_start(self):
"""Start MQTT Bridge Service for this connection"""
self.ensure_one()
if self.state == 'connected':
raise UserError(_('Connection is already running'))
try:
from ..services.iot_bridge_service import IotBridgeService
_logger.info(f"Starting MQTT connection: {self.name}")
# Get service instance
service = IotBridgeService.get_instance(self.env)
# Start connection
if service.start_connection(self.id):
# Invalidate cache to force refresh
self.invalidate_recordset(['state', 'last_connected', 'last_error'])
# Reload current view
return {
'type': 'ir.actions.client',
'tag': 'reload',
}
else:
raise UserError(_('Failed to start connection - check logs for details'))
except Exception as e:
_logger.error(f"Failed to start MQTT connection: {e}", exc_info=True)
self.write({
'state': 'error',
'last_error': str(e),
})
raise UserError(_('Failed to start connection: %s') % str(e))
def action_stop(self):
"""Stop MQTT Bridge Service for this connection"""
self.ensure_one()
if self.state == 'stopped':
raise UserError(_('Connection is already stopped'))
try:
from ..services.iot_bridge_service import IotBridgeService
_logger.info(f"Stopping MQTT connection: {self.name}")
# Get service instance
service = IotBridgeService.get_instance(self.env)
# Stop connection
service.stop_connection(self.id)
# Invalidate cache to force refresh
self.invalidate_recordset(['state', 'last_connected', 'last_error'])
# Reload current view
return {
'type': 'ir.actions.client',
'tag': 'reload',
}
except Exception as e:
_logger.error(f"Failed to stop MQTT connection: {e}", exc_info=True)
raise UserError(_('Failed to stop connection: %s') % str(e))
def action_test_connection(self):
"""Test MQTT connection"""
self.ensure_one()
try:
import paho.mqtt.client as mqtt
import ssl
import time
_logger.info(f"Testing MQTT connection: {self.name} ({self.host}:{self.port})")
# Connection test result
test_result = {'connected': False, 'error': None}
def on_connect(client, userdata, flags, rc, properties=None):
"""Callback when connection is established"""
if rc == 0:
test_result['connected'] = True
_logger.info(f"Test connection successful: {self.name}")
else:
error_messages = {
1: 'Connection refused - incorrect protocol version',
2: 'Connection refused - invalid client identifier',
3: 'Connection refused - server unavailable',
4: 'Connection refused - bad username or password',
5: 'Connection refused - not authorized',
}
test_result['error'] = error_messages.get(rc, f'Connection refused - code {rc}')
_logger.error(f"Test connection failed: {test_result['error']}")
# Create test client
client = mqtt.Client(
client_id=f"{self.client_id}_test_{int(time.time())}",
protocol=mqtt.MQTTv5
)
# Set callbacks
client.on_connect = on_connect
# Configure authentication
if self.username:
client.username_pw_set(self.username, self.password or '')
# Configure TLS/SSL
if self.use_tls:
tls_context = ssl.create_default_context()
# Handle certificate verification
if not self.verify_cert:
tls_context.check_hostname = False
tls_context.verify_mode = ssl.CERT_NONE
_logger.warning(f"SSL certificate verification disabled for {self.name}")
# Load custom CA certificate if provided
if self.ca_cert_path:
try:
tls_context.load_verify_locations(cafile=self.ca_cert_path)
_logger.info(f"Loaded CA certificate from {self.ca_cert_path}")
except Exception as ca_error:
raise UserError(_(f'Failed to load CA certificate: {ca_error}'))
client.tls_set_context(tls_context)
# Attempt connection
try:
port_int = int(self.port)
except ValueError:
raise UserError(_(f'Invalid port number: {self.port}'))
client.connect(self.host, port_int, keepalive=60)
# Run loop for 5 seconds to allow connection
client.loop_start()
# Wait for connection result (max 5 seconds)
timeout = 5
start_time = time.time()
while not test_result['connected'] and not test_result['error']:
if time.time() - start_time > timeout:
test_result['error'] = 'Connection timeout - broker not responding'
break
time.sleep(0.1)
# Cleanup
client.loop_stop()
client.disconnect()
# Return result
if test_result['connected']:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Connection Test Successful'),
'message': _('Successfully connected to MQTT broker %s:%s') % (self.host, self.port),
'type': 'success',
'sticky': False,
}
}
else:
error_msg = test_result.get('error', 'Unknown connection error')
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Connection Test Failed'),
'message': error_msg,
'type': 'danger',
'sticky': True,
}
}
except Exception as e:
_logger.error(f"Connection test failed: {e}", exc_info=True)
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Connection Test Failed'),
'message': str(e),
'type': 'danger',
'sticky': True,
}
}
# ========== Auto-Start on Odoo Restart ==========
@api.model
def _register_hook(self):
"""Auto-start all connected connections when Odoo starts"""
res = super()._register_hook()
self.auto_start_all_connections()
return res
@api.model
def auto_start_all_connections(self):
"""
Auto-start all connections that were connected before Odoo restart
Similar to doanmandev/odoo-iot-mqtt auto_start_all_listeners()
"""
try:
connections = self.search([('state', '=', 'connected')])
if not connections:
_logger.info("No connections to auto-start")
return
_logger.info(f"Auto-starting {len(connections)} MQTT connections...")
for connection in connections:
try:
_logger.info(f"Auto-starting connection: {connection.name}")
from ..services.iot_bridge_service import IotBridgeService
service = IotBridgeService.get_instance(self.env)
if service.start_connection(connection.id):
_logger.info(f"Successfully auto-started: {connection.name}")
else:
_logger.warning(f"Failed to auto-start: {connection.name}")
except Exception as e:
_logger.error(f"Error auto-starting connection {connection.name}: {e}", exc_info=True)
_logger.info("Auto-start completed")
except Exception as e:
_logger.error(f"Error in auto_start_all_connections: {e}", exc_info=True)
# ========== CRUD Overrides ==========
@api.ondelete(at_uninstall=False)
def _unlink_if_not_connected(self):
"""Prevent deletion of connected connections"""
if any(connection.state == 'connected' for connection in self):
raise UserError(_('Cannot delete a running connection. Please stop it first.'))

View File

@ -0,0 +1,342 @@
# -*- coding: utf-8 -*-
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
import json
import logging
_logger = logging.getLogger(__name__)
class MqttDevice(models.Model):
_name = 'mqtt.device'
_description = 'MQTT Device'
_rec_name = 'name'
_order = 'name'
# ========== Basic Fields ==========
name = fields.Char(
string='Device Name',
required=True,
help='Friendly name for this device'
)
active = fields.Boolean(
default=True,
help='Deactivate to stop monitoring this device'
)
connection_id = fields.Many2one(
'mqtt.connection',
string='MQTT Connection',
required=True,
ondelete='cascade',
help='MQTT broker connection to use'
)
# ========== Status ==========
state = fields.Selection([
('online', 'Online'),
('offline', 'Offline'),
], string='Status', compute='_compute_state', store=False)
last_seen = fields.Datetime(
string='Last Seen',
compute='_compute_state',
store=False,
help='Last message received from this device'
)
# ========== MQTT Configuration ==========
topic_pattern = fields.Char(
string='Topic Pattern',
required=True,
default='shaperorigin/#',
help='MQTT Topic Pattern aus config.yaml → topic_prefix + /#\n'
'Beispiele:\n'
' - shaperorigin/# (empfohlen für Shaper Origin)\n'
' - device/+/status (mehrere Geräte)\n'
'# = alle Sub-Topics, + = ein Level'
)
# ========== Parser Configuration ==========
parser_type = fields.Selection([
('shelly_pm', 'Shelly PM Mini G3'),
('tasmota', 'Tasmota'),
('generic', 'Generic JSON'),
], string='Parser Type', required=True, default='shelly_pm',
help='Device message parser type')
# ========== Session Detection Strategy ==========
session_strategy = fields.Selection([
('power_threshold', 'Power Threshold (Dual)'),
('last_will', 'MQTT Last Will Testament'),
('manual', 'Manual Start/Stop'),
], string='Session Strategy', default='power_threshold', required=True,
help='How to detect session start/end')
strategy_config = fields.Text(
string='Strategy Configuration',
default='{"standby_threshold_w": 20, "working_threshold_w": 100, '
'"start_debounce_s": 3, "stop_debounce_s": 15, '
'"message_timeout_s": 20}',
help='JSON Konfiguration (entspricht config.yaml):\n'
' - standby_threshold_w: Power-Schwelle für Session-Start (Watt)\n'
' - working_threshold_w: Power-Schwelle für Working-Status (Watt)\n'
' - start_debounce_s: Verzögerung bis Session-Start (Sekunden)\n'
' - stop_debounce_s: Verzögerung bis Session-Ende (Sekunden)\n'
' - message_timeout_s: Timeout ohne Nachricht = Session-Ende\n'
'Beispiel Shaper Origin: standby=10, working=30'
)
# ========== Live Status (Runtime) ==========
state = fields.Selection([
('offline', 'Offline'),
('idle', 'Idle'),
('active', 'Active')
], string='Status', default='offline', readonly=True,
help='Current device status')
last_message_time = fields.Datetime(
string='Last Message',
readonly=True,
help='Timestamp of last received MQTT message'
)
last_power_w = fields.Float(
string='Current Power (W)',
readonly=True,
digits=(8, 2),
help='Last measured power consumption'
)
# ========== Relations ==========
session_ids = fields.One2many(
'mqtt.session',
'device_id',
string='Sessions',
help='Runtime sessions for this device'
)
message_ids = fields.One2many(
'mqtt.message',
'device_id',
string='Messages',
help='Message log (for debugging)'
)
# ========== Computed Statistics ==========
session_count = fields.Integer(
string='Total Sessions',
compute='_compute_session_stats',
store=True
)
running_session_count = fields.Integer(
string='Running Sessions',
compute='_compute_session_stats',
store=True
)
total_runtime_hours = fields.Float(
string='Total Runtime (h)',
compute='_compute_session_stats',
store=True,
digits=(10, 2)
)
avg_session_duration_hours = fields.Float(
string='Avg Session Duration (h)',
compute='_compute_session_stats',
store=True,
digits=(10, 2)
)
# ========== Compute Methods ==========
@api.depends('message_ids', 'message_ids.create_date')
def _compute_state(self):
"""Compute device online/offline state based on last message"""
from datetime import datetime, timedelta
for device in self:
# Find last message
last_msg = self.env['mqtt.message'].search([
('device_id', '=', device.id)
], limit=1, order='create_date desc')
if last_msg:
device.last_seen = last_msg.create_date
# Device is online if last message < 60 seconds ago
threshold = datetime.now() - timedelta(seconds=60)
device.state = 'online' if last_msg.create_date >= threshold else 'offline'
else:
device.last_seen = False
device.state = 'offline'
@api.depends('session_ids', 'session_ids.status', 'session_ids.duration_hours')
def _compute_session_stats(self):
for device in self:
sessions = device.session_ids
completed_sessions = sessions.filtered(lambda s: s.status == 'completed')
device.session_count = len(sessions)
device.running_session_count = len(sessions.filtered(lambda s: s.status == 'running'))
device.total_runtime_hours = sum(completed_sessions.mapped('duration_hours'))
if completed_sessions:
device.avg_session_duration_hours = device.total_runtime_hours / len(completed_sessions)
else:
device.avg_session_duration_hours = 0.0
# ========== Constraints ==========
@api.constrains('strategy_config')
def _check_strategy_config(self):
"""Validate JSON format of strategy config"""
for device in self:
if device.strategy_config:
try:
config = json.loads(device.strategy_config)
if not isinstance(config, dict):
raise ValidationError(_('Strategy configuration must be a JSON object'))
# Validate power threshold strategy config
if device.session_strategy == 'power_threshold':
required_fields = ['standby_threshold_w', 'working_threshold_w']
for field in required_fields:
if field not in config:
raise ValidationError(
_('Power threshold strategy requires "%s" in configuration') % field
)
if not isinstance(config[field], (int, float)) or config[field] < 0:
raise ValidationError(
_('Field "%s" must be a positive number') % field
)
# Validate threshold order
if config['standby_threshold_w'] >= config['working_threshold_w']:
raise ValidationError(
_('Standby threshold must be less than working threshold')
)
except json.JSONDecodeError as e:
raise ValidationError(_('Invalid JSON format: %s') % str(e))
@api.constrains('device_id')
def _check_device_id_unique(self):
"""Ensure device_id is unique per connection"""
for device in self:
if self.search_count([
('id', '!=', device.id),
('connection_id', '=', device.connection_id.id),
('device_id', '=', device.device_id)
]) > 0:
raise ValidationError(
_('Device ID "%s" already exists for this connection') % device.device_id
)
# ========== Default Values ==========
@api.onchange('session_strategy')
def _onchange_session_strategy(self):
"""Update default config when strategy changes"""
if self.session_strategy == 'power_threshold':
self.strategy_config = json.dumps({
'standby_threshold_w': 20,
'working_threshold_w': 100,
'start_debounce_s': 3,
'stop_debounce_s': 15,
'message_timeout_s': 20
}, indent=2)
elif self.session_strategy == 'last_will':
self.strategy_config = json.dumps({
'lwt_topic': f'{self.device_id or "device"}/status',
'lwt_offline_message': 'offline',
'timeout_s': 60
}, indent=2)
elif self.session_strategy == 'manual':
self.strategy_config = json.dumps({
'start_topic': f'{self.device_id or "device"}/session/start',
'stop_topic': f'{self.device_id or "device"}/session/stop'
}, indent=2)
@api.onchange('parser_type')
def _onchange_parser_type(self):
"""Update topic pattern based on parser type"""
if self.parser_type == 'shelly_pm':
if not self.topic_pattern or self.topic_pattern == '#':
self.topic_pattern = f'{self.device_id or "device"}/#'
elif self.parser_type == 'tasmota':
if not self.topic_pattern or self.topic_pattern == '#':
self.topic_pattern = f'tele/{self.device_id or "device"}/#'
# ========== Action Methods ==========
def action_view_sessions(self):
"""Open sessions view for this device"""
self.ensure_one()
return {
'name': _('Sessions - %s') % self.name,
'type': 'ir.actions.act_window',
'res_model': 'mqtt.session',
'view_mode': 'tree,form,pivot,graph',
'domain': [('device_id', '=', self.id)],
'context': {'default_device_id': self.id},
}
def action_view_messages(self):
"""Open message log for this device"""
self.ensure_one()
return {
'name': _('Messages - %s') % self.name,
'type': 'ir.actions.act_window',
'res_model': 'mqtt.message',
'view_mode': 'tree,form',
'domain': [('device_id', '=', self.id)],
'context': {'default_device_id': self.id},
}
def action_start_manual_session(self):
"""Manually start a session"""
self.ensure_one()
if self.session_strategy != 'manual':
raise UserError(_('Manual session start is only available for manual strategy'))
# Check if there's already a running session
running = self.session_ids.filtered(lambda s: s.status == 'running')
if running:
raise UserError(_('Device already has a running session'))
# TODO: Implement manual session start
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Manual Session'),
'message': _('Manual session start not yet implemented'),
'type': 'warning',
}
}
def action_stop_manual_session(self):
"""Manually stop a session"""
self.ensure_one()
if self.session_strategy != 'manual':
raise UserError(_('Manual session stop is only available for manual strategy'))
running = self.session_ids.filtered(lambda s: s.status == 'running')
if not running:
raise UserError(_('No running session to stop'))
# TODO: Implement manual session stop
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _('Manual Session'),
'message': _('Manual session stop not yet implemented'),
'type': 'warning',
}
}
# ========== Helper Methods ==========
def get_strategy_config_dict(self):
"""Parse strategy_config JSON and return as dict"""
self.ensure_one()
try:
return json.loads(self.strategy_config or '{}')
except json.JSONDecodeError:
_logger.error(f"Invalid JSON in strategy_config for device {self.id}")
return {}

View File

@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
from odoo import models, fields, api, _
import logging
_logger = logging.getLogger(__name__)
class MqttMessage(models.Model):
_name = 'mqtt.message'
_description = 'MQTT Message Log'
_rec_name = 'topic'
_order = 'create_date desc'
# Note: _log_access = True by default, provides create_date/write_date
# ========== Basic Fields ==========
connection_id = fields.Many2one(
'mqtt.connection',
string='Connection',
index=True,
ondelete='cascade',
help='MQTT connection that received this message'
)
device_id = fields.Many2one(
'mqtt.device',
string='Device',
ondelete='cascade',
index=True,
help='MQTT device that received this message'
)
device_name = fields.Char(
string='Device Name',
related='device_id.name',
store=True,
readonly=True
)
# ========== Message Data ==========
topic = fields.Char(
string='Topic',
required=True,
index=True,
help='MQTT topic'
)
payload = fields.Text(
string='Payload',
help='Raw message payload (usually JSON)'
)
parsed_data = fields.Text(
string='Parsed Data',
help='Parsed and normalized data'
)
qos = fields.Integer(
string='QoS',
default=0,
help='MQTT Quality of Service (0, 1, or 2)'
)
retain = fields.Boolean(
string='Retain',
default=False,
help='MQTT retain flag'
)
direction = fields.Selection([
('inbound', 'Inbound'),
('outbound', 'Outbound'),
], string='Direction', default='inbound', required=True,
help='Message direction (inbound from broker, outbound to broker)')
# ========== Timestamps ==========
create_date = fields.Datetime(
string='Received At',
readonly=True,
index=True,
help='Timestamp when message was received'
)
# ========== Computed Fields ==========
payload_preview = fields.Char(
string='Payload Preview',
compute='_compute_payload_preview',
help='First 100 characters of payload'
)
# ========== Compute Methods ==========
@api.depends('payload')
def _compute_payload_preview(self):
"""Create preview of payload"""
for message in self:
if message.payload:
message.payload_preview = (message.payload[:100] + '...') if len(message.payload) > 100 else message.payload
else:
message.payload_preview = '-'
# ========== CRUD Methods ==========
@api.model
def create(self, vals):
"""Override create to enforce message limit per device"""
message = super().create(vals)
# Keep only last N messages per device (configurable, default 1000)
max_messages = self.env['ir.config_parameter'].sudo().get_param(
'mqtt.max_messages_per_device', default='1000'
)
try:
max_messages = int(max_messages)
except ValueError:
max_messages = 1000
# Delete old messages if limit exceeded
device_messages = self.search([
('device_id', '=', message.device_id.id)
], order='create_date desc', offset=max_messages)
if device_messages:
device_messages.unlink()
return message
@api.model
def log_message(self, device_id, topic, payload, parsed_data=None):
"""
Convenience method to log MQTT message
Args:
device_id: int (Odoo ID)
topic: str
payload: str or dict
parsed_data: dict (optional)
"""
import json
# Convert payload to string if dict
if isinstance(payload, dict):
payload = json.dumps(payload, indent=2)
# Convert parsed_data to string if dict
if isinstance(parsed_data, dict):
parsed_data = json.dumps(parsed_data, indent=2)
return self.create({
'device_id': device_id,
'topic': topic,
'payload': payload,
'parsed_data': parsed_data,
})
@api.model
def cleanup_old_messages(self, days=7):
"""
Cleanup old messages (called by cron)
Args:
days: int - Delete messages older than this many days
"""
from datetime import datetime, timedelta
cutoff_date = datetime.now() - timedelta(days=days)
old_messages = self.search([
('create_date', '<', cutoff_date)
])
count = len(old_messages)
old_messages.unlink()
_logger.info(f"Cleaned up {count} old MQTT messages (older than {days} days)")
return count

View File

@ -0,0 +1,302 @@
# -*- coding: utf-8 -*-
from odoo import models, fields, api, _
from odoo.exceptions import ValidationError
import json
import logging
_logger = logging.getLogger(__name__)
class MqttSession(models.Model):
_name = 'mqtt.session'
_description = 'MQTT Device Session'
_rec_name = 'session_id'
_order = 'start_time desc'
# ========== Basic Fields ==========
session_id = fields.Char(
string='Session ID',
required=True,
readonly=True,
index=True,
default=lambda self: self.env['ir.sequence'].next_by_code('mqtt.session') or 'NEW',
help='Unique session identifier'
)
device_id = fields.Many2one(
'mqtt.device',
string='Device',
required=True,
readonly=True,
ondelete='cascade',
index=True,
help='MQTT device that generated this session'
)
device_name = fields.Char(
string='Device Name',
related='device_id.name',
store=True,
readonly=True
)
# ========== Session Times ==========
start_time = fields.Datetime(
string='Start Time',
required=True,
readonly=True,
index=True,
help='Session start timestamp'
)
end_time = fields.Datetime(
string='End Time',
readonly=True,
help='Session end timestamp (null if running)'
)
# ========== Durations (in seconds) ==========
total_duration_s = fields.Integer(
string='Total Duration (s)',
readonly=True,
help='Total session duration in seconds'
)
standby_duration_s = fields.Integer(
string='Standby Duration (s)',
readonly=True,
help='Time in standby state (machine on, not working)'
)
working_duration_s = fields.Integer(
string='Working Duration (s)',
readonly=True,
help='Time in working state (active work)'
)
# ========== Computed Durations (friendly format) ==========
duration_hours = fields.Float(
string='Duration (h)',
compute='_compute_duration_hours',
store=True,
digits=(10, 2),
help='Total duration in hours'
)
duration_formatted = fields.Char(
string='Duration',
compute='_compute_duration_formatted',
store=True,
help='Human-readable duration format (e.g., 2h 15m)'
)
standby_hours = fields.Float(
string='Standby (h)',
compute='_compute_duration_hours',
store=True,
digits=(10, 2)
)
working_hours = fields.Float(
string='Working (h)',
compute='_compute_duration_hours',
store=True,
digits=(10, 2)
)
# ========== Power Data ==========
start_power_w = fields.Float(
string='Start Power (W)',
readonly=True,
digits=(8, 2),
help='Power consumption at session start'
)
end_power_w = fields.Float(
string='End Power (W)',
readonly=True,
digits=(8, 2),
help='Power consumption at session end'
)
# ========== Session Status ==========
status = fields.Selection([
('running', 'Running'),
('completed', 'Completed')
], string='Status', required=True, default='running', readonly=True,
index=True, help='Session status')
end_reason = fields.Selection([
('power_drop', 'Power Drop'),
('timeout', 'Message Timeout'),
('last_will', 'Last Will Testament'),
('manual', 'Manual Stop'),
], string='End Reason', readonly=True,
help='Reason for session end')
# ========== Metadata ==========
metadata = fields.Text(
string='Metadata',
readonly=True,
help='Additional session data in JSON format'
)
# ========== Compute Methods ==========
@api.depends('total_duration_s', 'standby_duration_s', 'working_duration_s')
def _compute_duration_hours(self):
"""Convert seconds to hours"""
for session in self:
session.duration_hours = (session.total_duration_s / 3600.0) if session.total_duration_s else 0.0
session.standby_hours = (session.standby_duration_s / 3600.0) if session.standby_duration_s else 0.0
session.working_hours = (session.working_duration_s / 3600.0) if session.working_duration_s else 0.0
@api.depends('total_duration_s')
def _compute_duration_formatted(self):
"""Format duration as human-readable string"""
for session in self:
if not session.total_duration_s:
session.duration_formatted = '-'
else:
total_s = session.total_duration_s
hours = total_s // 3600
minutes = (total_s % 3600) // 60
seconds = total_s % 60
if hours > 0:
session.duration_formatted = f"{hours}h {minutes}m"
elif minutes > 0:
session.duration_formatted = f"{minutes}m {seconds}s"
else:
session.duration_formatted = f"{seconds}s"
# ========== Constraints ==========
_sql_constraints = [
('session_id_unique', 'UNIQUE(session_id)',
'Session ID must be unique!'),
]
@api.constrains('start_time', 'end_time')
def _check_times(self):
"""Validate session times"""
for session in self:
if session.end_time and session.start_time and session.end_time < session.start_time:
raise ValidationError(_('End time cannot be before start time'))
# ========== Helper Methods ==========
def get_metadata_dict(self):
"""Parse metadata JSON and return as dict"""
self.ensure_one()
try:
return json.loads(self.metadata or '{}')
except json.JSONDecodeError:
_logger.error(f"Invalid JSON in metadata for session {self.id}")
return {}
# ========== CRUD Methods ==========
@api.model
def create_or_update_session(self, session_data):
"""
Create or update session from MQTT event
Args:
session_data: Dict with session information
- session_id: str (required)
- device_id: int (required, Odoo ID)
- event_type: 'session_start' | 'session_end'
- start_time: datetime
- end_time: datetime (for session_end)
- total_duration_s: int
- standby_duration_s: int
- working_duration_s: int
- start_power_w: float
- end_power_w: float
- end_reason: str
- metadata: dict (optional)
Returns:
mqtt.session record
"""
session_id = session_data.get('session_id')
if not session_id:
raise ValueError('session_id is required')
existing = self.search([('session_id', '=', session_id)], limit=1)
if session_data.get('event_type') == 'session_start':
if existing:
_logger.warning(f"Session {session_id} already exists, skipping create")
return existing
# Create new session
values = {
'session_id': session_id,
'device_id': session_data['device_id'],
'start_time': session_data['start_time'],
'start_power_w': session_data.get('start_power_w', 0.0),
'status': 'running',
'metadata': json.dumps(session_data.get('metadata', {})) if session_data.get('metadata') else False,
}
return self.create(values)
elif session_data.get('event_type') == 'session_end':
if not existing:
_logger.error(f"Cannot end non-existent session {session_id}")
# Create it anyway with end data
values = {
'session_id': session_id,
'device_id': session_data['device_id'],
'start_time': session_data.get('start_time'),
'end_time': session_data.get('end_time'),
'total_duration_s': session_data.get('total_duration_s', 0),
'standby_duration_s': session_data.get('standby_duration_s', 0),
'working_duration_s': session_data.get('working_duration_s', 0),
'start_power_w': session_data.get('start_power_w', 0.0),
'end_power_w': session_data.get('end_power_w', 0.0),
'end_reason': session_data.get('end_reason'),
'status': 'completed',
'metadata': json.dumps(session_data.get('metadata', {})) if session_data.get('metadata') else False,
}
return self.create(values)
# Update existing session
update_values = {
'end_time': session_data.get('end_time'),
'total_duration_s': session_data.get('total_duration_s', 0),
'standby_duration_s': session_data.get('standby_duration_s', 0),
'working_duration_s': session_data.get('working_duration_s', 0),
'end_power_w': session_data.get('end_power_w', 0.0),
'end_reason': session_data.get('end_reason'),
'status': 'completed',
}
# Merge metadata if provided
if session_data.get('metadata'):
existing_metadata = existing.get_metadata_dict()
existing_metadata.update(session_data['metadata'])
update_values['metadata'] = json.dumps(existing_metadata)
existing.write(update_values)
return existing
else:
raise ValueError(f"Unknown event_type: {session_data.get('event_type')}")
@api.model
def get_running_sessions(self, device_id=None):
"""
Get all running sessions, optionally filtered by device
Args:
device_id: int (optional) - Filter by device ID
Returns:
List of session dicts for state recovery
"""
domain = [('status', '=', 'running')]
if device_id:
domain.append(('device_id', '=', device_id))
sessions = self.search(domain)
return [{
'session_id': s.session_id,
'device_id': s.device_id.id,
'machine_id': s.device_id.device_id, # For session_detector compatibility
'machine_name': s.device_id.name,
'start_time': s.start_time.isoformat() + 'Z',
'start_power_w': s.start_power_w,
'standby_duration_s': s.standby_duration_s or 0,
'working_duration_s': s.working_duration_s or 0,
} for s in sessions]

View File

@ -0,0 +1,9 @@
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
access_mqtt_connection_user,mqtt.connection.user,model_mqtt_connection,base.group_user,1,1,1,1
access_mqtt_device_user,mqtt.device.user,model_mqtt_device,base.group_user,1,1,1,1
access_mqtt_session_user,mqtt.session.user,model_mqtt_session,base.group_user,1,0,0,0
access_mqtt_message_user,mqtt.message.user,model_mqtt_message,base.group_user,1,0,0,1
access_mqtt_connection_system,mqtt.connection.system,model_mqtt_connection,base.group_system,1,1,1,1
access_mqtt_device_system,mqtt.device.system,model_mqtt_device,base.group_system,1,1,1,1
access_mqtt_session_system,mqtt.session.system,model_mqtt_session,base.group_system,1,1,1,1
access_mqtt_message_system,mqtt.message.system,model_mqtt_message,base.group_system,1,1,1,1
1 id name model_id:id group_id:id perm_read perm_write perm_create perm_unlink
2 access_mqtt_connection_user mqtt.connection.user model_mqtt_connection base.group_user 1 1 1 1
3 access_mqtt_device_user mqtt.device.user model_mqtt_device base.group_user 1 1 1 1
4 access_mqtt_session_user mqtt.session.user model_mqtt_session base.group_user 1 0 0 0
5 access_mqtt_message_user mqtt.message.user model_mqtt_message base.group_user 1 0 0 1
6 access_mqtt_connection_system mqtt.connection.system model_mqtt_connection base.group_system 1 1 1 1
7 access_mqtt_device_system mqtt.device.system model_mqtt_device base.group_system 1 1 1 1
8 access_mqtt_session_system mqtt.session.system model_mqtt_session base.group_system 1 1 1 1
9 access_mqtt_message_system mqtt.message.system model_mqtt_message base.group_system 1 1 1 1

View File

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from . import iot_bridge_service

View File

@ -0,0 +1,424 @@
# -*- coding: utf-8 -*-
"""
IoT Bridge Service
Manages MQTT connections and message routing for all devices
"""
import logging
import threading
from datetime import datetime
from typing import Dict, Optional
from odoo import api, SUPERUSER_ID
from .mqtt_client import MqttClient
from .parsers.shelly_parser import ShellyParser
_logger = logging.getLogger(__name__)
class IotBridgeService:
"""
Singleton service managing MQTT connections
One instance per Odoo environment
"""
_instances: Dict[str, 'IotBridgeService'] = {}
_lock = threading.Lock()
def __init__(self, env):
"""
Initialize IoT Bridge Service
Args:
env: Odoo environment
"""
self.env = env
self.registry = env.registry
self.db_name = env.cr.dbname
self._clients: Dict[int, MqttClient] = {} # connection_id -> MqttClient
self._running_lock = threading.Lock()
self._parser = ShellyParser() # For now only Shelly
_logger.info("IoT Bridge Service initialized")
@classmethod
def get_instance(cls, env) -> 'IotBridgeService':
"""
Get singleton instance for this environment
Args:
env: Odoo environment
Returns:
IotBridgeService instance
"""
db_name = env.cr.dbname
with cls._lock:
if db_name not in cls._instances:
cls._instances[db_name] = cls(env)
return cls._instances[db_name]
def start_connection(self, connection_id: int) -> bool:
"""
Start MQTT connection for given connection_id
Args:
connection_id: ID of mqtt.connection record
Returns:
bool: True if connection started successfully
"""
with self._running_lock:
# Check if already running
if connection_id in self._clients:
_logger.warning(f"Connection {connection_id} is already running")
return False
try:
# Use a fresh cursor to read connection data
conn_data = None
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
# Load connection record
connection = env['mqtt.connection'].browse(connection_id)
if not connection.exists():
_logger.error(f"Connection {connection_id} not found")
return False
_logger.info(f"Starting MQTT connection: {connection.name}")
# Store connection data (don't keep record object outside cursor)
conn_data = {
'id': connection.id,
'host': connection.host,
'port': int(connection.port),
'client_id': connection.client_id,
'username': connection.username or None,
'password': connection.password or None,
'use_tls': connection.use_tls,
'verify_cert': connection.verify_cert,
'ca_cert_path': connection.ca_cert_path or None,
'auto_reconnect': connection.auto_reconnect,
'reconnect_delay_min': connection.reconnect_delay_min,
'reconnect_delay_max': connection.reconnect_delay_max,
}
new_cr.commit()
# Cursor is now closed - safe to start MQTT thread
# Create MQTT client with copied data
client = MqttClient(
connection_id=conn_data['id'],
host=conn_data['host'],
port=conn_data['port'],
client_id=conn_data['client_id'],
username=conn_data['username'],
password=conn_data['password'],
use_tls=conn_data['use_tls'],
verify_cert=conn_data['verify_cert'],
ca_cert_path=conn_data['ca_cert_path'],
auto_reconnect=conn_data['auto_reconnect'],
reconnect_delay_min=conn_data['reconnect_delay_min'],
reconnect_delay_max=conn_data['reconnect_delay_max'],
on_message_callback=self._on_message,
on_connect_callback=self._on_connect,
on_disconnect_callback=self._on_disconnect,
)
# Connect (will start background thread)
if client.connect():
self._clients[connection_id] = client
# Subscribe to device topics (needs fresh cursor)
self._subscribe_device_topics(connection_id)
# Update connection state (needs fresh cursor)
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
conn = env['mqtt.connection'].browse(connection_id)
conn.write({
'state': 'connecting',
'last_error': False,
})
new_cr.commit()
return True
else:
_logger.error(f"Failed to connect client for connection {connection_id}")
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
conn = env['mqtt.connection'].browse(connection_id)
conn.write({
'state': 'error',
'last_error': 'Failed to initiate connection',
})
new_cr.commit()
return False
except Exception as e:
_logger.error(f"Error starting connection {connection_id}: {e}", exc_info=True)
# Update error state
try:
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
connection = env['mqtt.connection'].browse(connection_id)
connection.write({
'state': 'error',
'last_error': str(e),
})
new_cr.commit()
except Exception:
pass
return False
def stop_connection(self, connection_id: int) -> bool:
"""
Stop MQTT connection
Args:
connection_id: ID of mqtt.connection record
Returns:
bool: True if connection stopped successfully
"""
with self._running_lock:
if connection_id not in self._clients:
_logger.warning(f"Connection {connection_id} is not running")
return False
try:
_logger.info(f"Stopping MQTT connection {connection_id}")
# Disconnect client
client = self._clients[connection_id]
client.disconnect()
# Remove from active clients
del self._clients[connection_id]
# Update connection state
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
connection = env['mqtt.connection'].browse(connection_id)
connection.write({
'state': 'stopped',
})
new_cr.commit()
return True
except Exception as e:
_logger.error(f"Error stopping connection {connection_id}: {e}", exc_info=True)
return False
def _subscribe_device_topics(self, connection_id):
"""
Subscribe to all active device topics for this connection
Args:
connection_id: ID of mqtt.connection
"""
client = self._clients.get(connection_id)
if not client:
return
# Use fresh cursor to load devices
with self.registry.cursor() as new_cr:
env = api.Environment(new_cr, SUPERUSER_ID, {})
connection = env['mqtt.connection'].browse(connection_id)
for device in connection.device_ids.filtered('active'):
try:
client.subscribe(device.topic_pattern, qos=0)
_logger.info(f"Subscribed to device topic: {device.topic_pattern}")
except Exception as e:
_logger.error(f"Failed to subscribe to {device.topic_pattern}: {e}")
new_cr.commit()
# ========== MQTT Callbacks ==========
def _on_connect(self, connection_id: int):
"""Callback when MQTT client connects"""
_logger.info(f"MQTT connection {connection_id} established")
try:
# Update connection state in database
from odoo import fields
from odoo.api import Environment
with self.registry.cursor() as new_cr:
new_env = Environment(new_cr, SUPERUSER_ID, {})
connection = new_env['mqtt.connection'].browse(connection_id)
connection.write({
'state': 'connected',
'last_connected': fields.Datetime.now(),
'last_error': False,
})
new_cr.commit()
except Exception as e:
_logger.error(f"Failed to update connection state: {e}", exc_info=True)
def _on_disconnect(self, connection_id: int, rc: int):
"""Callback when MQTT client disconnects"""
_logger.warning(f"MQTT connection {connection_id} disconnected (rc={rc})")
try:
# Update connection state in database
from odoo.api import Environment
with self.registry.cursor() as new_cr:
new_env = Environment(new_cr, SUPERUSER_ID, {})
connection = new_env['mqtt.connection'].browse(connection_id)
if rc == 0:
# Clean disconnect
connection.write({'state': 'stopped'})
else:
# Unexpected disconnect
connection.write({
'state': 'connecting', # Will reconnect automatically
'last_error': f'Unexpected disconnect (code {rc})',
})
new_cr.commit()
except Exception as e:
_logger.error(f"Failed to update connection state: {e}", exc_info=True)
def _on_message(self, connection_id: int, topic: str, payload: str, qos: int, retain: bool):
"""Callback when MQTT message is received"""
_logger.debug(f"Message received on connection {connection_id}, topic {topic}")
try:
# Store message in database
from odoo.api import Environment
with self.registry.cursor() as new_cr:
new_env = Environment(new_cr, SUPERUSER_ID, {})
# Find matching device
device = self._find_device_for_topic(new_env, connection_id, topic)
# Create message record
new_env['mqtt.message'].create({
'connection_id': connection_id,
'device_id': device.id if device else False,
'topic': topic,
'payload': payload,
'qos': qos,
'retain': retain,
'direction': 'inbound',
})
new_cr.commit()
# Process message for session detection
if device:
self._process_session(new_env, device, topic, payload)
new_cr.commit()
except Exception as e:
_logger.error(f"Error processing message: {e}", exc_info=True)
def _find_device_for_topic(self, env, connection_id: int, topic: str):
"""
Find device matching the MQTT topic
Args:
env: Odoo environment
connection_id: Connection ID
topic: MQTT topic
Returns:
mqtt.device record or False
"""
# TODO: Implement proper topic matching with wildcards (+, #)
# For now: exact match or simple wildcard
devices = env['mqtt.device'].search([
('connection_id', '=', connection_id),
('active', '=', True),
])
for device in devices:
pattern = device.topic_pattern
# Simple wildcard matching
if pattern.endswith('#'):
prefix = pattern[:-1]
if topic.startswith(prefix):
return device
elif pattern == topic:
return device
return False
def get_client(self, connection_id: int) -> Optional[MqttClient]:
"""
Get MQTT client for connection
Args:
connection_id: Connection ID
Returns:
MqttClient or None
"""
return self._clients.get(connection_id)
def is_running(self, connection_id: int) -> bool:
"""
Check if connection is running
Args:
connection_id: Connection ID
Returns:
bool: True if running
"""
return connection_id in self._clients
def _process_session(self, env, device, topic: str, payload: str):
"""Simple session detection based on power > 0"""
try:
# Parse message
parsed = self._parser.parse_message(topic, payload)
if not parsed:
return
power = self._parser.get_power_value(parsed)
if power is None:
return
# Find running session
running_session = env['mqtt.session'].search([
('device_id', '=', device.id),
('status', '=', 'running')
], limit=1)
if power > 0:
# Device is on
if not running_session:
# Start new session
env['mqtt.session'].create({
'device_id': device.id,
'status': 'running',
'start_time': datetime.now(),
'start_power_w': power,
})
_logger.info(f"🟢 Session started for {device.name} ({power}W)")
else:
# Device is off
if running_session:
# End session
running_session.write({
'status': 'completed',
'end_time': datetime.now(),
'end_power_w': power,
})
_logger.info(f"🔴 Session ended for {device.name}")
except Exception as e:
_logger.debug(f"Session processing error: {e}")

View File

@ -0,0 +1,381 @@
# -*- coding: utf-8 -*-
"""
MQTT Client mit Auto-Reconnect und State Recovery
Migriert von python_prototype/mqtt_client.py (M5)
"""
import paho.mqtt.client as mqtt
import ssl
import time
import logging
import threading
from typing import Optional, Callable, Dict, Any
_logger = logging.getLogger(__name__)
class MqttClient:
"""Enhanced MQTT Client with auto-reconnect and state recovery"""
def __init__(
self,
connection_id: int,
host: str,
port: int,
client_id: str,
username: Optional[str] = None,
password: Optional[str] = None,
use_tls: bool = True,
verify_cert: bool = False,
ca_cert_path: Optional[str] = None,
auto_reconnect: bool = True,
reconnect_delay_min: int = 1,
reconnect_delay_max: int = 60,
on_message_callback: Optional[Callable] = None,
on_connect_callback: Optional[Callable] = None,
on_disconnect_callback: Optional[Callable] = None,
):
"""
Initialize MQTT Client
Args:
connection_id: Database ID of mqtt.connection record
host: MQTT broker hostname
port: MQTT broker port
client_id: MQTT client identifier
username: Authentication username
password: Authentication password
use_tls: Enable TLS/SSL encryption
verify_cert: Verify SSL certificate
ca_cert_path: Path to custom CA certificate
auto_reconnect: Enable automatic reconnection
reconnect_delay_min: Minimum reconnect delay (seconds)
reconnect_delay_max: Maximum reconnect delay (seconds)
on_message_callback: Callback for received messages
on_connect_callback: Callback when connected
on_disconnect_callback: Callback when disconnected
"""
self.connection_id = connection_id
self.host = host
self.port = port
self.client_id = client_id
self.username = username
self.password = password
self.use_tls = use_tls
self.verify_cert = verify_cert
self.ca_cert_path = ca_cert_path
self.auto_reconnect = auto_reconnect
self.reconnect_delay_min = reconnect_delay_min
self.reconnect_delay_max = reconnect_delay_max
# Callbacks
self.on_message_callback = on_message_callback
self.on_connect_callback = on_connect_callback
self.on_disconnect_callback = on_disconnect_callback
# State
self._client: Optional[mqtt.Client] = None
self._running = False
self._connected = False
self._reconnect_thread: Optional[threading.Thread] = None
self._subscriptions: Dict[str, int] = {} # topic -> qos
# Reconnect state
self._reconnect_delay = reconnect_delay_min
self._reconnect_attempt = 0
_logger.info(f"MqttClient initialized for connection {connection_id}: {host}:{port}")
def connect(self) -> bool:
"""
Connect to MQTT broker
Returns:
bool: True if connection initiated successfully
"""
if self._running:
_logger.warning(f"Client already running for connection {self.connection_id}")
return False
try:
_logger.info(f"Connecting to MQTT broker: {self.host}:{self.port}")
# Create MQTT client
self._client = mqtt.Client(
client_id=self.client_id,
protocol=mqtt.MQTTv5
)
# Set callbacks
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.on_message = self._on_message
# Configure authentication
if self.username:
self._client.username_pw_set(self.username, self.password or '')
# Configure TLS/SSL
if self.use_tls:
tls_context = ssl.create_default_context()
if not self.verify_cert:
tls_context.check_hostname = False
tls_context.verify_mode = ssl.CERT_NONE
_logger.warning(f"SSL certificate verification disabled")
if self.ca_cert_path:
tls_context.load_verify_locations(cafile=self.ca_cert_path)
_logger.info(f"Loaded CA certificate from {self.ca_cert_path}")
self._client.tls_set_context(tls_context)
# Connect
self._client.connect(self.host, self.port, keepalive=60)
# Start network loop
self._client.loop_start()
self._running = True
_logger.info(f"MQTT connection initiated for {self.connection_id}")
return True
except Exception as e:
_logger.error(f"Failed to connect: {e}", exc_info=True)
self._running = False
return False
def disconnect(self):
"""Disconnect from MQTT broker"""
_logger.info(f"Disconnecting MQTT client {self.connection_id}")
self._running = False
if self._client:
try:
self._client.loop_stop()
self._client.disconnect()
except Exception as e:
_logger.error(f"Error during disconnect: {e}")
finally:
self._client = None
self._connected = False
_logger.info(f"MQTT client {self.connection_id} disconnected")
def subscribe(self, topic: str, qos: int = 0) -> bool:
"""
Subscribe to MQTT topic
Args:
topic: MQTT topic pattern
qos: Quality of Service (0, 1, or 2)
Returns:
bool: True if subscription successful
"""
if not self._connected:
_logger.warning(f"Cannot subscribe - not connected")
# Store subscription for later (will be restored on reconnect)
self._subscriptions[topic] = qos
return False
try:
result, mid = self._client.subscribe(topic, qos)
if result == mqtt.MQTT_ERR_SUCCESS:
self._subscriptions[topic] = qos
_logger.info(f"Subscribed to topic: {topic} (QoS {qos})")
return True
else:
_logger.error(f"Failed to subscribe to {topic}: {result}")
return False
except Exception as e:
_logger.error(f"Error subscribing to {topic}: {e}")
return False
def unsubscribe(self, topic: str) -> bool:
"""
Unsubscribe from MQTT topic
Args:
topic: MQTT topic pattern
Returns:
bool: True if unsubscription successful
"""
if topic in self._subscriptions:
del self._subscriptions[topic]
if not self._connected:
return True
try:
result, mid = self._client.unsubscribe(topic)
if result == mqtt.MQTT_ERR_SUCCESS:
_logger.info(f"Unsubscribed from topic: {topic}")
return True
else:
_logger.error(f"Failed to unsubscribe from {topic}: {result}")
return False
except Exception as e:
_logger.error(f"Error unsubscribing from {topic}: {e}")
return False
def publish(self, topic: str, payload: str, qos: int = 0, retain: bool = False) -> bool:
"""
Publish message to MQTT topic
Args:
topic: MQTT topic
payload: Message payload
qos: Quality of Service
retain: Retain message on broker
Returns:
bool: True if publish successful
"""
if not self._connected:
_logger.warning(f"Cannot publish - not connected")
return False
try:
result = self._client.publish(topic, payload, qos, retain)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
_logger.debug(f"Published to {topic}: {payload[:100]}")
return True
else:
_logger.error(f"Failed to publish to {topic}: {result.rc}")
return False
except Exception as e:
_logger.error(f"Error publishing to {topic}: {e}")
return False
# ========== Internal Callbacks ==========
def _on_connect(self, client, userdata, flags, rc, properties=None):
"""Callback when connection is established"""
if rc == 0:
self._connected = True
self._reconnect_delay = self.reconnect_delay_min
self._reconnect_attempt = 0
_logger.info(f"Connected to MQTT broker: {self.host}:{self.port}")
# Restore subscriptions (state recovery)
if self._subscriptions:
_logger.info(f"Restoring {len(self._subscriptions)} subscriptions...")
for topic, qos in self._subscriptions.items():
try:
self._client.subscribe(topic, qos)
_logger.debug(f"Restored subscription: {topic}")
except Exception as e:
_logger.error(f"Failed to restore subscription {topic}: {e}")
# Call external callback
if self.on_connect_callback:
try:
self.on_connect_callback(self.connection_id)
except Exception as e:
_logger.error(f"Error in connect callback: {e}")
else:
error_messages = {
1: 'Connection refused - incorrect protocol version',
2: 'Connection refused - invalid client identifier',
3: 'Connection refused - server unavailable',
4: 'Connection refused - bad username or password',
5: 'Connection refused - not authorized',
}
error_msg = error_messages.get(rc, f'Connection refused - code {rc}')
_logger.error(f"Connection failed: {error_msg}")
# Trigger reconnect if auto-reconnect is enabled
if self.auto_reconnect and self._running:
self._schedule_reconnect()
def _on_disconnect(self, client, userdata, rc, properties=None):
"""Callback when disconnected from broker"""
self._connected = False
if rc == 0:
_logger.info(f"Disconnected from MQTT broker (clean)")
else:
_logger.warning(f"Unexpected disconnect from MQTT broker (rc={rc})")
# Call external callback
if self.on_disconnect_callback:
try:
self.on_disconnect_callback(self.connection_id, rc)
except Exception as e:
_logger.error(f"Error in disconnect callback: {e}")
# Trigger reconnect if auto-reconnect is enabled and disconnect was unexpected
if rc != 0 and self.auto_reconnect and self._running:
self._schedule_reconnect()
def _on_message(self, client, userdata, msg):
"""Callback when message is received"""
try:
_logger.info(f"📨 MQTT Message received on {msg.topic}: {msg.payload[:100]}")
# Call external callback
if self.on_message_callback:
self.on_message_callback(
connection_id=self.connection_id,
topic=msg.topic,
payload=msg.payload.decode('utf-8'),
qos=msg.qos,
retain=msg.retain
)
except Exception as e:
_logger.error(f"Error processing message: {e}", exc_info=True)
def _schedule_reconnect(self):
"""Schedule reconnection attempt with exponential backoff"""
if not self.auto_reconnect or not self._running:
return
# Don't schedule if reconnect thread is already running
if self._reconnect_thread and self._reconnect_thread.is_alive():
return
self._reconnect_attempt += 1
delay = min(self._reconnect_delay, self.reconnect_delay_max)
_logger.info(f"Scheduling reconnect attempt {self._reconnect_attempt} in {delay}s...")
self._reconnect_thread = threading.Thread(target=self._reconnect_worker, args=(delay,))
self._reconnect_thread.daemon = True
self._reconnect_thread.start()
def _reconnect_worker(self, delay: int):
"""Worker thread for reconnection"""
time.sleep(delay)
if not self._running:
return
_logger.info(f"Attempting to reconnect (attempt {self._reconnect_attempt})...")
try:
if self._client:
self._client.reconnect()
# Exponential backoff
self._reconnect_delay = min(self._reconnect_delay * 2, self.reconnect_delay_max)
except Exception as e:
_logger.error(f"Reconnect failed: {e}")
# Schedule next attempt
if self.auto_reconnect and self._running:
self._schedule_reconnect()
@property
def is_connected(self) -> bool:
"""Check if client is connected"""
return self._connected
@property
def is_running(self) -> bool:
"""Check if client is running"""
return self._running

View File

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from . import shelly_parser

View File

@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
"""
Shelly PM Mini G3 Parser
Parses MQTT Messages from Shelly PM Mini G3
"""
import json
import logging
from typing import Dict, Optional
from datetime import datetime
_logger = logging.getLogger(__name__)
class ShellyParser:
"""Parser for Shelly PM Mini G3 MQTT Messages"""
def parse_message(self, topic: str, payload: str) -> Optional[Dict]:
"""
Parse Shelly MQTT message
Args:
topic: MQTT topic
payload: Message payload (JSON string)
Returns:
Dict with parsed data or None
"""
try:
# Parse JSON
data = json.loads(payload)
# Ignore debug logs
if 'debug/log' in topic:
return None
# Parse different message types
if '/status/pm1:0' in topic:
return self._parse_status_message(topic, data)
elif '/events/rpc' in topic:
return self._parse_rpc_event(topic, data)
elif '/telemetry' in topic:
return self._parse_telemetry(topic, data)
elif '/online' in topic:
return {'online': data == 'true' or data is True}
return None
except Exception as e:
_logger.debug(f"Error parsing message from {topic}: {e}")
return None
def _parse_status_message(self, topic: str, data: dict) -> Optional[Dict]:
"""
Parse Shelly PM status message
Topic: shaperorigin/status/pm1:0
Payload format:
{
"id": 0,
"voltage": 230.0,
"current": 0.217,
"apower": 50.0,
"freq": 50.0,
"aenergy": {"total": 12345.6},
"temperature": {"tC": 35.2}
}
"""
try:
# Extract device ID from topic
device_id = self._extract_device_id_from_topic(topic)
result = {
'message_type': 'status',
'device_id': device_id,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'voltage': data.get('voltage'),
'current': data.get('current'),
'apower': data.get('apower', 0), # Active Power in Watts
'frequency': data.get('freq'),
'total_energy': data.get('aenergy', {}).get('total'),
'temperature': data.get('temperature', {}).get('tC'),
}
_logger.debug(f"Parsed status: {result['apower']}W")
return result
except Exception as e:
_logger.error(f"Error parsing status message: {e}")
return None
def _parse_rpc_event(self, topic: str, payload: dict) -> Optional[Dict]:
"""
Parse RPC NotifyStatus event
Topic: shellypmminig3/events/rpc
"""
try:
if payload.get('method') != 'NotifyStatus':
return None
device_id = payload.get('src', '').replace('shellypmminig3-', '')
params = payload.get('params', {})
pm_data = params.get('pm1:0', {})
# Get timestamp
ts = params.get('ts')
if ts:
timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z'
else:
timestamp = datetime.utcnow().isoformat() + 'Z'
data = {
'message_type': 'event',
'device_id': device_id,
'timestamp': timestamp,
'apower': pm_data.get('apower'),
'current': pm_data.get('current'),
'voltage': pm_data.get('voltage'),
}
# Only return if we have actual data
if data['apower'] is not None or data['current'] is not None:
_logger.debug(f"Parsed RPC event: {pm_data}")
return data
return None
except Exception as e:
_logger.error(f"Error parsing RPC event: {e}")
return None
def _parse_telemetry(self, topic: str, payload: dict) -> Optional[Dict]:
"""
Parse telemetry message
Topic: shelly/pmmini/shellypmminig3-xxx/telemetry
"""
try:
# Extract device ID from topic
parts = topic.split('/')
device_id = parts[2] if len(parts) > 2 else 'unknown'
device_id = device_id.replace('shellypmminig3-', '')
# Get timestamp
ts = payload.get('ts')
if ts:
timestamp = datetime.utcfromtimestamp(ts).isoformat() + 'Z'
else:
timestamp = datetime.utcnow().isoformat() + 'Z'
data = {
'message_type': 'telemetry',
'device_id': device_id,
'timestamp': timestamp,
'voltage': payload.get('voltage_v'),
'current': payload.get('current_a'),
'apower': payload.get('power_w'),
'frequency': payload.get('freq_hz'),
'total_energy': payload.get('energy_wh'),
}
_logger.debug(f"Parsed telemetry: apower={data['apower']}W")
return data
except Exception as e:
_logger.error(f"Error parsing telemetry: {e}")
return None
def _extract_device_id_from_topic(self, topic: str) -> str:
"""
Extract device ID from topic
Topic format: shaperorigin/status/pm1:0
Returns: shaperorigin (the topic prefix)
"""
parts = topic.split('/')
if len(parts) > 0:
return parts[0]
return 'unknown'
def get_power_value(self, parsed_data: Dict) -> Optional[float]:
"""Extract power value from parsed data"""
return parsed_data.get('apower')
def get_device_id(self, parsed_data: Dict) -> str:
"""Get device ID from parsed data"""
return parsed_data.get('device_id', 'unknown')

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""
Test Start/Stop Button functionality
Run from Docker container:
docker exec -it hobbyhimmel_odoo_18-dev python3 /mnt/extra-addons/open_workshop/open_workshop_mqtt/tests/test_start_stop.py
"""
import psycopg2
import time
DB_NAME = 'mqtt'
DB_USER = 'odoo'
DB_PASSWORD = 'odoo'
DB_HOST = 'localhost'
DB_PORT = '5432'
def test_stop_button():
"""Test stopping connection via action_stop()"""
conn = psycopg2.connect(
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD,
host=DB_HOST,
port=DB_PORT
)
cur = conn.cursor()
# Check initial state
cur.execute("SELECT id, name, state FROM mqtt_connection WHERE id=1")
result = cur.fetchone()
print(f"Before stop: ID={result[0]}, Name={result[1]}, State={result[2]}")
# Simulate action_stop() - what should happen:
# 1. Call service.stop_connection(connection_id)
# 2. MqttClient.disconnect() should be called
# 3. client.loop_stop() should be called
# 4. state should be set to 'stopped'
print("\n[INFO] To actually test, you need to:")
print("1. Open Odoo UI: http://localhost:9018")
print("2. Go to: MQTT Machine > Connections")
print("3. Click 'Stop' button")
print("4. Watch terminal: docker logs -f hobbyhimmel_odoo_18-dev")
print("5. Expected log: 'Stopping MQTT connection 1'")
print("6. Expected log: 'Disconnecting MQTT client 1'")
print("7. Expected log: 'MQTT client 1 disconnected'")
print("\n[CHECK] Wait 5 seconds and check if messages still arriving...")
time.sleep(5)
cur.execute("SELECT COUNT(*) FROM mqtt_message WHERE create_date > NOW() - INTERVAL '5 seconds'")
recent_count = cur.fetchone()[0]
print(f"Messages received in last 5 seconds: {recent_count}")
if recent_count > 0:
print("[WARNING] Messages still arriving - Stop might not be working!")
else:
print("[OK] No recent messages - Stop might be working")
cur.close()
conn.close()
if __name__ == '__main__':
test_stop_button()

View File

@ -0,0 +1,176 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<!-- ========== List View ========== -->
<record id="view_mqtt_connection_list" model="ir.ui.view">
<field name="name">mqtt.connection.list</field>
<field name="model">mqtt.connection</field>
<field name="arch" type="xml">
<list>
<field name="name"/>
<field name="host"/>
<field name="port"/>
<field name="state"
decoration-success="state == 'connected'"
decoration-warning="state == 'connecting'"
decoration-danger="state == 'error'"
decoration-muted="state == 'stopped'"
widget="badge"/>
<field name="device_count"/>
<field name="active_device_count"/>
<field name="last_connected"/>
</list>
</field>
</record>
<!-- ========== Form View ========== -->
<record id="view_mqtt_connection_form" model="ir.ui.view">
<field name="name">mqtt.connection.form</field>
<field name="model">mqtt.connection</field>
<field name="arch" type="xml">
<form>
<header>
<button name="action_start"
string="Start"
type="object"
class="btn-primary"
invisible="state == 'connected'"/>
<button name="action_stop"
string="Stop"
type="object"
class="btn-secondary"
invisible="state != 'connected'"/>
<button name="action_test_connection"
string="Test Connection"
type="object"
class="btn-secondary"/>
<field name="state" widget="statusbar"
statusbar_visible="stopped,connecting,connected"/>
</header>
<sheet>
<div class="oe_button_box" name="button_box">
<button name="%(action_mqtt_device)d"
type="action"
class="oe_stat_button"
icon="fa-microchip"
context="{'default_connection_id': id}">
<field name="device_count"
widget="statinfo"
string="Devices"/>
</button>
<button name="%(action_mqtt_device)d"
type="action"
class="oe_stat_button"
icon="fa-power-off"
context="{'default_connection_id': id, 'search_default_active': 1}">
<field name="active_device_count"
widget="statinfo"
string="Active"/>
</button>
</div>
<widget name="web_ribbon"
title="Connected"
bg_color="bg-success"
invisible="state != 'connected'"/>
<widget name="web_ribbon"
title="Error"
bg_color="bg-danger"
invisible="state != 'error'"/>
<group>
<group string="Connection">
<field name="name"/>
<field name="active"/>
<field name="host"/>
<field name="port"/>
<field name="use_tls"/>
</group>
<group string="Authentication">
<field name="username"/>
<field name="password" password="True"/>
<field name="client_id"/>
</group>
</group>
<group string="Auto-Reconnect">
<group>
<field name="auto_reconnect"/>
</group>
<group invisible="not auto_reconnect">
<field name="reconnect_delay_min"/>
<field name="reconnect_delay_max"/>
</group>
</group>
<group string="Status" invisible="state == 'stopped'">
<group>
<field name="last_connected"/>
</group>
<group invisible="not last_error">
<field name="last_error"/>
</group>
</group>
<notebook>
<page string="Devices" name="devices">
<field name="device_ids">
<list>
<field name="name"/>
<field name="parser_type"/>
<field name="session_strategy"/>
<field name="state" widget="badge"/>
<field name="last_message_time"/>
<field name="session_count"/>
</list>
</field>
</page>
</notebook>
</sheet>
</form>
</field>
</record>
<!-- ========== Search View ========== -->
<record id="view_mqtt_connection_search" model="ir.ui.view">
<field name="name">mqtt.connection.search</field>
<field name="model">mqtt.connection</field>
<field name="arch" type="xml">
<search>
<field name="name"/>
<field name="host"/>
<separator/>
<filter name="active" string="Active" domain="[('active', '=', True)]"/>
<filter name="inactive" string="Inactive" domain="[('active', '=', False)]"/>
<separator/>
<filter name="connected" string="Connected" domain="[('state', '=', 'connected')]"/>
<filter name="stopped" string="Stopped" domain="[('state', '=', 'stopped')]"/>
<filter name="error" string="Error" domain="[('state', '=', 'error')]"/>
<separator/>
<group expand="0" string="Group By">
<filter name="group_state" string="Status" context="{'group_by': 'state'}"/>
<filter name="group_tls" string="TLS" context="{'group_by': 'use_tls'}"/>
</group>
</search>
</field>
</record>
<!-- ========== Action ========== -->
<record id="action_mqtt_connection" model="ir.actions.act_window">
<field name="name">MQTT Connections</field>
<field name="res_model">mqtt.connection</field>
<field name="view_mode">list,form</field>
<field name="help" type="html">
<p class="o_view_nocontent_smiling_face">
Create your first MQTT Connection
</p>
<p>
Connect to your MQTT broker (e.g., Mosquitto) to start receiving device messages.
</p>
</field>
</record>
</odoo>

View File

@ -0,0 +1,256 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<!-- ========== List View ========== -->
<record id="view_mqtt_device_list" model="ir.ui.view">
<field name="name">mqtt.device.list</field>
<field name="model">mqtt.device</field>
<field name="arch" type="xml">
<list>
<field name="name"/>
<field name="connection_id"/>
<field name="parser_type"/>
<field name="state"
decoration-success="state == 'online'"
decoration-danger="state == 'offline'"
widget="badge"/>
<field name="last_seen"/>
<field name="session_count"/>
<field name="total_runtime_hours" optional="hide"/>
</list>
</field>
</record>
<!-- ========== Form View ========== -->
<record id="view_mqtt_device_form" model="ir.ui.view">
<field name="name">mqtt.device.form</field>
<field name="model">mqtt.device</field>
<field name="arch" type="xml">
<form>
<header>
<button name="action_start_manual_session"
string="Start Session"
type="object"
class="btn-primary"
invisible="session_strategy != 'manual'"/>
<button name="action_stop_manual_session"
string="Stop Session"
type="object"
class="btn-secondary"
invisible="session_strategy != 'manual'"/>
<field name="state" widget="statusbar"/>
</header>
<sheet>
<div class="oe_button_box" name="button_box">
<button name="action_view_sessions"
type="object"
class="oe_stat_button"
icon="fa-clock-o">
<field name="session_count"
widget="statinfo"
string="Sessions"/>
</button>
<button name="action_view_sessions"
type="object"
class="oe_stat_button"
icon="fa-hourglass">
<field name="total_runtime_hours"
widget="statinfo"
string="Runtime (h)"/>
</button>
<button name="action_view_messages"
type="object"
class="oe_stat_button"
icon="fa-envelope-o"
groups="base.group_no_one">
<div class="o_field_widget o_stat_info">
<span class="o_stat_text">Messages</span>
</div>
</button>
</div>
<widget name="web_ribbon"
title="Active"
bg_color="bg-success"
invisible="state != 'active'"/>
<widget name="web_ribbon"
title="Offline"
bg_color="bg-danger"
invisible="state != 'offline'"/>
<group>
<group string="Device">
<field name="name"/>
<field name="active"/>
<field name="connection_id"/>
</group>
<group string="MQTT Configuration">
<field name="topic_pattern"/>
<field name="parser_type"/>
</group>
</group>
<group string="Session Detection">
<group>
<field name="session_strategy"/>
</group>
<group>
<field name="strategy_config"
widget="ace"
options="{'mode': 'json'}"
nolabel="1"/>
</group>
</group>
<group string="Live Status" invisible="state == 'offline'">
<group>
<field name="last_message_time"/>
<field name="last_power_w"/>
</group>
<group>
<field name="running_session_count"/>
</group>
</group>
<notebook>
<page string="Sessions" name="sessions">
<field name="session_ids"
context="{'default_device_id': id}">
<list>
<field name="session_id"/>
<field name="start_time"/>
<field name="end_time"/>
<field name="duration_formatted"/>
<field name="status" widget="badge"/>
<field name="end_reason"/>
</list>
</field>
</page>
<page string="Statistics" name="statistics">
<group>
<group string="Session Stats">
<field name="session_count"/>
<field name="running_session_count"/>
</group>
<group string="Runtime Stats">
<field name="total_runtime_hours"/>
<field name="avg_session_duration_hours"/>
</group>
</group>
</page>
</notebook>
</sheet>
</form>
</field>
</record>
<!-- ========== Kanban View ========== -->
<record id="view_mqtt_device_kanban" model="ir.ui.view">
<field name="name">mqtt.device.kanban</field>
<field name="model">mqtt.device</field>
<field name="arch" type="xml">
<kanban>
<field name="id"/>
<field name="name"/>
<field name="state"/>
<field name="last_power_w"/>
<field name="session_count"/>
<field name="total_runtime_hours"/>
<field name="parser_type"/>
<templates>
<t t-name="kanban-box">
<div class="oe_kanban_global_click">
<div class="o_kanban_record_top">
<div class="o_kanban_record_headings">
<strong class="o_kanban_record_title">
<field name="name"/>
</strong>
<div class="o_kanban_record_subtitle">
<field name="parser_type"/>
</div>
</div>
<div class="o_kanban_record_top_right">
<field name="state" widget="badge"
decoration-success="state == 'active'"
decoration-info="state == 'idle'"
decoration-muted="state == 'offline'"/>
</div>
</div>
<div class="o_kanban_record_body">
<div class="row">
<div class="col-6">
<i class="fa fa-microchip"/>
<field name="parser_type"/>
</div>
<div class="col-6" t-if="record.state.raw_value != 'offline'">
<i class="fa fa-bolt"/>
<field name="last_power_w"/> W
</div>
</div>
</div>
<div class="o_kanban_record_bottom">
<div class="oe_kanban_bottom_left">
<span><i class="fa fa-clock-o"/> <field name="session_count"/> sessions</span>
</div>
<div class="oe_kanban_bottom_right">
<span><field name="total_runtime_hours"/> h</span>
</div>
</div>
</div>
</t>
</templates>
</kanban>
</field>
</record>
<!-- ========== Search View ========== -->
<record id="view_mqtt_device_search" model="ir.ui.view">
<field name="name">mqtt.device.search</field>
<field name="model">mqtt.device</field>
<field name="arch" type="xml">
<search>
<field name="name"/>
<field name="connection_id"/>
<separator/>
<filter name="active" string="Active" domain="[('active', '=', True)]"/>
<filter name="inactive" string="Inactive" domain="[('active', '=', False)]"/>
<separator/>
<filter name="state_active" string="Active" domain="[('state', '=', 'active')]"/>
<filter name="state_idle" string="Idle" domain="[('state', '=', 'idle')]"/>
<filter name="state_offline" string="Offline" domain="[('state', '=', 'offline')]"/>
<separator/>
<filter name="parser_shelly" string="Shelly" domain="[('parser_type', '=', 'shelly_pm')]"/>
<filter name="parser_tasmota" string="Tasmota" domain="[('parser_type', '=', 'tasmota')]"/>
<separator/>
<filter name="has_sessions" string="Has Sessions" domain="[('session_count', '>', 0)]"/>
<filter name="running_sessions" string="Running Sessions" domain="[('running_session_count', '>', 0)]"/>
<separator/>
<group expand="0" string="Group By">
<filter name="group_connection" string="Connection" context="{'group_by': 'connection_id'}"/>
<filter name="group_parser" string="Parser" context="{'group_by': 'parser_type'}"/>
<filter name="group_strategy" string="Strategy" context="{'group_by': 'session_strategy'}"/>
<filter name="group_state" string="Status" context="{'group_by': 'state'}"/>
</group>
</search>
</field>
</record>
<!-- ========== Action ========== -->
<record id="action_mqtt_device" model="ir.actions.act_window">
<field name="name">MQTT Devices</field>
<field name="res_model">mqtt.device</field>
<field name="view_mode">kanban,list,form</field>
<field name="help" type="html">
<p class="o_view_nocontent_smiling_face">
Add your first MQTT Device
</p>
<p>
Configure IoT devices (Shelly, Tasmota, etc.) to track their runtime and sessions.
</p>
</field>
</record>
</odoo>

View File

@ -0,0 +1,53 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<!-- ========== Root Menu ========== -->
<menuitem id="menu_mqtt_root"
name="MQTT"
sequence="50"
web_icon="mqtt,static/description/icon.png"/>
<!-- ========== Main Menu Items ========== -->
<menuitem id="menu_mqtt_connections"
name="Connections"
parent="menu_mqtt_root"
action="action_mqtt_connection"
sequence="10"/>
<menuitem id="menu_mqtt_devices"
name="Devices"
parent="menu_mqtt_root"
action="action_mqtt_device"
sequence="20"/>
<menuitem id="menu_mqtt_sessions"
name="Sessions"
parent="menu_mqtt_root"
action="action_mqtt_session"
sequence="30"/>
<menuitem id="menu_mqtt_messages"
name="Messages"
parent="menu_mqtt_root"
action="action_mqtt_message"
sequence="40"/>
<!-- ========== Configuration Submenu ========== -->
<menuitem id="menu_mqtt_config"
name="Configuration"
parent="menu_mqtt_root"
sequence="100"/>
<menuitem id="menu_mqtt_config_connections"
name="Connections"
parent="menu_mqtt_config"
action="action_mqtt_connection"
sequence="10"/>
<menuitem id="menu_mqtt_config_devices"
name="Devices"
parent="menu_mqtt_config"
action="action_mqtt_device"
sequence="20"/>
</odoo>

View File

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<!-- ========== List View ========== -->
<record id="view_mqtt_message_list" model="ir.ui.view">
<field name="name">mqtt.message.list</field>
<field name="model">mqtt.message</field>
<field name="arch" type="xml">
<list create="false" edit="false" delete="true">
<field name="create_date"/>
<field name="device_name"/>
<field name="topic"/>
<field name="payload_preview"/>
</list>
</field>
</record>
<!-- ========== Form View ========== -->
<record id="view_mqtt_message_form" model="ir.ui.view">
<field name="name">mqtt.message.form</field>
<field name="model">mqtt.message</field>
<field name="arch" type="xml">
<form create="false" edit="false">
<sheet>
<group>
<group string="Message Info">
<field name="device_id"/>
<field name="topic"/>
<field name="create_date"/>
</group>
</group>
<group string="Raw Payload">
<field name="payload" widget="ace" options="{'mode': 'json'}" nolabel="1"/>
</group>
<group string="Parsed Data" invisible="not parsed_data">
<field name="parsed_data" widget="ace" options="{'mode': 'json'}" nolabel="1"/>
</group>
</sheet>
</form>
</field>
</record>
<!-- ========== Search View ========== -->
<record id="view_mqtt_message_search" model="ir.ui.view">
<field name="name">mqtt.message.search</field>
<field name="model">mqtt.message</field>
<field name="arch" type="xml">
<search>
<field name="device_id"/>
<field name="device_name"/>
<field name="topic"/>
<field name="payload"/>
<separator/>
<filter name="today" string="Today"
domain="[('create_date', '&gt;=', context_today().strftime('%Y-%m-%d 00:00:00'))]"/>
<filter name="last_hour" string="Last Hour"
domain="[('create_date', '&gt;=', (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S'))]"/>
<filter name="last_10min" string="Last 10 Minutes"
domain="[('create_date', '&gt;=', (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime('%Y-%m-%d %H:%M:%S'))]"/>
<separator/>
<group expand="0" string="Group By">
<filter name="group_device" string="Device" context="{'group_by': 'device_id'}"/>
<filter name="group_topic" string="Topic" context="{'group_by': 'topic'}"/>
<filter name="group_hour" string="Hour" context="{'group_by': 'create_date:hour'}"/>
</group>
</search>
</field>
</record>
<!-- ========== Action ========== -->
<record id="action_mqtt_message" model="ir.actions.act_window">
<field name="name">MQTT Messages (Debug)</field>
<field name="res_model">mqtt.message</field>
<field name="view_mode">list,form</field>
<field name="context">{'search_default_today': 1}</field>
<field name="help" type="html">
<p class="o_view_nocontent_smiling_face">
No messages logged yet
</p>
<p>
MQTT messages will be logged here for debugging purposes.
Note: Only the most recent messages are kept.
</p>
</field>
</record>
</odoo>

View File

@ -0,0 +1,193 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<!-- ========== List View ========== -->
<record id="view_mqtt_session_list" model="ir.ui.view">
<field name="name">mqtt.session.list</field>
<field name="model">mqtt.session</field>
<field name="arch" type="xml">
<list>
<field name="device_name"/>
<field name="session_id" optional="hide"/>
<field name="start_time"/>
<field name="end_time"/>
<field name="duration_formatted"/>
<field name="duration_hours" optional="show" sum="Total Hours"/>
<field name="standby_hours" optional="hide"/>
<field name="working_hours" optional="hide"/>
<field name="start_power_w" optional="hide"/>
<field name="end_power_w" optional="hide"/>
<field name="status"
decoration-info="status == 'running'"
decoration-success="status == 'completed'"
widget="badge"/>
<field name="end_reason" optional="show"/>
</list>
</field>
</record>
<!-- ========== Form View ========== -->
<record id="view_mqtt_session_form" model="ir.ui.view">
<field name="name">mqtt.session.form</field>
<field name="model">mqtt.session</field>
<field name="arch" type="xml">
<form create="false" edit="false">
<header>
<field name="status" widget="statusbar"/>
</header>
<sheet>
<widget name="web_ribbon"
title="Running"
bg_color="bg-info"
invisible="status != 'running'"/>
<widget name="web_ribbon"
title="Completed"
bg_color="bg-success"
invisible="status != 'completed'"/>
<div class="oe_title">
<h1>
<field name="device_name"/>
</h1>
<h3>
<field name="session_id"/>
</h3>
</div>
<group>
<group string="Session Times">
<field name="start_time"/>
<field name="end_time"/>
<field name="duration_formatted" string="Duration"/>
</group>
<group string="Device">
<field name="device_id"/>
<field name="end_reason" invisible="status == 'running'"/>
</group>
</group>
<group string="Durations (Detailed)">
<group>
<field name="total_duration_s" string="Total (seconds)"/>
<field name="duration_hours" string="Total (hours)"/>
</group>
<group>
<field name="standby_duration_s" string="Standby (seconds)"/>
<field name="standby_hours" string="Standby (hours)"/>
</group>
<group>
<field name="working_duration_s" string="Working (seconds)"/>
<field name="working_hours" string="Working (hours)"/>
</group>
</group>
<group string="Power Measurements">
<group>
<field name="start_power_w"/>
</group>
<group invisible="not end_power_w">
<field name="end_power_w"/>
</group>
</group>
<group string="Metadata" invisible="not metadata">
<field name="metadata" widget="ace" options="{'mode': 'json'}" nolabel="1"/>
</group>
</sheet>
</form>
</field>
</record>
<!-- ========== Pivot View (Analytics) ========== -->
<record id="view_mqtt_session_pivot" model="ir.ui.view">
<field name="name">mqtt.session.pivot</field>
<field name="model">mqtt.session</field>
<field name="arch" type="xml">
<pivot string="Session Analytics">
<field name="device_name" type="row"/>
<field name="start_time" interval="day" type="col"/>
<field name="duration_hours" type="measure"/>
<field name="working_hours" type="measure"/>
<field name="standby_hours" type="measure"/>
</pivot>
</field>
</record>
<!-- ========== Graph View (Bar Chart) ========== -->
<record id="view_mqtt_session_graph" model="ir.ui.view">
<field name="name">mqtt.session.graph</field>
<field name="model">mqtt.session</field>
<field name="arch" type="xml">
<graph string="Session Statistics" type="bar">
<field name="device_name"/>
<field name="duration_hours" type="measure"/>
</graph>
</field>
</record>
<!-- ========== Graph View (Line Chart - Timeline) ========== -->
<record id="view_mqtt_session_graph_line" model="ir.ui.view">
<field name="name">mqtt.session.graph.line</field>
<field name="model">mqtt.session</field>
<field name="arch" type="xml">
<graph string="Runtime Timeline" type="line">
<field name="start_time" interval="day"/>
<field name="duration_hours" type="measure"/>
</graph>
</field>
</record>
<!-- ========== Search View ========== -->
<record id="view_mqtt_session_search" model="ir.ui.view">
<field name="name">mqtt.session.search</field>
<field name="model">mqtt.session</field>
<field name="arch" type="xml">
<search>
<field name="device_id"/>
<field name="device_name"/>
<field name="session_id"/>
<field name="start_time"/>
<separator/>
<filter name="running" string="Running" domain="[('status', '=', 'running')]"/>
<filter name="completed" string="Completed" domain="[('status', '=', 'completed')]"/>
<separator/>
<filter name="today" string="Today"
domain="[('start_time', '&gt;=', context_today().strftime('%Y-%m-%d 00:00:00'))]"/>
<filter name="this_week" string="This Week"
domain="[('start_time', '&gt;=', (context_today() - relativedelta(weeks=1)).strftime('%Y-%m-%d'))]"/>
<filter name="this_month" string="This Month"
domain="[('start_time', '&gt;=', (context_today() - relativedelta(months=1)).strftime('%Y-%m-%d'))]"/>
<separator/>
<filter name="end_power_drop" string="Power Drop" domain="[('end_reason', '=', 'power_drop')]"/>
<filter name="end_timeout" string="Timeout" domain="[('end_reason', '=', 'timeout')]"/>
<separator/>
<group expand="0" string="Group By">
<filter name="group_device" string="Device" context="{'group_by': 'device_id'}"/>
<filter name="group_status" string="Status" context="{'group_by': 'status'}"/>
<filter name="group_end_reason" string="End Reason" context="{'group_by': 'end_reason'}"/>
<filter name="group_start_day" string="Start Day" context="{'group_by': 'start_time:day'}"/>
<filter name="group_start_week" string="Start Week" context="{'group_by': 'start_time:week'}"/>
<filter name="group_start_month" string="Start Month" context="{'group_by': 'start_time:month'}"/>
</group>
</search>
</field>
</record>
<!-- ========== Action ========== -->
<record id="action_mqtt_session" model="ir.actions.act_window">
<field name="name">MQTT Sessions</field>
<field name="res_model">mqtt.session</field>
<field name="view_mode">list,form,pivot,graph</field>
<field name="context">{'search_default_this_week': 1}</field>
<field name="help" type="html">
<p class="o_view_nocontent_smiling_face">
No sessions recorded yet
</p>
<p>
Sessions will appear here once your MQTT devices start reporting data.
</p>
</field>
</record>
</odoo>