/* * COMMUNICATIONROUTER.CPP - Communication Router Implementation */ #include "CommunicationRouter.hpp" #define TAG "CommRouter" #include "../../ConfigManager/ConfigManager.hpp" #include "../../OTAManager/OTAManager.hpp" #include "../../Networking/Networking.hpp" #include "../../Logging/Logging.hpp" #include "../../Player/Player.hpp" #include "../../FileManager/FileManager.hpp" #include "../../TimeKeeper/TimeKeeper.hpp" #include "../../FirmwareValidator/FirmwareValidator.hpp" CommunicationRouter::CommunicationRouter(ConfigManager& configManager, OTAManager& otaManager, Networking& networking, AsyncWebServer& server, AsyncWebSocket& webSocket, AsyncUDP& udp) : _configManager(configManager) , _otaManager(otaManager) , _networking(networking) , _server(server) , _webSocket(webSocket) , _udp(udp) , _player(nullptr) , _fileManager(nullptr) , _timeKeeper(nullptr) , _firmwareValidator(nullptr) , _mqttClient(configManager, networking) , _clientManager() , _wsServer(webSocket, _clientManager) , _commandHandler(configManager, otaManager) , _httpHandler(server, configManager) , _uartHandler() , _settingsServer(server, configManager, networking) {} CommunicationRouter::~CommunicationRouter() {} void CommunicationRouter::begin() { LOG_INFO(TAG, "Initializing Communication Router v4.0 (Modular)"); // 🔥 CRITICAL: Initialize WebSocket FIRST to ensure it's always set up // Even if MQTT fails, we want WebSocket to work! LOG_INFO(TAG, "Setting up WebSocket server..."); // Initialize WebSocket server _wsServer.begin(); _wsServer.setCallback([this](uint32_t clientId, const JsonDocument& message) { onWebSocketMessage(clientId, message); }); // 🔥 CRITICAL FIX: Attach WebSocket handler to AsyncWebServer // This MUST happen before any potential failures! _server.addHandler(&_webSocket); LOG_INFO(TAG, "✅ WebSocket handler attached to AsyncWebServer on /ws"); //Now initialize MQTT client (can fail without breaking WebSocket) try { LOG_INFO(TAG, "Setting up MQTT client..."); _mqttClient.begin(); _mqttClient.setCallback([this](const String& topic, const String& payload) { onMqttMessage(topic, payload); }); // Setup MQTT logging callback String logTopic = "vesper/" + _configManager.getDeviceUID() + "/logs"; Logging::setMqttPublishCallback( [this](const String& topic, const String& payload, int qos) { _mqttClient.publish(topic, payload, qos, false); }, logTopic ); // Apply log levels from config for all three channels Logging::setSerialLevel((Logging::LogLevel)_configManager.getSerialLogLevel()); Logging::setMqttLevel((Logging::LogLevel)_configManager.getMqttLogLevel()); Logging::setSdLevel((Logging::LogLevel)_configManager.getSdLogLevel()); LOG_INFO(TAG, "Log levels applied — Serial:%d MQTT:%d SD:%d", _configManager.getSerialLogLevel(), _configManager.getMqttLogLevel(), _configManager.getSdLogLevel()); // Silence MQTT-internal subsystems on the MQTT channel to prevent log storms. // These systems generate logs while sending logs — suppress them over MQTT only. Logging::setSubsystemMqttLevel("MQTTClient", Logging::NONE); Logging::setSubsystemMqttLevel("CommRouter", Logging::WARNING); Logging::setSubsystemMqttLevel("Logger", Logging::NONE); LOG_INFO(TAG, "✅ MQTT client initialized"); } catch (...) { LOG_ERROR(TAG, "❌ MQTT initialization failed, but WebSocket is still available"); } // Wire up SD logging channel (requires FileManager to be set first via setFileManagerReference) // SD callback is registered lazily in setFileManagerReference once the pointer is available // 🔥 CRITICAL FIX: Connect ClientManager to CommandHandler _commandHandler.setClientManagerReference(&_clientManager); LOG_INFO(TAG, "ClientManager reference set for CommandHandler"); // 🔥 Set CommunicationRouter reference for MQTT control commands _commandHandler.setCommunicationRouterReference(this); LOG_INFO(TAG, "CommunicationRouter reference set for CommandHandler"); // Setup command handler response callback _commandHandler.setResponseCallback([this](const String& response, const CommandHandler::MessageContext& context) { sendResponse(response, context); }); // Initialize HTTP Request Handler LOG_INFO(TAG, "Setting up HTTP REST API..."); _httpHandler.begin(); _httpHandler.setCommandHandlerReference(&_commandHandler); LOG_INFO(TAG, "✅ HTTP REST API initialized"); // Initialize Settings Web Server LOG_INFO(TAG, "Setting up Settings Web Server..."); _settingsServer.begin(); LOG_INFO(TAG, "✅ Settings Web Server initialized at /settings"); // Initialize UART Command Handler LOG_INFO(TAG, "Setting up UART Command Handler..."); _uartHandler.begin(); _uartHandler.setCallback([this](JsonDocument& message) { onUartMessage(message); }); LOG_INFO(TAG, "✅ UART Command Handler initialized (TX: GPIO12, RX: GPIO13)"); LOG_INFO(TAG, "Communication Router initialized with modular architecture"); LOG_INFO(TAG, " • MQTT: AsyncMqttClient"); LOG_INFO(TAG, " • WebSocket: Multi-client support"); LOG_INFO(TAG, " • HTTP REST API: /api endpoints"); LOG_INFO(TAG, " • UART: External device control"); LOG_INFO(TAG, " • Settings Page: /settings"); } void CommunicationRouter::loop() { // Process UART incoming data _uartHandler.loop(); } void CommunicationRouter::setPlayerReference(Player* player) { _player = player; _commandHandler.setPlayerReference(player); } void CommunicationRouter::setFileManagerReference(FileManager* fm) { _fileManager = fm; _commandHandler.setFileManagerReference(fm); // Register SD log channel now that FileManager is available if (fm != nullptr) { Logging::setSdWriteCallback([fm](const String& line) { fm->appendLine("/logs/vesper.log", line); }); LOG_INFO(TAG, "SD log channel registered -> /logs/vesper.log"); } } void CommunicationRouter::setTimeKeeperReference(Timekeeper* tk) { _timeKeeper = tk; _commandHandler.setTimeKeeperReference(tk); } void CommunicationRouter::setFirmwareValidatorReference(FirmwareValidator* fv) { _firmwareValidator = fv; _commandHandler.setFirmwareValidatorReference(fv); } void CommunicationRouter::setTelemetryReference(Telemetry* telemetry) { _commandHandler.setTelemetryReference(telemetry); } void CommunicationRouter::setupUdpDiscovery() { uint16_t discoveryPort = _configManager.getNetworkConfig().discoveryPort; if (_udp.listen(discoveryPort)) { LOG_INFO(TAG, "UDP discovery listening on port %u", discoveryPort); _udp.onPacket([this](AsyncUDPPacket packet) { String msg = String((const char*)packet.data(), packet.length()); LOG_DEBUG(TAG, "UDP from %s:%u -> %s", packet.remoteIP().toString().c_str(), packet.remotePort(), msg.c_str()); bool shouldReply = false; if (msg.indexOf("discover") >= 0) { shouldReply = true; } else { StaticJsonDocument<128> req; DeserializationError err = deserializeJson(req, msg); if (!err) { shouldReply = (req["op"] == "discover"); } } if (!shouldReply) return; StaticJsonDocument<256> doc; doc["op"] = "discover_reply"; doc["svc"] = "vesper"; doc["ver"] = 1; doc["name"] = "Proj. Vesper v2.0"; doc["id"] = _configManager.getDeviceUID(); doc["ip"] = _networking.getLocalIP(); char wsUrl[64]; snprintf(wsUrl, sizeof(wsUrl), "ws://%s:80/ws", _networking.getLocalIP().c_str()); doc["ws"] = wsUrl; doc["port"] = 80; doc["fw"] = "2.0"; doc["clients"] = _clientManager.getClientCount(); String out; serializeJson(doc, out); _udp.writeTo((const uint8_t*)out.c_str(), out.length(), packet.remoteIP(), packet.remotePort()); }); } else { LOG_ERROR(TAG, "Failed to start UDP discovery."); } } bool CommunicationRouter::isMqttConnected() const { return _mqttClient.isConnected(); } bool CommunicationRouter::hasActiveWebSocketClients() const { return _wsServer.hasClients(); } size_t CommunicationRouter::getWebSocketClientCount() const { return _wsServer.getClientCount(); } bool CommunicationRouter::isHealthy() const { // Check if required references are set if (!_player || !_fileManager || !_timeKeeper) { LOG_WARNING(TAG, "Unhealthy - missing subsystem references (player=%d fileManager=%d timeKeeper=%d)", _player != nullptr, _fileManager != nullptr, _timeKeeper != nullptr); return false; } // Check network connectivity first — no point checking connections without a network if (!_networking.isConnected()) { LOG_WARNING(TAG, "Unhealthy - no network connection"); return false; } // Check if at least one protocol is connected if (!isMqttConnected() && !hasActiveWebSocketClients()) { LOG_WARNING(TAG, "Unhealthy - no active connections (MQTT=%d, WebSocket=%d)", isMqttConnected(), hasActiveWebSocketClients()); return false; } return true; } void CommunicationRouter::broadcastStatus(const String& statusMessage) { publishToMqtt(statusMessage); _wsServer.broadcastToAll(statusMessage); } void CommunicationRouter::broadcastStatus(const JsonDocument& statusJson) { String statusMessage; serializeJson(statusJson, statusMessage); broadcastStatus(statusMessage); } void CommunicationRouter::broadcastToMasterClients(const String& message) { _wsServer.broadcastToMaster(message); } void CommunicationRouter::broadcastToSecondaryClients(const String& message) { _wsServer.broadcastToSecondary(message); } void CommunicationRouter::broadcastToAllWebSocketClients(const String& message) { _wsServer.broadcastToAll(message); } void CommunicationRouter::broadcastToAllWebSocketClients(const JsonDocument& message) { String messageStr; serializeJson(message, messageStr); _wsServer.broadcastToAll(messageStr); } void CommunicationRouter::publishToMqtt(const String& data) { if (_mqttClient.isConnected()) { _mqttClient.publish("data", data, 0, false); LOG_DEBUG(TAG, "Published to MQTT: %s", data.c_str()); } else { LOG_ERROR(TAG, "MQTT Not Connected! Message Failed: %s", data.c_str()); } } void CommunicationRouter::sendBellOverloadNotification(const std::vector& bellNumbers, const std::vector& bellLoads, const String& severity) { StaticJsonDocument<512> overloadMsg; overloadMsg["status"] = "INFO"; overloadMsg["type"] = "bell_overload"; JsonArray bellsArray = overloadMsg["payload"]["bells"].to(); JsonArray loadsArray = overloadMsg["payload"]["loads"].to(); for (size_t i = 0; i < bellNumbers.size() && i < bellLoads.size(); i++) { bellsArray.add(bellNumbers[i] + 1); loadsArray.add(bellLoads[i]); } overloadMsg["payload"]["severity"] = severity; broadcastStatus(overloadMsg); LOG_WARNING(TAG, "Bell overload notification sent: %d bells, severity: %s", bellNumbers.size(), severity.c_str()); } void CommunicationRouter::onNetworkConnected() { LOG_DEBUG(TAG, "Network connected - notifying MQTT client"); _mqttClient.onNetworkConnected(); } void CommunicationRouter::onNetworkDisconnected() { LOG_DEBUG(TAG, "Network disconnected - notifying MQTT client"); _mqttClient.onNetworkDisconnected(); } void CommunicationRouter::onMqttMessage(const String& topic, const String& payload) { LOG_DEBUG(TAG, "MQTT message received: %s", payload.c_str()); // Parse JSON StaticJsonDocument<2048> doc; DeserializationError error = deserializeJson(doc, payload); if (error) { LOG_ERROR(TAG, "Failed to parse MQTT JSON: %s", error.c_str()); return; } // Create message context for MQTT CommandHandler::MessageContext context(CommandHandler::MessageSource::MQTT); // Forward to command handler _commandHandler.processCommand(doc, context); } void CommunicationRouter::onWebSocketMessage(uint32_t clientId, const JsonDocument& message) { // Extract command for logging String cmd = message["cmd"] | "unknown"; LOG_INFO(TAG, "📨 WebSocket message from client #%u: cmd=%s", clientId, cmd.c_str()); // Create message context for WebSocket with client ID CommandHandler::MessageContext context(CommandHandler::MessageSource::WEBSOCKET, clientId); // Forward to command handler (need to cast away const for now) JsonDocument& mutableDoc = const_cast(message); _commandHandler.processCommand(mutableDoc, context); LOG_DEBUG(TAG, "WebSocket message from client #%u processed", clientId); } void CommunicationRouter::onUartMessage(JsonDocument& message) { // Extract command and action for filtering String cmd = message["cmd"] | ""; String action = message["contents"]["action"] | ""; // UART COMMAND WHITELIST: Only allow specific commands // This prevents feedback loops between devices when bad messages occur. // To re-enable full UART command support, remove this filter. bool allowed = false; if (cmd == "system_info" && action == "sync_time_to_lcd") { allowed = true; } else if (cmd == "playback" && (action == "play" || action == "stop")) { allowed = true; } if (!allowed) { // Silently ignore - do NOT send error response to avoid feedback loop LOG_DEBUG(TAG, "UART: Ignoring non-whitelisted command (cmd=%s, action=%s)", cmd.c_str(), action.c_str()); return; } LOG_INFO(TAG, "🔌 UART command received: cmd=%s, action=%s", cmd.c_str(), action.c_str()); // Create message context for UART CommandHandler::MessageContext context(CommandHandler::MessageSource::UART); // Forward to command handler _commandHandler.processCommand(message, context); LOG_DEBUG(TAG, "UART message processed"); } void CommunicationRouter::sendResponse(const String& response, const CommandHandler::MessageContext& context) { if (context.source == CommandHandler::MessageSource::MQTT) { LOG_DEBUG(TAG, "↗️ Sending response via MQTT: %s", response.c_str()); publishToMqtt(response); } else if (context.source == CommandHandler::MessageSource::WEBSOCKET) { LOG_DEBUG(TAG, "↗️ Sending response to WebSocket client #%u: %s", context.clientId, response.c_str()); _wsServer.sendToClient(context.clientId, response); } else if (context.source == CommandHandler::MessageSource::UART) { LOG_DEBUG(TAG, "↗️ Sending response via UART: %s", response.c_str()); _uartHandler.send(response); } else { LOG_ERROR(TAG, "❌ Unknown message source for response routing!"); } }