MQTT-Display-LaserCutter/parser/shelly_parser.py
2026-02-22 11:13:16 +01:00

82 lines
2.4 KiB
Python

"""Shelly PM Mini G3 Parser - extracts power data from MQTT messages."""
from datetime import datetime
import structlog
from exceptions import ParserError
logger = structlog.get_logger()
class ShellyParser:
"""Parser for Shelly PM Mini G3 MQTT Messages."""
def parse_message(self, topic: str, payload: dict) -> dict | None:
"""
Parse Shelly MQTT message.
Args:
topic: MQTT topic
payload: Message payload (already JSON parsed)
Returns:
Dict with parsed data or None
"""
try:
# Only parse status messages
if "/status/pm1:0" in topic:
return self._parse_status_message(topic, payload)
return None
except Exception as e:
logger.debug("shelly_parse_error", topic=topic, error=str(e))
return None
def _parse_status_message(self, topic: str, data: dict) -> dict | None:
"""
Parse Shelly PM status message.
Topic: shaperorigin/status/pm1:0
Payload format:
{
"id": 0,
"voltage": 230.0,
"current": 0.217,
"apower": 50.0,
"freq": 50.0,
"aenergy": {"total": 12345.6},
"temperature": {"tC": 35.2}
}
"""
try:
device_id = self._extract_device_id(topic)
result = {
"message_type": "status",
"device_id": device_id,
"timestamp": datetime.utcnow().isoformat() + "Z",
"voltage": data.get("voltage"),
"current": data.get("current"),
"apower": data.get("apower", 0), # Active Power in Watts
"frequency": data.get("freq"),
"total_energy": data.get("aenergy", {}).get("total"),
"temperature": data.get("temperature", {}).get("tC"),
}
logger.debug("shelly_parsed_status", device_id=device_id, apower=result["apower"])
return result
except Exception as e:
logger.error("shelly_status_parse_error", error=str(e))
return None
def _extract_device_id(self, topic: str) -> str:
"""Extract device ID from topic path."""
# Example: shaperorigin/status/pm1:0 -> shaperorigin
parts = topic.split("/")
if len(parts) > 0:
return parts[0]
return "unknown"