140 lines
4.6 KiB
Python
140 lines
4.6 KiB
Python
import json
|
|
import logging
|
|
import asyncio
|
|
from typing import Set
|
|
import paho.mqtt.client as paho_mqtt
|
|
from config import settings
|
|
|
|
logger = logging.getLogger("mqtt.client")
|
|
|
|
|
|
class MqttManager:
|
|
"""Singleton MQTT client manager."""
|
|
|
|
def __init__(self):
|
|
self._client: paho_mqtt.Client | None = None
|
|
self._connected = False
|
|
self._loop: asyncio.AbstractEventLoop | None = None
|
|
self._ws_subscribers: Set = set()
|
|
|
|
@property
|
|
def connected(self) -> bool:
|
|
return self._connected
|
|
|
|
def start(self, loop: asyncio.AbstractEventLoop):
|
|
self._loop = loop
|
|
|
|
self._client = paho_mqtt.Client(
|
|
callback_api_version=paho_mqtt.CallbackAPIVersion.VERSION2,
|
|
client_id="bellsystems-admin-panel",
|
|
clean_session=True,
|
|
)
|
|
|
|
if settings.mqtt_admin_username and settings.mqtt_admin_password:
|
|
self._client.username_pw_set(
|
|
settings.mqtt_admin_username,
|
|
settings.mqtt_admin_password,
|
|
)
|
|
|
|
self._client.on_connect = self._on_connect
|
|
self._client.on_disconnect = self._on_disconnect
|
|
self._client.on_message = self._on_message
|
|
|
|
try:
|
|
self._client.connect_async(
|
|
settings.mqtt_broker_host,
|
|
settings.mqtt_broker_port,
|
|
)
|
|
self._client.loop_start()
|
|
logger.info(f"MQTT client connecting to {settings.mqtt_broker_host}:{settings.mqtt_broker_port}")
|
|
except Exception as e:
|
|
logger.warning(f"MQTT client failed to start: {e}")
|
|
|
|
def stop(self):
|
|
if self._client:
|
|
self._client.loop_stop()
|
|
self._client.disconnect()
|
|
self._connected = False
|
|
logger.info("MQTT client disconnected")
|
|
|
|
def _on_connect(self, client, userdata, flags, reason_code, properties):
|
|
if reason_code == 0:
|
|
self._connected = True
|
|
logger.info("MQTT connected, subscribing to topics")
|
|
client.subscribe([
|
|
("vesper/+/data", 1),
|
|
("vesper/+/status/heartbeat", 1),
|
|
("vesper/+/logs", 1),
|
|
])
|
|
else:
|
|
logger.error(f"MQTT connection failed: {reason_code}")
|
|
|
|
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
|
|
self._connected = False
|
|
if reason_code != 0:
|
|
logger.warning(f"MQTT disconnected unexpectedly: {reason_code}")
|
|
|
|
def _on_message(self, client, userdata, msg):
|
|
"""Called from paho thread — bridge to asyncio."""
|
|
try:
|
|
topic = msg.topic
|
|
payload = json.loads(msg.payload.decode("utf-8"))
|
|
parts = topic.split("/")
|
|
if len(parts) < 3 or parts[0] != "vesper":
|
|
return
|
|
|
|
serial = parts[1]
|
|
topic_type = "/".join(parts[2:])
|
|
|
|
if self._loop and self._loop.is_running():
|
|
asyncio.run_coroutine_threadsafe(
|
|
self._process_message(serial, topic_type, payload, topic),
|
|
self._loop,
|
|
)
|
|
except json.JSONDecodeError:
|
|
logger.warning(f"Invalid JSON on topic {msg.topic}")
|
|
except Exception as e:
|
|
logger.error(f"Error processing MQTT message: {e}")
|
|
|
|
async def _process_message(self, serial: str, topic_type: str,
|
|
payload: dict, raw_topic: str):
|
|
from mqtt.logger import handle_message
|
|
await handle_message(serial, topic_type, payload)
|
|
|
|
ws_data = {
|
|
"type": topic_type,
|
|
"device_serial": serial,
|
|
"payload": payload,
|
|
"topic": raw_topic,
|
|
}
|
|
await self._broadcast_ws(ws_data)
|
|
|
|
async def _broadcast_ws(self, data: dict):
|
|
msg = json.dumps(data)
|
|
dead = set()
|
|
for ws in self._ws_subscribers:
|
|
try:
|
|
await ws.send_text(msg)
|
|
except Exception:
|
|
dead.add(ws)
|
|
self._ws_subscribers -= dead
|
|
|
|
def add_ws_subscriber(self, websocket):
|
|
self._ws_subscribers.add(websocket)
|
|
|
|
def remove_ws_subscriber(self, websocket):
|
|
self._ws_subscribers.discard(websocket)
|
|
|
|
def publish_command(self, device_serial: str, cmd: str,
|
|
contents: dict) -> bool:
|
|
if not self._client or not self._connected:
|
|
return False
|
|
|
|
topic = f"vesper/{device_serial}/control"
|
|
payload = json.dumps({"cmd": cmd, "contents": contents})
|
|
result = self._client.publish(topic, payload, qos=1)
|
|
return result.rc == paho_mqtt.MQTT_ERR_SUCCESS
|
|
|
|
|
|
mqtt_manager = MqttManager()
|