Phase 5 Complete by Claude Code

This commit is contained in:
2026-02-17 23:45:20 +02:00
parent fc2d04b8bb
commit c0605c77db
17 changed files with 1663 additions and 12 deletions

View File

@@ -1 +1,139 @@
# TODO: MQTT client wrapper (paho-mqtt)
import json
import logging
import asyncio
from typing import Set
import paho.mqtt.client as paho_mqtt
from config import settings
logger = logging.getLogger("mqtt.client")
class MqttManager:
"""Singleton MQTT client manager."""
def __init__(self):
self._client: paho_mqtt.Client | None = None
self._connected = False
self._loop: asyncio.AbstractEventLoop | None = None
self._ws_subscribers: Set = set()
@property
def connected(self) -> bool:
return self._connected
def start(self, loop: asyncio.AbstractEventLoop):
self._loop = loop
self._client = paho_mqtt.Client(
callback_api_version=paho_mqtt.CallbackAPIVersion.VERSION2,
client_id="bellsystems-admin-panel",
clean_session=True,
)
if settings.mqtt_admin_username and settings.mqtt_admin_password:
self._client.username_pw_set(
settings.mqtt_admin_username,
settings.mqtt_admin_password,
)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.on_message = self._on_message
try:
self._client.connect_async(
settings.mqtt_broker_host,
settings.mqtt_broker_port,
)
self._client.loop_start()
logger.info(f"MQTT client connecting to {settings.mqtt_broker_host}:{settings.mqtt_broker_port}")
except Exception as e:
logger.warning(f"MQTT client failed to start: {e}")
def stop(self):
if self._client:
self._client.loop_stop()
self._client.disconnect()
self._connected = False
logger.info("MQTT client disconnected")
def _on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code == 0:
self._connected = True
logger.info("MQTT connected, subscribing to topics")
client.subscribe([
("vesper/+/data", 1),
("vesper/+/status/heartbeat", 1),
("vesper/+/logs", 1),
])
else:
logger.error(f"MQTT connection failed: {reason_code}")
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
self._connected = False
if reason_code != 0:
logger.warning(f"MQTT disconnected unexpectedly: {reason_code}")
def _on_message(self, client, userdata, msg):
"""Called from paho thread — bridge to asyncio."""
try:
topic = msg.topic
payload = json.loads(msg.payload.decode("utf-8"))
parts = topic.split("/")
if len(parts) < 3 or parts[0] != "vesper":
return
serial = parts[1]
topic_type = "/".join(parts[2:])
if self._loop and self._loop.is_running():
asyncio.run_coroutine_threadsafe(
self._process_message(serial, topic_type, payload, topic),
self._loop,
)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON on topic {msg.topic}")
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
async def _process_message(self, serial: str, topic_type: str,
payload: dict, raw_topic: str):
from mqtt.logger import handle_message
await handle_message(serial, topic_type, payload)
ws_data = {
"type": topic_type,
"device_serial": serial,
"payload": payload,
"topic": raw_topic,
}
await self._broadcast_ws(ws_data)
async def _broadcast_ws(self, data: dict):
msg = json.dumps(data)
dead = set()
for ws in self._ws_subscribers:
try:
await ws.send_text(msg)
except Exception:
dead.add(ws)
self._ws_subscribers -= dead
def add_ws_subscriber(self, websocket):
self._ws_subscribers.add(websocket)
def remove_ws_subscriber(self, websocket):
self._ws_subscribers.discard(websocket)
def publish_command(self, device_serial: str, cmd: str,
contents: dict) -> bool:
if not self._client or not self._connected:
return False
topic = f"vesper/{device_serial}/control"
payload = json.dumps({"cmd": cmd, "contents": contents})
result = self._client.publish(topic, payload, qos=1)
return result.rc == paho_mqtt.MQTT_ERR_SUCCESS
mqtt_manager = MqttManager()

222
backend/mqtt/database.py Normal file
View File

