Added MQTT Logs, and improved OTA and NTP to Async

This commit is contained in:
2025-12-28 18:39:13 +02:00
parent 8d397c6dd5
commit 0f0b67cab9
18 changed files with 568 additions and 123 deletions

View File

@@ -13,6 +13,7 @@
#include "../../Telemetry/Telemetry.hpp"
#include "../../Logging/Logging.hpp"
#include "../ResponseBuilder/ResponseBuilder.hpp"
#include "../CommunicationRouter/CommunicationRouter.hpp"
CommandHandler::CommandHandler(ConfigManager& configManager, OTAManager& otaManager)
: _configManager(configManager)
@@ -23,6 +24,7 @@ CommandHandler::CommandHandler(ConfigManager& configManager, OTAManager& otaMana
, _firmwareValidator(nullptr)
, _clientManager(nullptr)
, _telemetry(nullptr)
, _communicationRouter(nullptr)
, _responseCallback(nullptr) {}
CommandHandler::~CommandHandler() {}
@@ -51,6 +53,10 @@ void CommandHandler::setTelemetryReference(Telemetry* telemetry) {
_telemetry = telemetry;
}
void CommandHandler::setCommunicationRouterReference(CommunicationRouter* comm) {
_communicationRouter = comm;
}
void CommandHandler::setResponseCallback(ResponseCallback callback) {
_responseCallback = callback;
}
@@ -1025,6 +1031,10 @@ void CommandHandler::handleSystemCommand(JsonVariant contents, const MessageCont
handleSetSerialLogLevelCommand(contents, context);
} else if (action == "set_sd_log_level") {
handleSetSdLogLevelCommand(contents, context);
} else if (action == "set_mqtt_log_level") {
handleSetMqttLogLevelCommand(contents, context);
} else if (action == "set_mqtt_enabled") {
handleSetMqttEnabledCommand(contents, context);
} else {
LOG_WARNING("Unknown system action: %s", action.c_str());
sendErrorResponse("system", "Unknown action: " + action, context);
@@ -1089,8 +1099,76 @@ void CommandHandler::handleSetSdLogLevelCommand(JsonVariant contents, const Mess
LOG_ERROR("Failed to save SD log level to SD card");
}
} else {
sendErrorResponse("set_sd_log_level",
sendErrorResponse("set_sd_log_level",
"Invalid log level (must be 0-5)", context);
}
}
void CommandHandler::handleSetMqttLogLevelCommand(JsonVariant contents, const MessageContext& context) {
if (!contents.containsKey("level")) {
sendErrorResponse("set_mqtt_log_level", "Missing level parameter", context);
return;
}
uint8_t level = contents["level"].as<uint8_t>();
// Set the level in ConfigManager
if (_configManager.setMqttLogLevel(level)) {
// Apply the level to Logging immediately
Logging::setMqttLogLevel((Logging::LogLevel)level);
// Save to SD card
bool saved = _configManager.saveGeneralConfig();
if (saved) {
sendSuccessResponse("set_mqtt_log_level",
"MQTT log level set to " + String(level) + " and saved", context);
LOG_INFO("MQTT log level updated to %d", level);
} else {
sendErrorResponse("set_mqtt_log_level",
"Log level set but failed to save to SD card", context);
LOG_ERROR("Failed to save MQTT log level to SD card");
}
} else {
sendErrorResponse("set_mqtt_log_level",
"Invalid log level (must be 0-5)", context);
}
}
void CommandHandler::handleSetMqttEnabledCommand(JsonVariant contents, const MessageContext& context) {
if (!contents.containsKey("enabled")) {
sendErrorResponse("set_mqtt_enabled", "Missing enabled parameter", context);
return;
}
bool enabled = contents["enabled"].as<bool>();
// Set MQTT enabled state in ConfigManager
_configManager.setMqttEnabled(enabled);
// Save to SD card
bool saved = _configManager.saveGeneralConfig();
if (saved) {
sendSuccessResponse("set_mqtt_enabled",
String("MQTT ") + (enabled ? "enabled" : "disabled") + " and saved", context);
LOG_INFO("MQTT %s by user command", enabled ? "enabled" : "disabled");
// If disabling, disconnect MQTT immediately
// If enabling, trigger connection attempt
if (_communicationRouter) {
if (!enabled) {
_communicationRouter->getMQTTClient().disconnect();
} else {
_communicationRouter->getMQTTClient().connect();
}
} else {
LOG_WARNING("CommunicationRouter reference not set - cannot control MQTT");
}
} else {
sendErrorResponse("set_mqtt_enabled",
"MQTT state changed but failed to save to SD card", context);
LOG_ERROR("Failed to save MQTT enabled state to SD card");
}
}

View File

@@ -34,6 +34,7 @@ class Timekeeper;
class FirmwareValidator;
class ClientManager;
class Telemetry;
class CommunicationRouter;
class CommandHandler {
public:
@@ -67,6 +68,7 @@ public:
void setFirmwareValidatorReference(FirmwareValidator* fv);
void setClientManagerReference(ClientManager* cm);
void setTelemetryReference(Telemetry* telemetry);
void setCommunicationRouterReference(CommunicationRouter* comm);
/**
* @brief Set response callback for sending responses back
@@ -90,6 +92,7 @@ private:
FirmwareValidator* _firmwareValidator;
ClientManager* _clientManager;
Telemetry* _telemetry;
CommunicationRouter* _communicationRouter;
ResponseCallback _responseCallback;
// Response helpers
@@ -146,4 +149,8 @@ private:
// Log Level Commands
void handleSetSerialLogLevelCommand(JsonVariant contents, const MessageContext& context);
void handleSetSdLogLevelCommand(JsonVariant contents, const MessageContext& context);
void handleSetMqttLogLevelCommand(JsonVariant contents, const MessageContext& context);
// MQTT Control Commands
void handleSetMqttEnabledCommand(JsonVariant contents, const MessageContext& context);
};

View File

@@ -60,6 +60,21 @@ void CommunicationRouter::begin() {
_mqttClient.setCallback([this](const String& topic, const String& payload) {
onMqttMessage(topic, payload);
});
// Setup MQTT logging callback
String logTopic = "vesper/" + _configManager.getDeviceUID() + "/logs";
Logging::setMqttPublishCallback(
[this](const String& topic, const String& payload, int qos) {
_mqttClient.publish(topic, payload, qos, false);
},
logTopic
);
// Apply MQTT log level from config
uint8_t mqttLogLevel = _configManager.getMqttLogLevel();
Logging::setMqttLogLevel((Logging::LogLevel)mqttLogLevel);
LOG_INFO("MQTT logging enabled with level %d on topic: %s", mqttLogLevel, logTopic.c_str());
LOG_INFO("✅ MQTT client initialized");
} catch (...) {
LOG_ERROR("❌ MQTT initialization failed, but WebSocket is still available");
@@ -68,12 +83,16 @@ void CommunicationRouter::begin() {
// 🔥 CRITICAL FIX: Connect ClientManager to CommandHandler
_commandHandler.setClientManagerReference(&_clientManager);
LOG_INFO("ClientManager reference set for CommandHandler");
// 🔥 Set CommunicationRouter reference for MQTT control commands
_commandHandler.setCommunicationRouterReference(this);
LOG_INFO("CommunicationRouter reference set for CommandHandler");
// Setup command handler response callback
_commandHandler.setResponseCallback([this](const String& response, const CommandHandler::MessageContext& context) {
sendResponse(response, context);
});
LOG_INFO("Communication Router initialized with modular architecture");
}
@@ -121,7 +140,7 @@ void CommunicationRouter::setupUdpDiscovery() {
StaticJsonDocument<128> req;
DeserializationError err = deserializeJson(req, msg);
if (!err) {
shouldReply = (req["op"] == "discover" && req["svc"] == "vesper");
shouldReply = (req["op"] == "discover");
}
}
@@ -136,7 +155,7 @@ void CommunicationRouter::setupUdpDiscovery() {
doc["id"] = _configManager.getDeviceUID();
doc["ip"] = _networking.getLocalIP();
char wsUrl[64];
snprintf(wsUrl, sizeof(wsUrl), "ws://%s/ws", _networking.getLocalIP().c_str());
snprintf(wsUrl, sizeof(wsUrl), "ws://%s:80/ws", _networking.getLocalIP().c_str());
doc["ws"] = wsUrl;
doc["port"] = 80;
doc["fw"] = "2.0";

View File

@@ -73,7 +73,10 @@ public:
bool hasActiveWebSocketClients() const;
size_t getWebSocketClientCount() const;
bool isHealthy() const;
// Component accessors
MQTTAsyncClient& getMQTTClient() { return _mqttClient; }
// Broadcast methods
void broadcastStatus(const String& statusMessage);
void broadcastStatus(const JsonDocument& statusJson);

View File

@@ -12,21 +12,33 @@ MQTTAsyncClient* MQTTAsyncClient::_instance = nullptr;
MQTTAsyncClient::MQTTAsyncClient(ConfigManager& configManager, Networking& networking)
: _configManager(configManager)
, _networking(networking)
, _messageCallback(nullptr)
, _messageCallback(nullptr)
, _mqttReconnectTimer(nullptr)
, _heartbeatTimer(nullptr) {
, _networkStabilizationTimer(nullptr)
, _heartbeatTimer(nullptr)
, _reconnectAttempts(0)
, _lastConnectionAttempt(0) {
_instance = this; // Set static instance pointer
// Create reconnection timer
// Create reconnection timer (initial delay will be calculated dynamically)
_mqttReconnectTimer = xTimerCreate(
"mqttReconnect", // Timer name (for debugging)
pdMS_TO_TICKS(MQTT_RECONNECT_DELAY), // Period: 5000ms = 5 seconds
pdMS_TO_TICKS(MQTT_RECONNECT_BASE_DELAY), // Initial period: 5000ms = 5 seconds
pdFALSE, // One-shot (false) or Auto-reload (true)
(void*)0, // Timer ID (can store data)
mqttReconnectTimerCallback // Callback function when timer expires
);
// Create network stabilization timer (one-shot, 2 seconds)
_networkStabilizationTimer = xTimerCreate(
"mqttNetStable", // Timer name
pdMS_TO_TICKS(NETWORK_STABILIZATION_DELAY), // Period: 2000ms = 2 seconds
pdFALSE, // One-shot timer
(void*)0, // Timer ID
networkStabilizationTimerCallback // Callback function
);
// Create heartbeat timer (auto-reload every 30 seconds)
_heartbeatTimer = xTimerCreate(
"mqttHeartbeat", // Timer name
@@ -42,6 +54,10 @@ MQTTAsyncClient::~MQTTAsyncClient() {
if (_mqttReconnectTimer) {
xTimerDelete(_mqttReconnectTimer, portMAX_DELAY);
}
if (_networkStabilizationTimer) {
xTimerStop(_networkStabilizationTimer, 0);
xTimerDelete(_networkStabilizationTimer, portMAX_DELAY);
}
if (_heartbeatTimer) {
xTimerStop(_heartbeatTimer, 0);
xTimerDelete(_heartbeatTimer, portMAX_DELAY);
@@ -98,17 +114,26 @@ void MQTTAsyncClient::begin() {
}
void MQTTAsyncClient::connect() {
auto& mqttConfig = _configManager.getMqttConfig();
// 🔥 Check if MQTT is enabled
if (!mqttConfig.enabled) {
LOG_DEBUG("MQTT is disabled in configuration - skipping connection");
return;
}
if (_mqttClient.connected()) {
LOG_DEBUG("Already connected to MQTT");
return;
}
auto& mqttConfig = _configManager.getMqttConfig();
// Track connection attempt
_lastConnectionAttempt = millis();
LOG_INFO("Free heap BEFORE MQTT connect: %d bytes", ESP.getFreeHeap());
_mqttClient.connect();
LOG_INFO("MQTT connect() called - waiting for async connection...");
}
@@ -118,17 +143,22 @@ void MQTTAsyncClient::disconnect() {
}
uint16_t MQTTAsyncClient::publish(const String& topic, const String& payload, int qos, bool retain) {
// Check if connected before attempting to publish
if (!_mqttClient.connected()) {
// Don't log error here - would cause infinite loop with MQTT logging
return 0;
}
// Build full topic (if relative)
String fullTopic = topic.startsWith("vesper/") ? topic : _dataTopic;
uint16_t packetId = _mqttClient.publish(fullTopic.c_str(), qos, retain, payload.c_str());
if (packetId > 0) {
LOG_DEBUG("Published to %s: %s (packetId=%d)", fullTopic.c_str(), payload.c_str(), packetId);
} else {
LOG_ERROR("Failed to publish to %s", fullTopic.c_str());
}
// REMOVED: Error logging here to prevent infinite recursion with MQTT logs
return packetId;
}
@@ -141,13 +171,28 @@ bool MQTTAsyncClient::isConnected() const {
}
void MQTTAsyncClient::onNetworkConnected() {
LOG_DEBUG("Network connected - waiting 2 seconds for network stack to stabilize...");
// Small delay to ensure network stack is fully ready
delay(2000);
LOG_DEBUG("Network stable - connecting to MQTT");
connect();
auto& mqttConfig = _configManager.getMqttConfig();
// 🔥 Only attempt connection if MQTT is enabled
if (!mqttConfig.enabled) {
LOG_DEBUG("Network connected but MQTT is disabled - skipping MQTT connection");
return;
}
LOG_DEBUG("Network connected - scheduling MQTT connection after 2s stabilization (non-blocking)");
// Reset reconnect attempts on fresh network connection
_reconnectAttempts = 0;
// 🔥 CRITICAL FIX: Use non-blocking timer instead of delay()
// This prevents blocking UDP discovery, WebSocket connections, and async operations
if (_networkStabilizationTimer) {
xTimerStart(_networkStabilizationTimer, 0);
} else {
LOG_ERROR("Network stabilization timer not initialized!");
// Fallback to immediate connection (better than blocking)
connect();
}
}
void MQTTAsyncClient::onNetworkDisconnected() {
@@ -166,15 +211,20 @@ void MQTTAsyncClient::subscribe() {
void MQTTAsyncClient::onMqttConnect(bool sessionPresent) {
LOG_INFO("✅ Connected to MQTT broker (session present: %s)", sessionPresent ? "yes" : "no");
LOG_INFO("🔍 Free heap AFTER MQTT connect: %d bytes", ESP.getFreeHeap());
// Reset reconnection attempts on successful connection
_reconnectAttempts = 0;
// Subscribe to control topic
subscribe();
// 🔥 Start heartbeat timer
startHeartbeat();
}
void MQTTAsyncClient::onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
auto& mqttConfig = _configManager.getMqttConfig();
const char* reasonStr;
switch(reason) {
case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED:
@@ -199,14 +249,30 @@ void MQTTAsyncClient::onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
reasonStr = "Unknown";
break;
}
LOG_ERROR("❌ Disconnected from MQTT broker - Reason: %s (%d)", reasonStr, static_cast<int>(reason));
// Stop heartbeat timer when disconnected
stopHeartbeat();
if (_networking.isConnected()) {
LOG_INFO("Network still connected - scheduling MQTT reconnection in %d seconds", MQTT_RECONNECT_DELAY / 1000);
// 🔥 Don't attempt reconnection if MQTT is disabled
if (!mqttConfig.enabled) {
LOG_INFO("MQTT is disabled - not attempting reconnection");
return;
}
if (_networking.isConnected()) {
// Increment reconnection attempts
_reconnectAttempts++;
// Calculate backoff delay
unsigned long reconnectDelay = getReconnectDelay();
LOG_INFO("Network still connected - scheduling MQTT reconnection #%d in %lu seconds (backoff active)",
_reconnectAttempts, reconnectDelay / 1000);
// Update timer period with new delay
xTimerChangePeriod(_mqttReconnectTimer, pdMS_TO_TICKS(reconnectDelay), 0);
xTimerStart(_mqttReconnectTimer, 0);
} else {
LOG_INFO("Network is down - waiting for network to reconnect");
@@ -342,4 +408,43 @@ void MQTTAsyncClient::heartbeatTimerCallback(TimerHandle_t xTimer) {
if (MQTTAsyncClient::_instance) {
MQTTAsyncClient::_instance->publishHeartbeat();
}
}
// ═══════════════════════════════════════════════════════════════════════════════════
// NETWORK STABILIZATION - NON-BLOCKING TIMER APPROACH
// ═══════════════════════════════════════════════════════════════════════════════════
void MQTTAsyncClient::connectAfterStabilization() {
LOG_DEBUG("Network stabilization complete - connecting to MQTT");
connect();
}
void MQTTAsyncClient::networkStabilizationTimerCallback(TimerHandle_t xTimer) {
if (MQTTAsyncClient::_instance) {
MQTTAsyncClient::_instance->connectAfterStabilization();
}
}
// ═══════════════════════════════════════════════════════════════════════════════════
// EXPONENTIAL BACKOFF CALCULATION
// ═══════════════════════════════════════════════════════════════════════════════════
unsigned long MQTTAsyncClient::getReconnectDelay() {
// First 3 attempts: Quick retries (5 seconds each)
if (_reconnectAttempts <= MQTT_MAX_QUICK_RETRIES) {
return MQTT_RECONNECT_BASE_DELAY;
}
// After quick retries: Exponential backoff
// Formula: base_delay * 2^(attempts - quick_retries)
// Examples: 10s, 20s, 40s, 80s, 160s, 300s (capped at 5 minutes)
uint8_t backoffPower = _reconnectAttempts - MQTT_MAX_QUICK_RETRIES;
unsigned long delay = MQTT_RECONNECT_BASE_DELAY * (1 << backoffPower); // 2^backoffPower
// Cap at maximum delay (5 minutes)
if (delay > MQTT_RECONNECT_MAX_DELAY) {
delay = MQTT_RECONNECT_MAX_DELAY;
}
return delay;
}

View File

@@ -108,11 +108,22 @@ private:
void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total);
void onMqttPublish(uint16_t packetId);
// Reconnection Timer
// Reconnection Timer with Exponential Backoff
TimerHandle_t _mqttReconnectTimer;
static const unsigned long MQTT_RECONNECT_DELAY = 5000; // 5 seconds
static const unsigned long MQTT_RECONNECT_BASE_DELAY = 5000; // 5 seconds base
static const unsigned long MQTT_RECONNECT_MAX_DELAY = 300000; // 5 minutes max
static const uint8_t MQTT_MAX_QUICK_RETRIES = 3; // Try 3 times quickly
uint8_t _reconnectAttempts; // Track failed attempts
unsigned long _lastConnectionAttempt; // Track last attempt time
void attemptReconnection();
static void mqttReconnectTimerCallback(TimerHandle_t xTimer);
unsigned long getReconnectDelay(); // Calculate backoff delay
// Network Stabilization Timer (non-blocking replacement for delay)
TimerHandle_t _networkStabilizationTimer;
static const unsigned long NETWORK_STABILIZATION_DELAY = 2000; // 2 seconds
void connectAfterStabilization();
static void networkStabilizationTimerCallback(TimerHandle_t xTimer);
// Heartbeat Timer (30 seconds)
TimerHandle_t _heartbeatTimer;