odoo_mqtt/iot_bridge/core/event_queue.py

244 lines
7.9 KiB
Python

"""
Event Queue with Retry Logic for IoT Bridge
Handles queuing and retry of events sent to Odoo with exponential backoff.
"""
from __future__ import annotations

import threading
import time
import uuid
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any

import structlog
@dataclass
class QueuedEvent:
    """An event held in the queue together with its retry bookkeeping.

    Attributes:
        event_uid: Identifier of the event.
        event_type: Event type string.
        device_id: Device identifier carried by the event.
        payload: Event data as a dictionary.
        created_at: Timestamp recorded when the event object was created.
        retry_count: Number of times increment_retry() has been called.
        next_retry_time: Earliest time for the next attempt; None means the
            event may be attempted immediately.
        max_retries: should_retry() returns False once retry_count reaches
            this value.
    """

    event_uid: str
    event_type: str
    device_id: str
    payload: dict[str, Any]
    created_at: datetime
    retry_count: int = 0
    next_retry_time: datetime | None = None
    max_retries: int = 10

    def should_retry(self, current_time: datetime) -> bool:
        """Check whether this event is eligible for a (re)try attempt.

        Args:
            current_time: Current UTC timestamp used for the comparison. It
                must be naive, like the timestamps this class produces.

        Returns:
            True when the event can be attempted now, otherwise False.
        """
        if self.retry_count >= self.max_retries:
            return False
        if self.next_retry_time is None:
            return True
        return current_time >= self.next_retry_time

    def calculate_next_retry(self) -> datetime:
        """Calculate the next retry timestamp with exponential backoff.

        The delay is ``2 ** retry_count`` seconds, capped at 60 seconds.
        increment_retry() bumps the counter before calling this method, so
        the delays it schedules are 2s, 4s, 8s, 16s, 32s, then 60s.

        Returns:
            Naive UTC timestamp when the next retry should be attempted.
        """
        delay_seconds = min(2**self.retry_count, 60)
        # datetime.utcnow() is deprecated since Python 3.12; this produces the
        # same naive UTC value, so it stays comparable with naive timestamps.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return now + timedelta(seconds=delay_seconds)

    def increment_retry(self) -> None:
        """Increment the retry counter and schedule the next retry time."""
        self.retry_count += 1
        self.next_retry_time = self.calculate_next_retry()
