odoo_mqtt/iot_bridge/tests/unit/test_session_detector.py
2026-02-10 20:00:27 +01:00

234 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
Unit Tests for SessionDetector
Tests the session detection logic with aggregation.
Usage:
pytest test_session_detector.py -v
pytest test_session_detector.py::TestSessionStart -v
"""
import pytest
from datetime import datetime, timedelta
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from session_detector import SessionDetector
@pytest.fixture
def events_received():
"""Track events emitted by detector."""
events = []
return events
@pytest.fixture
def detector(events_received):
"""SessionDetector with test configuration."""
def event_callback(event):
events_received.append(event)
return SessionDetector(
device_id="test-device-01",
machine_name="Test Machine",
standby_threshold_w=20.0,
working_threshold_w=100.0,
start_debounce_s=3.0,
stop_debounce_s=15.0,
message_timeout_s=20.0,
heartbeat_interval_s=30.0,
event_callback=event_callback
)
class TestSessionStart:
"""Tests for session start with debounce."""
def test_idle_to_starting(self, detector, events_received):
"""Power >= 20W → STARTING state."""
timestamp = datetime.utcnow()
detector.process_power_measurement(30.0, timestamp)
assert detector.state == 'starting'
assert len(events_received) == 0 # No event yet
def test_starting_to_standby(self, detector, events_received):
"""After 3s debounce with 20-100W → STANDBY."""
start_time = datetime.utcnow()
# T+0: 30W → STARTING
detector.process_power_measurement(30.0, start_time)
assert detector.state == 'starting'
# T+1: 50W → still STARTING
detector.process_power_measurement(50.0, start_time + timedelta(seconds=1))
assert detector.state == 'starting'
# T+3: 50W → STANDBY (debounce passed)
detector.process_power_measurement(50.0, start_time + timedelta(seconds=3))
assert detector.state == 'standby'
assert len(events_received) == 1
assert events_received[0]['event_type'] == 'session_started'
assert events_received[0]['payload']['start_power_w'] == 50.0
def test_starting_to_working(self, detector, events_received):
"""After 3s debounce with >=100W → WORKING."""
start_time = datetime.utcnow()
# T+0: 30W → STARTING
detector.process_power_measurement(30.0, start_time)
# T+1: 120W → still STARTING
detector.process_power_measurement(120.0, start_time + timedelta(seconds=1))
# T+3: 150W → WORKING (debounce passed)
detector.process_power_measurement(150.0, start_time + timedelta(seconds=3))
assert detector.state == 'standby' # Session starts in standby first
# Then immediately transitions to working if power high enough
def test_false_start(self, detector, events_received):
"""Power drops before debounce → back to IDLE."""
start_time = datetime.utcnow()
# T+0: 30W → STARTING
detector.process_power_measurement(30.0, start_time)
assert detector.state == 'starting'
# T+1: 10W → back to IDLE
detector.process_power_measurement(10.0, start_time + timedelta(seconds=1))
assert detector.state == 'idle'
assert len(events_received) == 0
class TestStateTransitions:
"""Tests for state transitions during session."""
def test_standby_to_working(self, detector, events_received):
"""STANDBY → WORKING when power >= 100W."""
start_time = datetime.utcnow()
# Start session in STANDBY
detector.process_power_measurement(30.0, start_time)
detector.process_power_measurement(50.0, start_time + timedelta(seconds=3))
assert detector.state == 'standby'
# Clear started event
events_received.clear()
# Power increases to 150W → WORKING
detector.process_power_measurement(150.0, start_time + timedelta(seconds=10))
assert detector.state == 'working'
assert len(events_received) == 0 # No event, just state change
def test_working_to_standby(self, detector, events_received):
"""WORKING → STANDBY when power < 100W."""
start_time = datetime.utcnow()
# Start session with high power → WORKING
detector.process_power_measurement(150.0, start_time)
detector.process_power_measurement(150.0, start_time + timedelta(seconds=3))
# Transition to working
detector.process_power_measurement(150.0, start_time + timedelta(seconds=4))
assert detector.state == 'working' or detector.state == 'standby'
events_received.clear()
# Power decreases to 60W → STANDBY
detector.process_power_measurement(60.0, start_time + timedelta(seconds=10))
assert detector.state in ['standby', 'working'] # Depending on transition
class TestHeartbeat:
"""Tests for heartbeat aggregation."""
def test_heartbeat_emission(self, detector, events_received):
"""Heartbeat emitted after heartbeat_interval_s."""
start_time = datetime.utcnow()
# Start session
detector.process_power_measurement(50.0, start_time)
detector.process_power_measurement(50.0, start_time + timedelta(seconds=3))
events_received.clear()
# Send measurements for 30 seconds (heartbeat interval)
for i in range(30):
detector.process_power_measurement(50.0, start_time + timedelta(seconds=4 + i))
# Should have emitted heartbeat
heartbeats = [e for e in events_received if e['event_type'] == 'session_heartbeat']
assert len(heartbeats) >= 1
if len(heartbeats) > 0:
hb = heartbeats[0]
assert 'interval_standby_s' in hb['payload']
assert 'interval_working_s' in hb['payload']
assert 'avg_power_w' in hb['payload']
class TestSessionEnd:
"""Tests for session end with debounce."""
def test_session_end_from_standby(self, detector, events_received):
"""STANDBY → STOPPING → IDLE after 15s < 20W."""
start_time = datetime.utcnow()
# Start session
detector.process_power_measurement(50.0, start_time)
detector.process_power_measurement(50.0, start_time + timedelta(seconds=3))
events_received.clear()
# Run for 10s in STANDBY
for i in range(10):
detector.process_power_measurement(50.0, start_time + timedelta(seconds=4 + i))
# Power drops < 20W → STOPPING
t_stopping = start_time + timedelta(seconds=14)
detector.process_power_measurement(10.0, t_stopping)
assert detector.state == 'stopping'
# Run for 15s in STOPPING
for i in range(15):
detector.process_power_measurement(10.0, t_stopping + timedelta(seconds=1 + i))
# Should have ended session
ended = [e for e in events_received if e['event_type'] == 'session_ended']
assert len(ended) >= 1
assert detector.state == 'idle'
class TestTimeout:
"""Tests for session timeout."""
def test_session_timeout(self, detector, events_received):
"""Session times out after message_timeout_s without messages."""
start_time = datetime.utcnow()
# Start session
detector.process_power_measurement(50.0, start_time)
detector.process_power_measurement(50.0, start_time + timedelta(seconds=3))
assert detector.current_session_id is not None
events_received.clear()
# Check timeout after 25 seconds (> 20s timeout)
detector.check_timeout(start_time + timedelta(seconds=28))
# Should have timed out
timeout_events = [e for e in events_received if e['event_type'] == 'session_timeout']
assert len(timeout_events) >= 1
assert detector.state == 'idle'
assert detector.current_session_id is None