- parsers/registry.py: PARSER_REGISTRY mit shelly_pm-Eintrag; get_parser(), get_schema(), list_parser_types() als Public API - parsers/__init__.py: Registry-Funktionen exportiert - core/device_manager.py: globalen ShellyParser entfernt; DeviceManager verwaltet jetzt ein eigenes parser-Dict pro Device (per get_parser()) - api/server.py: GET /parsers Endpoint hinzugefügt (gibt get_schema() zurück) - tests/unit/test_parser_registry.py: 17 neue Tests (Registry-API, PARSER_REGISTRY-Integrität, DeviceManager-Integration) - tests/unit/test_device_manager.py: Test auf neues API angepasst (patch statt parser=-Argument) Tests: 63/63 passed
286 lines
10 KiB
Python
286 lines
10 KiB
Python
"""
|
|
HTTP Config Server for IoT Bridge
|
|
Receives configuration from Odoo via POST /config
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
|
|
import structlog
|
|
import yaml
|
|
from fastapi import Depends, FastAPI, Header, HTTPException, Request
|
|
from structlog.contextvars import bind_contextvars, clear_contextvars
|
|
|
|
from api.models import BridgeConfig
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class ConfigServer:
|
|
"""HTTP Server for receiving configuration from Odoo."""
|
|
|
|
def __init__(
|
|
self, config_callback=None, mqtt_reconnect_callback=None, token: str | None = None
|
|
):
|
|
"""
|
|
Initialize Config Server.
|
|
|
|
Args:
|
|
config_callback: Callback function(new_config) called when device config is received
|
|
mqtt_reconnect_callback: Callback function(mqtt_config) called when MQTT broker changes
|
|
token: Optional Bearer token for authentication
|
|
"""
|
|
self.app = FastAPI(
|
|
title="IoT Bridge Config API",
|
|
description="Receives device configuration from Odoo",
|
|
version="1.0.0",
|
|
)
|
|
self.config_callback = config_callback
|
|
self.mqtt_reconnect_callback = mqtt_reconnect_callback
|
|
self.auth_token = token
|
|
self.current_config: BridgeConfig | None = None
|
|
self.device_count = 0
|
|
self.subscription_count = 0
|
|
self.last_config_update: datetime | None = None
|
|
|
|
# Register routes
|
|
self._setup_middleware()
|
|
self._setup_routes()
|
|
|
|
logger.info("config_server_initialized", auth_enabled=bool(token))
|
|
|
|
def _setup_middleware(self) -> None:
|
|
"""Setup request correlation middleware."""
|
|
|
|
@self.app.middleware("http")
|
|
async def add_request_context(request: Request, call_next):
|
|
"""Bind request-scoped context values for structured logs.
|
|
|
|
Args:
|
|
request: Incoming HTTP request object.
|
|
call_next: FastAPI middleware callback for downstream processing.
|
|
|
|
Returns:
|
|
HTTP response from downstream middleware/route handlers.
|
|
"""
|
|
clear_contextvars()
|
|
request_id = request.headers.get("x-request-id") or str(uuid4())
|
|
bind_contextvars(
|
|
request_id=request_id,
|
|
http_method=request.method,
|
|
http_path=request.url.path,
|
|
)
|
|
try:
|
|
response = await call_next(request)
|
|
response.headers["X-Request-ID"] = request_id
|
|
return response
|
|
finally:
|
|
clear_contextvars()
|
|
|
|
def _verify_token(self, authorization: str | None = Header(None)):
|
|
"""Verify Bearer token if authentication is enabled."""
|
|
if not self.auth_token:
|
|
return True # No auth required
|
|
|
|
if not authorization:
|
|
raise HTTPException(status_code=401, detail="Authorization header missing")
|
|
|
|
if not authorization.startswith("Bearer "):
|
|
raise HTTPException(status_code=401, detail="Invalid authorization header format")
|
|
|
|
token = authorization.replace("Bearer ", "")
|
|
if token != self.auth_token:
|
|
raise HTTPException(status_code=403, detail="Invalid token")
|
|
|
|
return True
|
|
|
|
def _setup_routes(self):
|
|
"""Setup FastAPI routes."""
|
|
|
|
@self.app.get("/health")
|
|
async def health():
|
|
"""Health check endpoint."""
|
|
return {
|
|
"status": "ok",
|
|
"devices": self.device_count,
|
|
"subscriptions": self.subscription_count,
|
|
"last_config_update": (
|
|
self.last_config_update.isoformat() if self.last_config_update else None
|
|
),
|
|
}
|
|
|
|
@self.app.post("/config")
|
|
async def receive_config(
|
|
config: BridgeConfig, authorized: bool = Depends(self._verify_token)
|
|
):
|
|
"""
|
|
Receive new configuration from Odoo.
|
|
|
|
This endpoint accepts a complete device configuration and triggers:
|
|
1. Config validation
|
|
2. MQTT broker change detection & reconnect (if needed)
|
|
3. Device diff (added/updated/removed)
|
|
4. Dynamic MQTT subscription updates
|
|
5. Config persistence to /data/config-active.yaml
|
|
|
|
Args:
|
|
config: Full bridge configuration payload from Odoo.
|
|
authorized: Authentication result from dependency injection.
|
|
|
|
Returns:
|
|
JSON response with application and reconnect status.
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"config_received",
|
|
device_count=len(config.devices),
|
|
has_mqtt_config=config.mqtt is not None,
|
|
timestamp=config.timestamp,
|
|
)
|
|
|
|
# Check if MQTT broker config changed
|
|
mqtt_changed = False
|
|
if config.mqtt and self.current_config:
|
|
old_mqtt = self.current_config.mqtt
|
|
new_mqtt = config.mqtt
|
|
|
|
if old_mqtt:
|
|
# Compare all MQTT settings
|
|
mqtt_changed = (
|
|
old_mqtt.broker != new_mqtt.broker
|
|
or old_mqtt.port != new_mqtt.port
|
|
or old_mqtt.username != new_mqtt.username
|
|
or old_mqtt.password != new_mqtt.password
|
|
or old_mqtt.use_tls != new_mqtt.use_tls
|
|
)
|
|
|
|
if mqtt_changed:
|
|
logger.info(
|
|
"mqtt_config_changed",
|
|
old_broker=f"{old_mqtt.broker}:{old_mqtt.port}",
|
|
new_broker=f"{new_mqtt.broker}:{new_mqtt.port}",
|
|
)
|
|
else:
|
|
# First time MQTT config received
|
|
mqtt_changed = True
|
|
logger.info(
|
|
"mqtt_config_first_time", broker=f"{new_mqtt.broker}:{new_mqtt.port}"
|
|
)
|
|
|
|
# Trigger MQTT reconnect if broker changed
|
|
if mqtt_changed and self.mqtt_reconnect_callback and config.mqtt:
|
|
logger.info("triggering_mqtt_reconnect")
|
|
await self.mqtt_reconnect_callback(config.mqtt)
|
|
|
|
# Persist config to disk
|
|
self._persist_config(config)
|
|
|
|
# Update internal state
|
|
self.current_config = config
|
|
self.device_count = len(config.devices)
|
|
self.subscription_count = len(config.devices) # Each device = 1 subscription
|
|
self.last_config_update = datetime.utcnow()
|
|
|
|
# Call callback to update device configuration
|
|
if self.config_callback:
|
|
await self.config_callback(config)
|
|
|
|
logger.info(
|
|
"config_applied",
|
|
devices=self.device_count,
|
|
mqtt_reconnected=mqtt_changed,
|
|
status="success",
|
|
)
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "Configuration applied",
|
|
"devices_configured": len(config.devices),
|
|
"mqtt_reconnected": mqtt_changed,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("config_apply_failed", error=str(e))
|
|
raise HTTPException(status_code=500, detail=f"Failed to apply config: {str(e)}")
|
|
|
|
@self.app.get("/config")
|
|
async def get_current_config():
|
|
"""Get currently active configuration."""
|
|
if not self.current_config:
|
|
raise HTTPException(status_code=404, detail="No configuration loaded")
|
|
|
|
# Return config dict with all fields (including None values)
|
|
return self.current_config.dict(exclude_none=False)
|
|
|
|
@self.app.get("/parsers")
|
|
async def get_parsers():
|
|
"""
|
|
Return available parser types and their schemas.
|
|
|
|
Odoo uses this endpoint to derive the parser_type Selection field
|
|
and conditional form groups for device configuration.
|
|
|
|
Returns:
|
|
Dict mapping parser_type key → schema
|
|
(label, description, topic_hint, data_unit, parameters).
|
|
"""
|
|
from parsers.registry import get_schema
|
|
|
|
return get_schema()
|
|
|
|
def _persist_config(self, config: BridgeConfig):
|
|
"""Persist configuration to /data/config-active.yaml."""
|
|
try:
|
|
# Create /data directory if it doesn't exist
|
|
data_dir = Path("/data")
|
|
data_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
config_path = data_dir / "config-active.yaml"
|
|
|
|
# Convert Pydantic model to dict (include None values for optional fields)
|
|
config_dict = config.dict(exclude_none=False)
|
|
|
|
# Write to YAML
|
|
with open(config_path, "w") as f:
|
|
yaml.dump(config_dict, f, default_flow_style=False, sort_keys=False)
|
|
|
|
logger.info("config_persisted", path=str(config_path), devices=len(config.devices))
|
|
|
|
except Exception as e:
|
|
logger.error("config_persist_failed", error=str(e))
|
|
# Don't raise - persistence failure shouldn't block config application
|
|
|
|
def load_persisted_config(self) -> BridgeConfig | None:
|
|
"""Load persisted active configuration from disk.
|
|
|
|
Returns:
|
|
Parsed bridge config if available and valid, otherwise None.
|
|
"""
|
|
try:
|
|
config_path = Path("/data/config-active.yaml")
|
|
|
|
if not config_path.exists():
|
|
logger.info("no_persisted_config", path=str(config_path))
|
|
return None
|
|
|
|
with open(config_path) as f:
|
|
config_dict = yaml.safe_load(f)
|
|
|
|
config = BridgeConfig(**config_dict)
|
|
|
|
self.current_config = config
|
|
self.device_count = len(config.devices)
|
|
self.subscription_count = len(config.devices)
|
|
|
|
logger.info(
|
|
"persisted_config_loaded", path=str(config_path), devices=len(config.devices)
|
|
)
|
|
|
|
return config
|
|
|
|
except Exception as e:
|
|
logger.error("persisted_config_load_failed", error=str(e))
|
|
return None
|