odoo_mqtt/iot_bridge/tests/tools/shelly_simulator.py
matthias.lotz ff4ef2f563 Phase 3: Complete type safety & logging unification (3.1-3.2)
Phase 3.1: Type Safety
- Add bridge_types.py for shared type aliases (EventDict, PowerWatts, Timestamp, DeviceID)
- Define protocols for callbacks and message parsers
- Strict type annotations on all core modules (session_detector, event_queue, device_manager)
- Fix Optional handling and type guards throughout codebase
- Achieve full mypy compliance: 0 errors across 47 source files

Phase 3.2: Logging Unification
- Migrate from stdlib logging to pure structlog across all runtime modules
- Convert all logs to structured event+fields format (snake_case event names)
- Remove f-string and printf-style logger calls
- Add contextvars support for per-request correlation
- Implement FastAPI middleware to bind request_id, http_method, http_path
- Propagate X-Request-ID header in responses
- Remove stdlib logging imports except setup layer (utils/logging.py)
- Ensure log-level consistency across all modules

Files Modified:
- iot_bridge/bridge_types.py (new) - Central type definitions
- iot_bridge/core/* - Type safety and logging unification
- iot_bridge/clients/* - Structured logging with request context
- iot_bridge/parsers/* - Type-safe parsing with structured logs
- iot_bridge/utils/logging.py - Pure structlog setup with contextvars
- iot_bridge/api/server.py - Added request correlation middleware
- iot_bridge/tests/* - Test fixtures updated for type safety
- iot_bridge/OPTIMIZATION_PLAN.md - Phase 3 status updated

Validation:
- mypy . → 0 errors (47 files)
- All unit tests pass
- Runtime behavior unchanged
- API response headers include X-Request-ID
2026-02-18 23:54:27 +01:00

300 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Shelly PM Mini G3 Simulator for Testing
Sends MQTT messages to test topics with various power scenarios.
Usage:
python shelly_simulator.py --scenario standby
python shelly_simulator.py --scenario working
python shelly_simulator.py --scenario full_session
"""
import argparse
import json
import ssl
import time
from datetime import datetime
from paho.mqtt import client as mqtt_client
class ShellySimulator:
"""Simulates Shelly PM Mini G3 MQTT Messages."""
def __init__(
self,
broker_host: str,
broker_port: int,
username: str,
password: str,
topic_prefix: str = "testshelly",
use_tls: bool = True,
):
self.broker_host = broker_host
self.broker_port = broker_port
self.username = username
self.password = password
self.topic_prefix = topic_prefix
self.topic = f"{topic_prefix}/status/pm1:0"
self.use_tls = use_tls
self.client: mqtt_client.Client | None = None
def connect(self):
"""Connect to MQTT Broker."""
client_id = f"shelly-simulator-{int(time.time())}"
client = mqtt_client.Client(client_id=client_id, protocol=mqtt_client.MQTTv5)
self.client = client
client.username_pw_set(self.username, self.password)
if self.use_tls:
client.tls_set(cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)
print(f"Connecting to MQTT Broker {self.broker_host}:{self.broker_port}...")
client.connect(self.broker_host, self.broker_port, keepalive=60)
client.loop_start()
time.sleep(1)
print("✓ Connected")
def disconnect(self):
"""Disconnect from broker."""
if self.client:
self.client.loop_stop()
self.client.disconnect()
print("✓ Disconnected")
def send_power_message(self, power_w: float, interval_s: int = 1):
"""
Send Shelly PM Mini G3 status message.
Args:
power_w: Power in watts
interval_s: Sleep interval after sending (seconds)
"""
message = {
"id": 0,
"voltage": 230.0,
"current": round(power_w / 230.0, 3),
"apower": power_w,
"freq": 50.0,
"aenergy": {
"total": round(12345.6 + (power_w * interval_s / 3600), 1),
"by_minute": [0.0, 0.0, 0.0],
"minute_ts": int(time.time()),
},
"temperature": {"tC": 35.2, "tF": 95.4},
}
payload = json.dumps(message)
if self.client is None:
raise RuntimeError("Simulator is not connected. Call connect() first.")
self.client.publish(self.topic, payload, qos=1)
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] 📤 Sent: {power_w:.1f}W")
if interval_s > 0:
time.sleep(interval_s)
def scenario_standby(self, duration_s: int = 60):
"""Scenario: Machine in STANDBY (20-100W)."""
print("\n=== SCENARIO: STANDBY (20-100W) ===")
print(f"Duration: {duration_s}s\n")
for i in range(duration_s):
power = 40 + (i % 20) # 40-60W
self.send_power_message(power)
def scenario_working(self, duration_s: int = 60):
"""Scenario: Machine WORKING (>100W)."""
print("\n=== SCENARIO: WORKING (>100W) ===")
print(f"Duration: {duration_s}s\n")
for i in range(duration_s):
power = 150 + (i % 50) # 150-200W
self.send_power_message(power)
def scenario_full_session(self):
"""Scenario: Complete session with transitions."""
print("\n=== SCENARIO: FULL SESSION ===")
print("OFF → IDLE → STARTING → STANDBY → WORKING → STANDBY → STOPPING → IDLE\n")
# 0. OFF (10s)
print("0. OFF (0W, 10s)")
for _ in range(0):
self.send_power_message(0)
# 1. IDLE (10s)
print("1. IDLE (10W, 10s)")
for _ in range(10):
self.send_power_message(10)
# 2. STARTING → STANDBY (30W, 5s)
print("2. STARTING → STANDBY (30W, 5s)")
for _ in range(5):
self.send_power_message(30)
# 3. STANDBY (50W, 15s)
print("3. STANDBY (50W, 15s)")
for _ in range(15):
self.send_power_message(50)
# 4. WORKING (150W, 4 Minuten)
print("4. WORKING (150W, 4 Minuten)")
for _ in range(240):
self.send_power_message(150)
# 5. Back to STANDBY (40W, 60s)
print("5. Back to STANDBY (40W, 60s)")
for _ in range(60):
self.send_power_message(40)
# 6. WORKING (150W, 4 Minuten)
print("6. WORKING (150W, 4 Minuten)")
for _ in range(240):
self.send_power_message(150)
# 7. STOPPING → OFF (0W, 60s)
print("7. STOPPING → OFF (0W, 60s)")
for _ in range(60):
self.send_power_message(0)
print("\n✓ Session completed")
def scenario_online_test(self, duration_s: int = 120):
"""
Scenario: Online Status Test - Continuous messaging.
Sends continuous MQTT messages to verify device stays online.
Used to test that Bridge/Odoo correctly detects device as online.
Args:
duration_s: How long to send messages (default: 120s)
"""
print("\n=== SCENARIO: ONLINE TEST ===")
print(f"Sending continuous messages for {duration_s}s")
print("Expected: Device status should show 'idle' or 'active'\n")
for i in range(duration_s):
power = 60 + (i % 20) # Varying 60-80W (STANDBY range)
self.send_power_message(power)
print("\n✓ Online test completed")
def scenario_offline_test(
self,
online_duration_s: int = 30,
offline_duration_s: int = 60,
recovery_duration_s: int = 30,
):
"""
Scenario: Offline Detection Test - Message gap to trigger timeout.
Simulates device going offline by stopping message transmission,
then coming back online. Used to test timeout detection logic.
Args:
online_duration_s: Initial online period (default: 30s)
offline_duration_s: Offline period - no messages (default: 60s)
recovery_duration_s: Recovery period after coming back (default: 30s)
"""
print("\n=== SCENARIO: OFFLINE TEST ===")
print(f"Phase 1: Online for {online_duration_s}s")
print(f"Phase 2: OFFLINE (no messages) for {offline_duration_s}s")
print(f"Phase 3: Recovery for {recovery_duration_s}s")
print("\nExpected behavior:")
print(" - Phase 1: Device status 'idle'")
print(" - Phase 2: After timeout (~30-60s), status should change to 'offline'")
print(" - Phase 3: Status should recover to 'idle'\n")
# Phase 1: Online
print(f"\n▶ Phase 1: ONLINE ({online_duration_s}s)")
for i in range(online_duration_s):
power = 50 + (i % 10)
self.send_power_message(power)
# Phase 2: Offline (no messages)
print(f"\n⏸ Phase 2: OFFLINE - Stopping messages for {offline_duration_s}s")
print(" (Device should be detected as offline after ~30-60s)")
for i in range(offline_duration_s):
if i % 10 == 0:
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] ⏱ Offline timer: {i}/{offline_duration_s}s")
time.sleep(1)
# Phase 3: Recovery
print(f"\n▶ Phase 3: RECOVERY ({recovery_duration_s}s)")
for i in range(recovery_duration_s):
power = 55
self.send_power_message(power)
print("\n✓ Offline test completed")
print("Check Odoo UI: Device should have transitioned offline → idle")
def main():
parser = argparse.ArgumentParser(description="Shelly PM Mini G3 Simulator")
parser.add_argument("--broker", default="localhost", help="MQTT Broker host")
parser.add_argument("--port", type=int, default=1883, help="MQTT Broker port")
parser.add_argument("--username", default="", help="MQTT username")
parser.add_argument("--password", default="", help="MQTT password")
parser.add_argument("--topic-prefix", default="testshelly", help="MQTT topic prefix")
parser.add_argument(
"--scenario",
choices=["standby", "working", "full_session", "online_test", "offline_test"],
default="full_session",
help="Test scenario",
)
parser.add_argument(
"--duration",
type=int,
default=60,
help="Duration for standby/working/online_test scenarios (seconds)",
)
parser.add_argument(
"--offline-duration",
type=int,
default=60,
help="Offline duration for offline_test scenario (seconds)",
)
parser.add_argument("--no-tls", action="store_true", help="Disable TLS")
args = parser.parse_args()
simulator = ShellySimulator(
broker_host=args.broker,
broker_port=args.port,
username=args.username,
password=args.password,
topic_prefix=args.topic_prefix,
use_tls=not args.no_tls,
)
try:
simulator.connect()
if args.scenario == "standby":
simulator.scenario_standby(args.duration)
elif args.scenario == "working":
simulator.scenario_working(args.duration)
elif args.scenario == "full_session":
simulator.scenario_full_session()
elif args.scenario == "online_test":
simulator.scenario_online_test(args.duration)
elif args.scenario == "offline_test":
simulator.scenario_offline_test(
online_duration_s=30,
offline_duration_s=args.offline_duration,
recovery_duration_s=30,
)
except KeyboardInterrupt:
print("\n\nInterrupted by user")
finally:
simulator.disconnect()
if __name__ == "__main__":
main()