class EventQueue:
    """Thread-safe event queue that delivers events through a callback with retries.

    Events are appended with enqueue() and drained by a background thread
    started with start(). A failed delivery is re-queued using the event's own
    backoff schedule. Once an event has used up its attempts it is dropped and
    counted in ``stats["failed_count"]``.
    """

    def __init__(
        self,
        send_callback: Callable[[dict[str, Any]], bool],
        logger=None,
    ):
        """
        Initialize event queue.

        Args:
            send_callback: Function to send event to Odoo. Returns True on
                success, False on failure.
            logger: Logger instance for queue operations. Defaults to
                ``structlog.get_logger()``.
        """
        self.queue: deque[QueuedEvent] = deque()
        self.lock = threading.Lock()
        self.send_callback = send_callback
        self.logger = logger or structlog.get_logger()
        # Statistics (guarded by self.lock)
        self.stats = {"pending_count": 0, "sent_count": 0, "failed_count": 0, "retry_count": 0}
        # Background processing
        self.running = False
        self.process_thread: threading.Thread | None = None

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        Replaces the deprecated ``datetime.utcnow()`` while producing the same
        kind of value, so comparisons with other naive timestamps keep working.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def enqueue(self, event: dict[str, Any]) -> str:
        """
        Add event to queue.

        Args:
            event: Event dictionary with event_type, device_id, payload.

        Returns:
            event_uid: Unique identifier for the event.

        Raises:
            KeyError: If ``event`` lacks ``event_type`` or ``device_id``.
        """
        # Use the caller's UID when it is a string, otherwise generate one.
        # NOTE(review): a generated UID is returned but not written into the
        # payload handed to send_callback - confirm whether the receiver
        # needs it there.
        event_uid_raw = event.get("event_uid")
        event_uid = event_uid_raw if isinstance(event_uid_raw, str) else str(uuid.uuid4())
        queued_event = QueuedEvent(
            event_uid=event_uid,
            event_type=event["event_type"],
            device_id=event["device_id"],
            payload=event,
            created_at=self._utc_now(),
        )
        with self.lock:
            self.queue.append(queued_event)
            queue_size = len(self.queue)
            self.stats["pending_count"] = queue_size
        # Log the size captured under the lock rather than re-reading shared stats.
        self.logger.debug(
            f"event_enqueued uid={event_uid[:8]} type={event['event_type']} queue_size={queue_size}"
        )
        return event_uid

    def process_queue(self) -> None:
        """Process queued events continuously until ``running`` is cleared.

        This is the target of the background thread created by start().
        """
        while self.running:
            try:
                self._process_next_event()
            except Exception as e:
                self.logger.error("queue_processing_error", error=str(e))
            # Pause on every iteration, including after an error, so that a
            # persistent failure cannot turn this loop into a busy spin.
            time.sleep(0.1)

    def _pop_ready_event(self) -> QueuedEvent | None:
        """Remove and return the first event that may be attempted now.

        The caller must hold ``self.lock``.

        Returns:
            The removed event, or None when no queued event is ready.
        """
        now = self._utc_now()
        for index, candidate in enumerate(self.queue):
            if candidate.should_retry(now):
                # Returning right after the deletion keeps us from iterating
                # a deque that has just been mutated.
                del self.queue[index]
                self.stats["pending_count"] = len(self.queue)
                return candidate
        return None

    def _process_next_event(self) -> None:
        """Process next event in queue that's ready for (re)try."""
        with self.lock:
            event = self._pop_ready_event()
        if event is None:
            return

        # Send event (outside lock to avoid blocking queue)
        try:
            success = self.send_callback(event.payload)
        except Exception as e:
            self.logger.error(f"event_send_exception uid={event.event_uid[:8]} error={str(e)}")
            success = False

        with self.lock:
            if success:
                self.stats["sent_count"] += 1
                log = self.logger.info
                message = (
                    f"event_sent_success uid={event.event_uid[:8]} "
                    f"type={event.event_type} attempts={event.retry_count + 1}"
                )
            elif event.retry_count + 1 >= event.max_retries:
                # This failure used up the last permitted attempt. Re-queueing
                # would strand the event, because should_retry() rejects it
                # from now on, so drop it and count it as failed instead.
                self.stats["failed_count"] += 1
                log = self.logger.error
                message = (
                    f"event_send_failed_permanently uid={event.event_uid[:8]} "
                    f"type={event.event_type} max_retries={event.max_retries}"
                )
            else:
                # Re-queue with backoff
                event.increment_retry()
                self.queue.append(event)
                self.stats["pending_count"] = len(self.queue)
                self.stats["retry_count"] += 1
                next_retry_time = event.next_retry_time
                if next_retry_time is None:
                    next_retry_delay = 0.0
                else:
                    next_retry_delay = (next_retry_time - self._utc_now()).total_seconds()
                log = self.logger.warning
                message = (
                    f"event_send_failed_retry uid={event.event_uid[:8]} "
                    f"type={event.event_type} retry_count={event.retry_count} "
                    f"next_retry_in={next_retry_delay:.1f}s"
                )
        # Emit the log line after releasing the lock so a slow logger does not
        # hold up enqueue() callers.
        log(message)

    def start(self) -> None:
        """Start background queue processing in a daemon thread."""
        if self.running:
            self.logger.warning("queue_already_running")
            return
        self.running = True
        self.process_thread = threading.Thread(
            target=self.process_queue, daemon=True, name="EventQueueProcessor"
        )
        self.process_thread.start()
        self.logger.info("queue_processor_started")

    def stop(self) -> None:
        """Stop background queue processing, waiting up to 5 seconds for the thread."""
        if not self.running:
            return
        self.running = False
        if self.process_thread:
            self.process_thread.join(timeout=5)
        # Read a consistent snapshot; the worker may still be running if the
        # join timed out.
        stats = self.get_stats()
        self.logger.info(
            f"queue_processor_stopped pending={stats['pending_count']} sent={stats['sent_count']} failed={stats['failed_count']}"
        )

    def get_stats(self) -> dict[str, int]:
        """Return queue processing statistics.

        Returns:
            Snapshot (copy) of pending, sent, failed, and retried event counters.
        """
        with self.lock:
            return self.stats.copy()

    def get_queue_size(self) -> int:
        """Return the number of queued events.

        Returns:
            Current queue length.
        """
        with self.lock:
            return len(self.queue)