Fixed MQTT and WS Routing - w/o SSL
This commit is contained in:
@@ -0,0 +1,300 @@
|
||||
/*
|
||||
* COMMUNICATIONROUTER.CPP - Communication Router Implementation
|
||||
*/
|
||||
|
||||
#include "CommunicationRouter.hpp"
|
||||
#include "../../ConfigManager/ConfigManager.hpp"
|
||||
#include "../../OTAManager/OTAManager.hpp"
|
||||
#include "../../Networking/Networking.hpp"
|
||||
#include "../../Logging/Logging.hpp"
|
||||
#include "../../Player/Player.hpp"
|
||||
#include "../../FileManager/FileManager.hpp"
|
||||
#include "../../TimeKeeper/TimeKeeper.hpp"
|
||||
#include "../../FirmwareValidator/FirmwareValidator.hpp"
|
||||
|
||||
CommunicationRouter::CommunicationRouter(ConfigManager& configManager,
|
||||
OTAManager& otaManager,
|
||||
Networking& networking,
|
||||
AsyncWebServer& server,
|
||||
AsyncWebSocket& webSocket,
|
||||
AsyncUDP& udp)
|
||||
: _configManager(configManager)
|
||||
, _otaManager(otaManager)
|
||||
, _networking(networking)
|
||||
, _server(server)
|
||||
, _webSocket(webSocket)
|
||||
, _udp(udp)
|
||||
, _player(nullptr)
|
||||
, _fileManager(nullptr)
|
||||
, _timeKeeper(nullptr)
|
||||
, _firmwareValidator(nullptr)
|
||||
, _mqttClient(configManager, networking)
|
||||
, _clientManager()
|
||||
, _wsServer(webSocket, _clientManager)
|
||||
, _commandHandler(configManager, otaManager) {}
|
||||
|
||||
CommunicationRouter::~CommunicationRouter() {}
|
||||
|
||||
void CommunicationRouter::begin() {
|
||||
LOG_INFO("Initializing Communication Router v4.0 (Modular)");
|
||||
|
||||
// 🔥 CRITICAL: Initialize WebSocket FIRST to ensure it's always set up
|
||||
// Even if MQTT fails, we want WebSocket to work!
|
||||
LOG_INFO("Setting up WebSocket server...");
|
||||
|
||||
// Initialize WebSocket server
|
||||
_wsServer.begin();
|
||||
_wsServer.setCallback([this](uint32_t clientId, const JsonDocument& message) {
|
||||
onWebSocketMessage(clientId, message);
|
||||
});
|
||||
|
||||
// 🔥 CRITICAL FIX: Attach WebSocket handler to AsyncWebServer
|
||||
// This MUST happen before any potential failures!
|
||||
_server.addHandler(&_webSocket);
|
||||
LOG_INFO("✅ WebSocket handler attached to AsyncWebServer on /ws");
|
||||
|
||||
//Now initialize MQTT client (can fail without breaking WebSocket)
|
||||
try {
|
||||
LOG_INFO("Setting up MQTT client...");
|
||||
_mqttClient.begin();
|
||||
_mqttClient.setCallback([this](const String& topic, const String& payload) {
|
||||
onMqttMessage(topic, payload);
|
||||
});
|
||||
LOG_INFO("✅ MQTT client initialized");
|
||||
} catch (...) {
|
||||
LOG_ERROR("❌ MQTT initialization failed, but WebSocket is still available");
|
||||
}
|
||||
|
||||
// 🔥 CRITICAL FIX: Connect ClientManager to CommandHandler
|
||||
_commandHandler.setClientManagerReference(&_clientManager);
|
||||
LOG_INFO("ClientManager reference set for CommandHandler");
|
||||
|
||||
// Setup command handler response callback
|
||||
_commandHandler.setResponseCallback([this](const String& response, const CommandHandler::MessageContext& context) {
|
||||
sendResponse(response, context);
|
||||
});
|
||||
|
||||
LOG_INFO("Communication Router initialized with modular architecture");
|
||||
}
|
||||
|
||||
void CommunicationRouter::setPlayerReference(Player* player) {
|
||||
_player = player;
|
||||
_commandHandler.setPlayerReference(player);
|
||||
}
|
||||
|
||||
void CommunicationRouter::setFileManagerReference(FileManager* fm) {
|
||||
_fileManager = fm;
|
||||
_commandHandler.setFileManagerReference(fm);
|
||||
}
|
||||
|
||||
void CommunicationRouter::setTimeKeeperReference(Timekeeper* tk) {
|
||||
_timeKeeper = tk;
|
||||
_commandHandler.setTimeKeeperReference(tk);
|
||||
}
|
||||
|
||||
void CommunicationRouter::setFirmwareValidatorReference(FirmwareValidator* fv) {
|
||||
_firmwareValidator = fv;
|
||||
_commandHandler.setFirmwareValidatorReference(fv);
|
||||
}
|
||||
|
||||
void CommunicationRouter::setupUdpDiscovery() {
|
||||
uint16_t discoveryPort = _configManager.getNetworkConfig().discoveryPort;
|
||||
if (_udp.listen(discoveryPort)) {
|
||||
LOG_INFO("UDP discovery listening on port %u", discoveryPort);
|
||||
|
||||
_udp.onPacket([this](AsyncUDPPacket packet) {
|
||||
String msg = String((const char*)packet.data(), packet.length());
|
||||
LOG_DEBUG("UDP from %s:%u -> %s",
|
||||
packet.remoteIP().toString().c_str(),
|
||||
packet.remotePort(),
|
||||
msg.c_str());
|
||||
|
||||
bool shouldReply = false;
|
||||
|
||||
if (msg.indexOf("discover") >= 0) {
|
||||
shouldReply = true;
|
||||
} else {
|
||||
StaticJsonDocument<128> req;
|
||||
DeserializationError err = deserializeJson(req, msg);
|
||||
if (!err) {
|
||||
shouldReply = (req["op"] == "discover" && req["svc"] == "vesper");
|
||||
}
|
||||
}
|
||||
|
||||
if (!shouldReply) return;
|
||||
|
||||
StaticJsonDocument<256> doc;
|
||||
doc["op"] = "discover_reply";
|
||||
doc["svc"] = "vesper";
|
||||
doc["ver"] = 1;
|
||||
|
||||
doc["name"] = "Proj. Vesper v2.0";
|
||||
doc["id"] = _configManager.getDeviceUID();
|
||||
doc["ip"] = _networking.getLocalIP();
|
||||
char wsUrl[64];
|
||||
snprintf(wsUrl, sizeof(wsUrl), "ws://%s/ws", _networking.getLocalIP().c_str());
|
||||
doc["ws"] = wsUrl;
|
||||
doc["port"] = 80;
|
||||
doc["fw"] = "2.0";
|
||||
doc["clients"] = _clientManager.getClientCount();
|
||||
|
||||
String out;
|
||||
serializeJson(doc, out);
|
||||
|
||||
_udp.writeTo((const uint8_t*)out.c_str(), out.length(),
|
||||
packet.remoteIP(), packet.remotePort());
|
||||
});
|
||||
} else {
|
||||
LOG_ERROR("Failed to start UDP discovery.");
|
||||
}
|
||||
}
|
||||
|
||||
bool CommunicationRouter::isMqttConnected() const {
|
||||
return _mqttClient.isConnected();
|
||||
}
|
||||
|
||||
bool CommunicationRouter::hasActiveWebSocketClients() const {
|
||||
return _wsServer.hasClients();
|
||||
}
|
||||
|
||||
size_t CommunicationRouter::getWebSocketClientCount() const {
|
||||
return _wsServer.getClientCount();
|
||||
}
|
||||
|
||||
bool CommunicationRouter::isHealthy() const {
|
||||
// Check if required references are set
|
||||
if (!_player || !_fileManager || !_timeKeeper) {
|
||||
LOG_DEBUG("CommunicationRouter: Unhealthy - Missing references");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if at least one protocol is connected
|
||||
if (!isMqttConnected() && !hasActiveWebSocketClients()) {
|
||||
LOG_DEBUG("CommunicationRouter: Unhealthy - No active connections");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check network connectivity
|
||||
if (!_networking.isConnected()) {
|
||||
LOG_DEBUG("CommunicationRouter: Unhealthy - No network connection");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void CommunicationRouter::broadcastStatus(const String& statusMessage) {
|
||||
publishToMqtt(statusMessage);
|
||||
_wsServer.broadcastToAll(statusMessage);
|
||||
}
|
||||
|
||||
void CommunicationRouter::broadcastStatus(const JsonDocument& statusJson) {
|
||||
String statusMessage;
|
||||
serializeJson(statusJson, statusMessage);
|
||||
broadcastStatus(statusMessage);
|
||||
}
|
||||
|
||||
void CommunicationRouter::broadcastToMasterClients(const String& message) {
|
||||
_wsServer.broadcastToMaster(message);
|
||||
}
|
||||
|
||||
void CommunicationRouter::broadcastToSecondaryClients(const String& message) {
|
||||
_wsServer.broadcastToSecondary(message);
|
||||
}
|
||||
|
||||
void CommunicationRouter::broadcastToAllWebSocketClients(const String& message) {
|
||||
_wsServer.broadcastToAll(message);
|
||||
}
|
||||
|
||||
void CommunicationRouter::broadcastToAllWebSocketClients(const JsonDocument& message) {
|
||||
String messageStr;
|
||||
serializeJson(message, messageStr);
|
||||
_wsServer.broadcastToAll(messageStr);
|
||||
}
|
||||
|
||||
void CommunicationRouter::publishToMqtt(const String& data) {
|
||||
if (_mqttClient.isConnected()) {
|
||||
_mqttClient.publish("data", data, 0, false);
|
||||
LOG_DEBUG("Published to MQTT: %s", data.c_str());
|
||||
} else {
|
||||
LOG_ERROR("MQTT Not Connected! Message Failed: %s", data.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
void CommunicationRouter::sendBellOverloadNotification(const std::vector<uint8_t>& bellNumbers,
|
||||
const std::vector<uint16_t>& bellLoads,
|
||||
const String& severity) {
|
||||
StaticJsonDocument<512> overloadMsg;
|
||||
overloadMsg["status"] = "INFO";
|
||||
overloadMsg["type"] = "bell_overload";
|
||||
|
||||
JsonArray bellsArray = overloadMsg["payload"]["bells"].to<JsonArray>();
|
||||
JsonArray loadsArray = overloadMsg["payload"]["loads"].to<JsonArray>();
|
||||
|
||||
for (size_t i = 0; i < bellNumbers.size() && i < bellLoads.size(); i++) {
|
||||
bellsArray.add(bellNumbers[i] + 1);
|
||||
loadsArray.add(bellLoads[i]);
|
||||
}
|
||||
|
||||
overloadMsg["payload"]["severity"] = severity;
|
||||
broadcastStatus(overloadMsg);
|
||||
|
||||
LOG_WARNING("Bell overload notification sent: %d bells, severity: %s",
|
||||
bellNumbers.size(), severity.c_str());
|
||||
}
|
||||
|
||||
void CommunicationRouter::onNetworkConnected() {
|
||||
LOG_DEBUG("Network connected - notifying MQTT client");
|
||||
_mqttClient.onNetworkConnected();
|
||||
}
|
||||
|
||||
void CommunicationRouter::onNetworkDisconnected() {
|
||||
LOG_DEBUG("Network disconnected - notifying MQTT client");
|
||||
_mqttClient.onNetworkDisconnected();
|
||||
}
|
||||
|
||||
void CommunicationRouter::onMqttMessage(const String& topic, const String& payload) {
|
||||
LOG_DEBUG("MQTT message received: %s", payload.c_str());
|
||||
|
||||
// Parse JSON
|
||||
StaticJsonDocument<2048> doc;
|
||||
DeserializationError error = deserializeJson(doc, payload);
|
||||
|
||||
if (error) {
|
||||
LOG_ERROR("Failed to parse MQTT JSON: %s", error.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
// Create message context for MQTT
|
||||
CommandHandler::MessageContext context(CommandHandler::MessageSource::MQTT);
|
||||
|
||||
// Forward to command handler
|
||||
_commandHandler.processCommand(doc, context);
|
||||
}
|
||||
|
||||
void CommunicationRouter::onWebSocketMessage(uint32_t clientId, const JsonDocument& message) {
|
||||
// Extract command for logging
|
||||
String cmd = message["cmd"] | "unknown";
|
||||
LOG_INFO("📨 WebSocket message from client #%u: cmd=%s", clientId, cmd.c_str());
|
||||
|
||||
// Create message context for WebSocket with client ID
|
||||
CommandHandler::MessageContext context(CommandHandler::MessageSource::WEBSOCKET, clientId);
|
||||
|
||||
// Forward to command handler (need to cast away const for now)
|
||||
JsonDocument& mutableDoc = const_cast<JsonDocument&>(message);
|
||||
_commandHandler.processCommand(mutableDoc, context);
|
||||
|
||||
LOG_DEBUG("WebSocket message from client #%u processed", clientId);
|
||||
}
|
||||
|
||||
void CommunicationRouter::sendResponse(const String& response, const CommandHandler::MessageContext& context) {
|
||||
if (context.source == CommandHandler::MessageSource::MQTT) {
|
||||
LOG_DEBUG("↗️ Sending response via MQTT: %s", response.c_str());
|
||||
publishToMqtt(response);
|
||||
} else if (context.source == CommandHandler::MessageSource::WEBSOCKET) {
|
||||
LOG_DEBUG("↗️ Sending response to WebSocket client #%u: %s", context.clientId, response.c_str());
|
||||
_wsServer.sendToClient(context.clientId, response);
|
||||
} else {
|
||||
LOG_ERROR("❌ Unknown message source for response routing!");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user