@@ -0,0 +1,222 @@
import aiosqlite
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from config import settings
logger = logging.getLogger("mqtt.database")
_db: aiosqlite.Connection | None = None
SCHEMA_STATEMENTS = [
"""CREATE TABLE IF NOT EXISTS device_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_serial TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
device_timestamp INTEGER,
received_at TEXT NOT NULL DEFAULT (datetime('now'))
)""",
"""CREATE TABLE IF NOT EXISTS heartbeats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_serial TEXT NOT NULL,
device_id TEXT,
firmware_version TEXT,
ip_address TEXT,
gateway TEXT,
uptime_ms INTEGER,
uptime_display TEXT,
received_at TEXT NOT NULL DEFAULT (datetime('now'))
)""",
"""CREATE TABLE IF NOT EXISTS commands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_serial TEXT NOT NULL,
command_name TEXT NOT NULL,
command_payload TEXT,
status TEXT NOT NULL DEFAULT 'pending',
response_payload TEXT,
sent_at TEXT NOT NULL DEFAULT (datetime('now')),
responded_at TEXT
)""",
"CREATE INDEX IF NOT EXISTS idx_logs_serial_time ON device_logs(device_serial, received_at)",
"CREATE INDEX IF NOT EXISTS idx_logs_level ON device_logs(level)",
"CREATE INDEX IF NOT EXISTS idx_heartbeats_serial_time ON heartbeats(device_serial, received_at)",
"CREATE INDEX IF NOT EXISTS idx_commands_serial_time ON commands(device_serial, sent_at)",
"CREATE INDEX IF NOT EXISTS idx_commands_status ON commands(status)",
]
async def init_db():
global _db
_db = await aiosqlite.connect(settings.sqlite_db_path)
_db.row_factory = aiosqlite.Row
for stmt in SCHEMA_STATEMENTS:
await _db.execute(stmt)
await _db.commit()
logger.info(f"SQLite database initialized at {settings.sqlite_db_path}")
async def close_db():
global _db
if _db:
await _db.close()
_db = None
async def get_db() -> aiosqlite.Connection:
if _db is None:
await init_db()
return _db
# --- Insert Operations ---
async def insert_log(device_serial: str, level: str, message: str,
device_timestamp: int | None = None):
db = await get_db()
cursor = await db.execute(
"INSERT INTO device_logs (device_serial, level, message, device_timestamp) VALUES (?, ?, ?, ?)",
(device_serial, level, message, device_timestamp)
)
await db.commit()
return cursor.lastrowid
async def insert_heartbeat(device_serial: str, device_id: str,
firmware_version: str, ip_address: str,
gateway: str, uptime_ms: int, uptime_display: str):
db = await get_db()
cursor = await db.execute(
"""INSERT INTO heartbeats
(device_serial, device_id, firmware_version, ip_address, gateway, uptime_ms, uptime_display)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(device_serial, device_id, firmware_version, ip_address, gateway, uptime_ms, uptime_display)
)
await db.commit()
return cursor.lastrowid
async def insert_command(device_serial: str, command_name: str,
command_payload: dict) -> int:
db = await get_db()
cursor = await db.execute(
"INSERT INTO commands (device_serial, command_name, command_payload) VALUES (?, ?, ?)",
(device_serial, command_name, json.dumps(command_payload))
)
await db.commit()
return cursor.lastrowid
async def update_command_response(command_id: int, status: str,
response_payload: dict | None = None):
db = await get_db()
await db.execute(
"""UPDATE commands SET status = ?, response_payload = ?,
responded_at = datetime('now') WHERE id = ?""",
(status, json.dumps(response_payload) if response_payload else None, command_id)
)
await db.commit()
# --- Query Operations ---
async def get_logs(device_serial: str, level: str | None = None,
search: str | None = None,
limit: int = 100, offset: int = 0) -> tuple[list, int]:
db = await get_db()
where_clauses = ["device_serial = ?"]
params: list = [device_serial]
if level:
where_clauses.append("level = ?")
params.append(level)
if search:
where_clauses.append("message LIKE ?")
params.append(f"%{search}%")
where = " AND ".join(where_clauses)
count_row = await db.execute_fetchall(
f"SELECT COUNT(*) as cnt FROM device_logs WHERE {where}", params
)
total = count_row[0][0]
rows = await db.execute_fetchall(
f"SELECT * FROM device_logs WHERE {where} ORDER BY received_at DESC LIMIT ? OFFSET ?",
params + [limit, offset]
)
return [dict(r) for r in rows], total
async def get_heartbeats(device_serial: str, limit: int = 100,
offset: int = 0) -> tuple[list, int]:
db = await get_db()
count_row = await db.execute_fetchall(
"SELECT COUNT(*) FROM heartbeats WHERE device_serial = ?", (device_serial,)
)
total = count_row[0][0]
rows = await db.execute_fetchall(
"SELECT * FROM heartbeats WHERE device_serial = ? ORDER BY received_at DESC LIMIT ? OFFSET ?",
(device_serial, limit, offset)
)
return [dict(r) for r in rows], total
async def get_commands(device_serial: str, limit: int = 100,
offset: int = 0) -> tuple[list, int]:
db = await get_db()
count_row = await db.execute_fetchall(
"SELECT COUNT(*) FROM commands WHERE device_serial = ?", (device_serial,)
)
total = count_row[0][0]
rows = await db.execute_fetchall(
"SELECT * FROM commands WHERE device_serial = ? ORDER BY sent_at DESC LIMIT ? OFFSET ?",
(device_serial, limit, offset)
)
return [dict(r) for r in rows], total
async def get_latest_heartbeats() -> list:
db = await get_db()
rows = await db.execute_fetchall("""
SELECT h.* FROM heartbeats h
INNER JOIN (
SELECT device_serial, MAX(received_at) as max_time
FROM heartbeats GROUP BY device_serial
) latest ON h.device_serial = latest.device_serial
AND h.received_at = latest.max_time
""")
return [dict(r) for r in rows]
async def get_pending_command(device_serial: str) -> dict | None:
db = await get_db()
rows = await db.execute_fetchall(
"""SELECT * FROM commands WHERE device_serial = ? AND status = 'pending'
ORDER BY sent_at DESC LIMIT 1""",
(device_serial,)
)
return dict(rows[0]) if rows else None
# --- Cleanup ---
async def purge_old_data(retention_days: int | None = None):
days = retention_days or settings.mqtt_data_retention_days
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
db = await get_db()
await db.execute("DELETE FROM device_logs WHERE received_at < ?", (cutoff,))
await db.execute("DELETE FROM heartbeats WHERE received_at < ?", (cutoff,))
await db.execute("DELETE FROM commands WHERE sent_at < ?", (cutoff,))
await db.commit()
logger.info(f"Purged MQTT data older than {days} days")
async def purge_loop():
while True:
await asyncio.sleep(86400)
try:
await purge_old_data()
except Exception as e:
logger.error(f"Purge failed: {e}")

View File

@@ -1 +1,70 @@
# TODO: Log storage and retrieval
import logging
from mqtt import database as db
logger = logging.getLogger("mqtt.logger")
LEVEL_MAP = {
"🟢 INFO": "INFO",
"🟡 WARN": "WARN",
"🔴 EROR": "ERROR",
"INFO": "INFO",
"WARN": "WARN",
"ERROR": "ERROR",
"EROR": "ERROR",
}
async def handle_message(serial: str, topic_type: str, payload: dict):
try:
if topic_type == "status/heartbeat":
await _handle_heartbeat(serial, payload)
elif topic_type == "logs":
await _handle_log(serial, payload)
elif topic_type == "data":
await _handle_data_response(serial, payload)
else:
logger.debug(f"Unhandled topic type: {topic_type} for {serial}")
except Exception as e:
logger.error(f"Error handling {topic_type} for {serial}: {e}")
async def _handle_heartbeat(serial: str, payload: dict):
inner = payload.get("payload", {})
await db.insert_heartbeat(
device_serial=serial,
device_id=inner.get("device_id", ""),
firmware_version=inner.get("firmware_version", ""),
ip_address=inner.get("ip_address", ""),
gateway=inner.get("gateway", ""),
uptime_ms=inner.get("uptime_ms", 0),
uptime_display=inner.get("timestamp", ""),
)
async def _handle_log(serial: str, payload: dict):
raw_level = payload.get("level", "INFO")
level = LEVEL_MAP.get(raw_level, "INFO")
message = payload.get("message", "")
device_timestamp = payload.get("timestamp")
await db.insert_log(
device_serial=serial,
level=level,
message=message,
device_timestamp=device_timestamp,
)
async def _handle_data_response(serial: str, payload: dict):
status = payload.get("status", "")
pending = await db.get_pending_command(serial)
if pending:
cmd_status = "success" if status == "SUCCESS" else "error"
await db.update_command_response(
command_id=pending["id"],
status=cmd_status,
response_payload=payload,
)
else:
logger.debug(f"Received data response for {serial} with no pending command")

86
backend/mqtt/models.py Normal file
View File

@@ -0,0 +1,86 @@
from pydantic import BaseModel, Field
from typing import Optional, List, Any, Dict
from enum import Enum
class LogLevel(str, Enum):
INFO = "INFO"
WARN = "WARN"
ERROR = "ERROR"
class CommandStatus(str, Enum):
PENDING = "pending"
SUCCESS = "success"
ERROR = "error"
TIMEOUT = "timeout"
class MqttCommandRequest(BaseModel):
cmd: str = Field(..., description="Command name: ping, playback, file_manager, relay_setup, clock_setup, system_info, system")
contents: Dict[str, Any] = Field(default_factory=dict, description="Command payload contents")
class DeviceLogEntry(BaseModel):
id: int
device_serial: str
level: str
message: str
device_timestamp: Optional[int] = None
received_at: str
class HeartbeatEntry(BaseModel):
id: int
device_serial: str
device_id: Optional[str] = None
firmware_version: Optional[str] = None
ip_address: Optional[str] = None
gateway: Optional[str] = None
uptime_ms: Optional[int] = None
uptime_display: Optional[str] = None
received_at: str
class CommandEntry(BaseModel):
id: int
device_serial: str
command_name: str
command_payload: Optional[str] = None
status: str
response_payload: Optional[str] = None
sent_at: str
responded_at: Optional[str] = None
class DeviceMqttStatus(BaseModel):
device_serial: str
online: bool
last_heartbeat: Optional[HeartbeatEntry] = None
seconds_since_heartbeat: Optional[int] = None
class MqttStatusResponse(BaseModel):
devices: List[DeviceMqttStatus]
broker_connected: bool = False
class LogListResponse(BaseModel):
logs: List[DeviceLogEntry]
total: int
class HeartbeatListResponse(BaseModel):
heartbeats: List[HeartbeatEntry]
total: int
class CommandListResponse(BaseModel):
commands: List[CommandEntry]
total: int
class CommandSendResponse(BaseModel):
success: bool
command_id: int
message: str

View File

@@ -1 +1,151 @@
# TODO: Command endpoints + WebSocket for live data
from fastapi import APIRouter, Depends, Query, WebSocket, WebSocketDisconnect
from typing import Optional
from auth.models import TokenPayload
from auth.dependencies import require_device_access, require_viewer
from mqtt.models import (
MqttCommandRequest, CommandSendResponse, MqttStatusResponse,
DeviceMqttStatus, LogListResponse, HeartbeatListResponse,
CommandListResponse, HeartbeatEntry,
)
from mqtt.client import mqtt_manager
from mqtt import database as db
from datetime import datetime, timezone
router = APIRouter(prefix="/api/mqtt", tags=["mqtt"])
@router.get("/status", response_model=MqttStatusResponse)
async def get_all_device_status(
_user: TokenPayload = Depends(require_viewer),
):
heartbeats = await db.get_latest_heartbeats()
now = datetime.now(timezone.utc)
devices = []
for hb in heartbeats:
received_str = hb["received_at"]
try:
received = datetime.fromisoformat(received_str)
if received.tzinfo is None:
received = received.replace(tzinfo=timezone.utc)
seconds_ago = int((now - received).total_seconds())
except (ValueError, TypeError):
seconds_ago = 9999
devices.append(DeviceMqttStatus(
device_serial=hb["device_serial"],
online=seconds_ago < 90,
last_heartbeat=HeartbeatEntry(**hb),
seconds_since_heartbeat=seconds_ago,
))
return MqttStatusResponse(
devices=devices,
broker_connected=mqtt_manager.connected,
)
@router.post("/command/{device_serial}", response_model=CommandSendResponse)
async def send_command(
device_serial: str,
body: MqttCommandRequest,
_user: TokenPayload = Depends(require_device_access),
):
command_id = await db.insert_command(
device_serial=device_serial,
command_name=body.cmd,
command_payload={"cmd": body.cmd, "contents": body.contents},
)
success = mqtt_manager.publish_command(
device_serial=device_serial,
cmd=body.cmd,
contents=body.contents,
)
if not success:
await db.update_command_response(
command_id, "error",
{"error": "MQTT broker not connected"},
)
return CommandSendResponse(
success=False, command_id=command_id,
message="MQTT broker not connected",
)
return CommandSendResponse(
success=True, command_id=command_id,
message=f"Command '{body.cmd}' sent to {device_serial}",
)
@router.get("/logs/{device_serial}", response_model=LogListResponse)
async def get_device_logs(
device_serial: str,
level: Optional[str] = Query(None, description="Filter: INFO, WARN, ERROR"),
search: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
_user: TokenPayload = Depends(require_viewer),
):
logs, total = await db.get_logs(
device_serial, level=level, search=search,
limit=limit, offset=offset,
)
return LogListResponse(logs=logs, total=total)
@router.get("/heartbeats/{device_serial}", response_model=HeartbeatListResponse)
async def get_device_heartbeats(
device_serial: str,
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
_user: TokenPayload = Depends(require_viewer),
):
heartbeats, total = await db.get_heartbeats(
device_serial, limit=limit, offset=offset,
)
return HeartbeatListResponse(heartbeats=heartbeats, total=total)
@router.get("/commands/{device_serial}", response_model=CommandListResponse)
async def get_device_commands(
device_serial: str,
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
_user: TokenPayload = Depends(require_viewer),
):
commands, total = await db.get_commands(
device_serial, limit=limit, offset=offset,
)
return CommandListResponse(commands=commands, total=total)
@router.websocket("/ws")
async def mqtt_websocket(websocket: WebSocket):
"""Live MQTT data stream. Auth via query param: ?token=JWT"""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Missing token")
return
try:
from auth.utils import decode_access_token
payload = decode_access_token(token)
role = payload.get("role", "")
allowed = {"superadmin", "device_manager", "viewer"}
if role not in allowed:
await websocket.close(code=4003, reason="Insufficient permissions")
return
except Exception:
await websocket.close(code=4001, reason="Invalid token")
return
await websocket.accept()
mqtt_manager.add_ws_subscriber(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
pass
finally:
mqtt_manager.remove_ws_subscriber(websocket)