// ============================================================================= // mqtt_client.cpp - MQTT-Verbindung, Publish und Subscribe // Projekt: MQTT-Display LaserCutter // ============================================================================= #include "mqtt_client.h" #include "laser_tracker.h" #include "display_manager.h" #include "config.h" #include // Globale Instanz MqttClient mqttClient; // -------------------------------------------------------------------------- // Konstruktor // -------------------------------------------------------------------------- MqttClient::MqttClient() : _client(_wifiClient) , _lastReconnectMs(0) , _lastHeartbeatMs(0) , _taskHandle(nullptr) , _pendingSession(false) , _pendingSessionSec(0) , _pendingGratisSec(0) { _clientId[0] = '\0'; } // -------------------------------------------------------------------------- // begin() - Client-ID setzen und MQTT-Task auf Core 0 starten. // WICHTIG: _client und _secureClient werden AUSSCHLIESSLICH in _taskLoop() // auf Core 0 initialisiert und verwendet (mbedtls nicht Core-safe). // -------------------------------------------------------------------------- void MqttClient::begin() { const auto& cfg = settings.get(); const char* broker = cfg.mqttBroker[0] != '\0' ? cfg.mqttBroker : DEFAULT_MQTT_BROKER; uint16_t port = cfg.mqttPort > 0 ? cfg.mqttPort : DEFAULT_MQTT_PORT; // Client-ID jetzt setzen (nur String-Op, kein Netzwerk) uint8_t mac[6]; WiFi.macAddress(mac); snprintf(_clientId, sizeof(_clientId), "%s-%02X%02X%02X", MQTT_CLIENT_ID, mac[3], mac[4], mac[5]); LOG_I("MQTT", "Broker: %s:%u", broker, port); LOG_I("MQTT", "Client-ID: %s", _clientId); // MQTT-Task auf Core 0 starten. // 16 KB Stack: TLS/mbedtls benoetigt ~12 KB. // Alle _client.*-Aufrufe laufen ausschliesslich im Task auf Core 0. xTaskCreatePinnedToCore(_taskFn, "mqtt_task", 16384, this, 1, &_taskHandle, 0); LOG_I("MQTT", "MQTT-Task gestartet (Core 0)"); } // -------------------------------------------------------------------------- // loop() - No-Op: Arbeit erledigt _taskLoop() auf Core 0 // Bleibt im Interface, damit main.cpp unveraendert bleibt. // -------------------------------------------------------------------------- void MqttClient::loop() { // Reconnect, _client.loop(), Heartbeat und Publish // werden vollstaendig vom MQTT-Task auf Core 0 erledigt. } // -------------------------------------------------------------------------- // publishSession() - beim Ende einer Session (Core 1 safe, kein Blocking) // Daten werden via volatile Flag an den MQTT-Task auf Core 0 uebergeben. // -------------------------------------------------------------------------- void MqttClient::publishSession(int lastSessionSec, int gratisSec) { _pendingSessionSec = lastSessionSec; _pendingGratisSec = gratisSec; _pendingSession = true; // Task auf Core 0 fuehrt den Publish aus LOG_I("MQTT", "publishSession vorgemerkt: %d s (gratis %d s)", lastSessionSec, gratisSec); } // -------------------------------------------------------------------------- // _doPublishSession() - eigentlicher Publish, wird aus MQTT-Task aufgerufen // -------------------------------------------------------------------------- void MqttClient::_doPublishSession(int lastSessionSec, int gratisSec) { if (!_client.connected()) { LOG_E("MQTT", "_doPublishSession: nicht verbunden, uebersprungen"); return; } // session_minutes: Decken-Rundung (62 s = 2 min) int sessionMinuten = (lastSessionSec + 59) / 60; // session_start_time als ISO-8601 Lokalzeit (CET/CEST via configTzTime) char startTimeBuf[32] = {0}; time_t startTime = laserTracker.getSessionStartTime(); if (startTime > 0) { struct tm* tmInfo = localtime(&startTime); strftime(startTimeBuf, sizeof(startTimeBuf), "%Y-%m-%dT%H:%M:%S", tmInfo); } JsonDocument doc; doc["session_minutes"] = sessionMinuten; doc["session_seconds"] = lastSessionSec; doc["session_start_time"] = (startTime > 0) ? startTimeBuf : "unknown"; doc["freetime_s"] = gratisSec; doc["ip"] = WiFi.localIP().toString(); char buf[128]; serializeJson(doc, buf, sizeof(buf)); bool ok = _client.publish(MQTT_TOPIC_SESSION, buf, /*retained=*/false); LOG_I("MQTT", "publishSession: %s -> %s", buf, ok ? "OK" : "FEHLER"); } // -------------------------------------------------------------------------- // publishHeartbeat() - Status-Heartbeat // -------------------------------------------------------------------------- void MqttClient::publishHeartbeat() { JsonDocument doc; doc["online"] = true; doc["session_sum"] = laserTracker.getAllSessionsSumMinutes(); doc["machine_running_time_min"] = serialized(String(laserTracker.getTotalMinutes(), 2)); doc["ip"] = WiFi.localIP().toString(); doc["uptime_s"] = (uint32_t)(millis() / 1000UL); doc["firmware_version"] = FIRMWARE_VERSION " (" __DATE__ ")"; char buf[220]; serializeJson(doc, buf, sizeof(buf)); bool ok = _client.publish(MQTT_TOPIC_STATUS, buf, /*retained=*/true); LOG_I("MQTT", "Heartbeat: %s -> %s", buf, ok ? "OK" : "FEHLER"); } // -------------------------------------------------------------------------- // reconnect() - Verbindungsaufbau, non-blocking // -------------------------------------------------------------------------- bool MqttClient::reconnect() { if (_client.connected()) return true; const auto& cfg = settings.get(); const char* user = cfg.mqttUser[0] != '\0' ? cfg.mqttUser : nullptr; const char* pass = cfg.mqttPassword[0] != '\0' ? cfg.mqttPassword : nullptr; // Offline-LWT (Last Will and Testament) const char* lwtPayload = "{\"online\":false}"; LOG_I("MQTT", "Verbinde als '%s'...", _clientId); // TLS-Handshake kann 2-15 s dauern. Laeuft auf Core 0 (MQTT-Task), // blockiert Core 1 (loop/LaserTracker/Display) nicht mehr. bool ok; if (user && pass) { ok = _client.connect(_clientId, user, pass, MQTT_TOPIC_STATUS, 0, true, lwtPayload); } else { ok = _client.connect(_clientId, nullptr, nullptr, MQTT_TOPIC_STATUS, 0, true, lwtPayload); } if (ok) { LOG_I("MQTT", "Verbunden!"); _client.subscribe(MQTT_TOPIC_RESET); LOG_I("MQTT", "Abonniert: %s", MQTT_TOPIC_RESET); // Sofortigen Heartbeat senden publishHeartbeat(); _lastHeartbeatMs = millis(); } else { LOG_E("MQTT", "Verbindung fehlgeschlagen, rc=%d", _client.state()); } return ok; } // -------------------------------------------------------------------------- // _taskFn() / _taskLoop() - FreeRTOS-Task auf Core 0 // Erledigt: Reconnect, PubSubClient.loop(), Heartbeat, pending Session-Publish // -------------------------------------------------------------------------- void MqttClient::_taskFn(void* param) { static_cast(param)->_taskLoop(); } void MqttClient::_taskLoop() { // ------------------------------------------------------------------ // Einmalige Initialisierung auf Core 0. // WiFiClientSecure (mbedtls) MUSS auf demselben Core laufen, auf dem // es konfiguriert und verwendet wird – sonst Heap-Korruption! // ------------------------------------------------------------------ const auto& cfg = settings.get(); const char* broker = cfg.mqttBroker[0] != '\0' ? cfg.mqttBroker : DEFAULT_MQTT_BROKER; uint16_t port = cfg.mqttPort > 0 ? cfg.mqttPort : DEFAULT_MQTT_PORT; if (port == 8883) { _secureClient.setInsecure(); _client.setClient(_secureClient); LOG_I("MQTT", "TLS aktiv (setInsecure, Core 0)"); } else { _client.setClient(_wifiClient); } _client.setServer(broker, port); _client.setCallback(MqttClient::onMessage); _client.setKeepAlive(60); _client.setBufferSize(512); for (;;) { // Kein WiFi: warten if (!WiFi.isConnected()) { vTaskDelay(pdMS_TO_TICKS(500)); continue; } // Nicht verbunden: Reconnect-Intervall abwarten, dann versuchen if (!_client.connected()) { uint32_t now = millis(); if (now - _lastReconnectMs >= MQTT_RECONNECT_MS) { _lastReconnectMs = now; reconnect(); // blockiert hier (TLS); Core 1 laeuft unbehelligt } vTaskDelay(pdMS_TO_TICKS(200)); continue; } // PubSubClient-interne Verarbeitung (eingehende Nachrichten) _client.loop(); // Pending Session-Publish (von Core 1 via volatile Flag gesetzt) if (_pendingSession) { _pendingSession = false; _doPublishSession(_pendingSessionSec, _pendingGratisSec); } // Heartbeat uint32_t now = millis(); if (now - _lastHeartbeatMs >= MQTT_HEARTBEAT_MS) { _lastHeartbeatMs = now; publishHeartbeat(); } vTaskDelay(pdMS_TO_TICKS(50)); } } // -------------------------------------------------------------------------- // onMessage() - Callback fuer eingehende MQTT-Nachrichten // -------------------------------------------------------------------------- void MqttClient::onMessage(const char* topic, byte* payload, unsigned int length) { // Payload als String kopieren char msg[64] = {}; size_t copyLen = length < sizeof(msg) - 1 ? length : sizeof(msg) - 1; memcpy(msg, payload, copyLen); LOG_I("MQTT", "Nachricht: topic=%s payload=%s", topic, msg); // Reset-Kommando if (strcmp(topic, MQTT_TOPIC_RESET) == 0) { bool doReset = false; // Plain "1" (rueckwaertskompatibel) if (strcmp(msg, "1") == 0) { doReset = true; } // JSON: {"reset":true} oder {"reset":1} else if (msg[0] == '{') { JsonDocument doc; DeserializationError err = deserializeJson(doc, msg); if (!err) { JsonVariant v = doc["reset"]; if (!v.isNull()) { doReset = v.as() || v.as() == 1; } } else { LOG_E("MQTT", "JSON-Parse-Fehler: %s", err.c_str()); } } if (doReset) { LOG_I("MQTT", "RESET-Kommando empfangen -> laserTracker.resetTotal()"); laserTracker.resetTotal(); } // reset_session: nur RAM-Session-Summe auf 0, NVS bleibt bool doResetSession = false; if (msg[0] == '{') { JsonDocument doc2; DeserializationError err = deserializeJson(doc2, msg); if (!err) { JsonVariant v = doc2["reset_session"]; if (!v.isNull()) { doResetSession = v.as() || v.as() == 1; } } } if (doResetSession) { LOG_I("MQTT", "RESET_SESSION-Kommando empfangen -> laserTracker.resetSessionSum()"); laserTracker.resetSessionSum(); } } } // -------------------------------------------------------------------------- // isConnected() // -------------------------------------------------------------------------- bool MqttClient::isConnected() { return _client.connected(); } // -------------------------------------------------------------------------- // printToSerial() // -------------------------------------------------------------------------- void MqttClient::printToSerial() { const auto& cfg = settings.get(); LOG_I("MQTT", "=== MqttClient ==="); LOG_I("MQTT", " Broker : %s", cfg.mqttBroker[0] ? cfg.mqttBroker : DEFAULT_MQTT_BROKER); LOG_I("MQTT", " Port : %u", cfg.mqttPort > 0 ? cfg.mqttPort : DEFAULT_MQTT_PORT); LOG_I("MQTT", " ClientID : %s", _clientId[0] ? _clientId : MQTT_CLIENT_ID); LOG_I("MQTT", " Verbunden: %s", _client.connected() ? "JA" : "NEIN"); LOG_I("MQTT", " rc : %d", _client.state()); LOG_I("MQTT", "=================="); }