diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 9738ff1..947397a 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -5,7 +5,8 @@ "Bash(npm install:*)", "Bash(npm run build:*)", "Bash(python -c:*)", - "Bash(npx vite build:*)" + "Bash(npx vite build:*)", + "Bash(wc:*)" ] } } diff --git a/.gitignore b/.gitignore index 6808ee7..787ce99 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ dist/ # OS .DS_Store Thumbs.db + +MAIN-APP-REFERENCE/ \ No newline at end of file diff --git a/backend/config.py b/backend/config.py index 722fdab..8a12a62 100644 --- a/backend/config.py +++ b/backend/config.py @@ -20,6 +20,10 @@ class Settings(BaseSettings): mqtt_admin_password: str = "" mosquitto_password_file: str = "/etc/mosquitto/passwd" + # SQLite (MQTT data storage) + sqlite_db_path: str = "./mqtt_data.db" + mqtt_data_retention_days: int = 90 + # App backend_cors_origins: str = '["http://localhost:5173"]' debug: bool = True diff --git a/backend/main.py b/backend/main.py index 9dc2caf..e8d0b31 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,3 +1,4 @@ +import asyncio from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from config import settings @@ -7,6 +8,9 @@ from melodies.router import router as melodies_router from devices.router import router as devices_router from settings.router import router as settings_router from users.router import router as users_router +from mqtt.router import router as mqtt_router +from mqtt.client import mqtt_manager +from mqtt import database as mqtt_db app = FastAPI( title="BellSystems Admin Panel", @@ -28,13 +32,27 @@ app.include_router(melodies_router) app.include_router(devices_router) app.include_router(settings_router) app.include_router(users_router) +app.include_router(mqtt_router) @app.on_event("startup") async def startup(): init_firebase() + await mqtt_db.init_db() + mqtt_manager.start(asyncio.get_event_loop()) + asyncio.create_task(mqtt_db.purge_loop()) + + +@app.on_event("shutdown") +async def shutdown(): + mqtt_manager.stop() + await mqtt_db.close_db() @app.get("/api/health") async def health_check(): - return {"status": "ok", "firebase": firebase_initialized} + return { + "status": "ok", + "firebase": firebase_initialized, + "mqtt": mqtt_manager.connected, + } diff --git a/backend/mqtt/client.py b/backend/mqtt/client.py index 7099d03..eb4743f 100644 --- a/backend/mqtt/client.py +++ b/backend/mqtt/client.py @@ -1 +1,139 @@ -# TODO: MQTT client wrapper (paho-mqtt) +import json +import logging +import asyncio +from typing import Set +import paho.mqtt.client as paho_mqtt +from config import settings + +logger = logging.getLogger("mqtt.client") + + +class MqttManager: + """Singleton MQTT client manager.""" + + def __init__(self): + self._client: paho_mqtt.Client | None = None + self._connected = False + self._loop: asyncio.AbstractEventLoop | None = None + self._ws_subscribers: Set = set() + + @property + def connected(self) -> bool: + return self._connected + + def start(self, loop: asyncio.AbstractEventLoop): + self._loop = loop + + self._client = paho_mqtt.Client( + callback_api_version=paho_mqtt.CallbackAPIVersion.VERSION2, + client_id="bellsystems-admin-panel", + clean_session=True, + ) + + if settings.mqtt_admin_username and settings.mqtt_admin_password: + self._client.username_pw_set( + settings.mqtt_admin_username, + settings.mqtt_admin_password, + ) + + self._client.on_connect = self._on_connect + self._client.on_disconnect = self._on_disconnect + self._client.on_message = self._on_message + + try: + self._client.connect_async( + settings.mqtt_broker_host, + settings.mqtt_broker_port, + ) + self._client.loop_start() + logger.info(f"MQTT client connecting to {settings.mqtt_broker_host}:{settings.mqtt_broker_port}") + except Exception as e: + logger.warning(f"MQTT client failed to start: {e}") + + def stop(self): + if self._client: + self._client.loop_stop() + self._client.disconnect() + self._connected = False + logger.info("MQTT client disconnected") + + def _on_connect(self, client, userdata, flags, reason_code, properties): + if reason_code == 0: + self._connected = True + logger.info("MQTT connected, subscribing to topics") + client.subscribe([ + ("vesper/+/data", 1), + ("vesper/+/status/heartbeat", 1), + ("vesper/+/logs", 1), + ]) + else: + logger.error(f"MQTT connection failed: {reason_code}") + + def _on_disconnect(self, client, userdata, flags, reason_code, properties): + self._connected = False + if reason_code != 0: + logger.warning(f"MQTT disconnected unexpectedly: {reason_code}") + + def _on_message(self, client, userdata, msg): + """Called from paho thread — bridge to asyncio.""" + try: + topic = msg.topic + payload = json.loads(msg.payload.decode("utf-8")) + parts = topic.split("/") + if len(parts) < 3 or parts[0] != "vesper": + return + + serial = parts[1] + topic_type = "/".join(parts[2:]) + + if self._loop and self._loop.is_running(): + asyncio.run_coroutine_threadsafe( + self._process_message(serial, topic_type, payload, topic), + self._loop, + ) + except json.JSONDecodeError: + logger.warning(f"Invalid JSON on topic {msg.topic}") + except Exception as e: + logger.error(f"Error processing MQTT message: {e}") + + async def _process_message(self, serial: str, topic_type: str, + payload: dict, raw_topic: str): + from mqtt.logger import handle_message + await handle_message(serial, topic_type, payload) + + ws_data = { + "type": topic_type, + "device_serial": serial, + "payload": payload, + "topic": raw_topic, + } + await self._broadcast_ws(ws_data) + + async def _broadcast_ws(self, data: dict): + msg = json.dumps(data) + dead = set() + for ws in self._ws_subscribers: + try: + await ws.send_text(msg) + except Exception: + dead.add(ws) + self._ws_subscribers -= dead + + def add_ws_subscriber(self, websocket): + self._ws_subscribers.add(websocket) + + def remove_ws_subscriber(self, websocket): + self._ws_subscribers.discard(websocket) + + def publish_command(self, device_serial: str, cmd: str, + contents: dict) -> bool: + if not self._client or not self._connected: + return False + + topic = f"vesper/{device_serial}/control" + payload = json.dumps({"cmd": cmd, "contents": contents}) + result = self._client.publish(topic, payload, qos=1) + return result.rc == paho_mqtt.MQTT_ERR_SUCCESS + + +mqtt_manager = MqttManager() diff --git a/backend/mqtt/database.py b/backend/mqtt/database.py new file mode 100644 index 0000000..cfe5667 --- /dev/null +++ b/backend/mqtt/database.py @@ -0,0 +1,222 @@ +import aiosqlite +import asyncio +import json +import logging +from datetime import datetime, timedelta, timezone +from config import settings + +logger = logging.getLogger("mqtt.database") + +_db: aiosqlite.Connection | None = None + +SCHEMA_STATEMENTS = [ + """CREATE TABLE IF NOT EXISTS device_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + device_serial TEXT NOT NULL, + level TEXT NOT NULL, + message TEXT NOT NULL, + device_timestamp INTEGER, + received_at TEXT NOT NULL DEFAULT (datetime('now')) + )""", + """CREATE TABLE IF NOT EXISTS heartbeats ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + device_serial TEXT NOT NULL, + device_id TEXT, + firmware_version TEXT, + ip_address TEXT, + gateway TEXT, + uptime_ms INTEGER, + uptime_display TEXT, + received_at TEXT NOT NULL DEFAULT (datetime('now')) + )""", + """CREATE TABLE IF NOT EXISTS commands ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + device_serial TEXT NOT NULL, + command_name TEXT NOT NULL, + command_payload TEXT, + status TEXT NOT NULL DEFAULT 'pending', + response_payload TEXT, + sent_at TEXT NOT NULL DEFAULT (datetime('now')), + responded_at TEXT + )""", + "CREATE INDEX IF NOT EXISTS idx_logs_serial_time ON device_logs(device_serial, received_at)", + "CREATE INDEX IF NOT EXISTS idx_logs_level ON device_logs(level)", + "CREATE INDEX IF NOT EXISTS idx_heartbeats_serial_time ON heartbeats(device_serial, received_at)", + "CREATE INDEX IF NOT EXISTS idx_commands_serial_time ON commands(device_serial, sent_at)", + "CREATE INDEX IF NOT EXISTS idx_commands_status ON commands(status)", +] + + +async def init_db(): + global _db + _db = await aiosqlite.connect(settings.sqlite_db_path) + _db.row_factory = aiosqlite.Row + for stmt in SCHEMA_STATEMENTS: + await _db.execute(stmt) + await _db.commit() + logger.info(f"SQLite database initialized at {settings.sqlite_db_path}") + + +async def close_db(): + global _db + if _db: + await _db.close() + _db = None + + +async def get_db() -> aiosqlite.Connection: + if _db is None: + await init_db() + return _db + + +# --- Insert Operations --- + +async def insert_log(device_serial: str, level: str, message: str, + device_timestamp: int | None = None): + db = await get_db() + cursor = await db.execute( + "INSERT INTO device_logs (device_serial, level, message, device_timestamp) VALUES (?, ?, ?, ?)", + (device_serial, level, message, device_timestamp) + ) + await db.commit() + return cursor.lastrowid + + +async def insert_heartbeat(device_serial: str, device_id: str, + firmware_version: str, ip_address: str, + gateway: str, uptime_ms: int, uptime_display: str): + db = await get_db() + cursor = await db.execute( + """INSERT INTO heartbeats + (device_serial, device_id, firmware_version, ip_address, gateway, uptime_ms, uptime_display) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + (device_serial, device_id, firmware_version, ip_address, gateway, uptime_ms, uptime_display) + ) + await db.commit() + return cursor.lastrowid + + +async def insert_command(device_serial: str, command_name: str, + command_payload: dict) -> int: + db = await get_db() + cursor = await db.execute( + "INSERT INTO commands (device_serial, command_name, command_payload) VALUES (?, ?, ?)", + (device_serial, command_name, json.dumps(command_payload)) + ) + await db.commit() + return cursor.lastrowid + + +async def update_command_response(command_id: int, status: str, + response_payload: dict | None = None): + db = await get_db() + await db.execute( + """UPDATE commands SET status = ?, response_payload = ?, + responded_at = datetime('now') WHERE id = ?""", + (status, json.dumps(response_payload) if response_payload else None, command_id) + ) + await db.commit() + + +# --- Query Operations --- + +async def get_logs(device_serial: str, level: str | None = None, + search: str | None = None, + limit: int = 100, offset: int = 0) -> tuple[list, int]: + db = await get_db() + where_clauses = ["device_serial = ?"] + params: list = [device_serial] + + if level: + where_clauses.append("level = ?") + params.append(level) + if search: + where_clauses.append("message LIKE ?") + params.append(f"%{search}%") + + where = " AND ".join(where_clauses) + + count_row = await db.execute_fetchall( + f"SELECT COUNT(*) as cnt FROM device_logs WHERE {where}", params + ) + total = count_row[0][0] + + rows = await db.execute_fetchall( + f"SELECT * FROM device_logs WHERE {where} ORDER BY received_at DESC LIMIT ? OFFSET ?", + params + [limit, offset] + ) + return [dict(r) for r in rows], total + + +async def get_heartbeats(device_serial: str, limit: int = 100, + offset: int = 0) -> tuple[list, int]: + db = await get_db() + count_row = await db.execute_fetchall( + "SELECT COUNT(*) FROM heartbeats WHERE device_serial = ?", (device_serial,) + ) + total = count_row[0][0] + rows = await db.execute_fetchall( + "SELECT * FROM heartbeats WHERE device_serial = ? ORDER BY received_at DESC LIMIT ? OFFSET ?", + (device_serial, limit, offset) + ) + return [dict(r) for r in rows], total + + +async def get_commands(device_serial: str, limit: int = 100, + offset: int = 0) -> tuple[list, int]: + db = await get_db() + count_row = await db.execute_fetchall( + "SELECT COUNT(*) FROM commands WHERE device_serial = ?", (device_serial,) + ) + total = count_row[0][0] + rows = await db.execute_fetchall( + "SELECT * FROM commands WHERE device_serial = ? ORDER BY sent_at DESC LIMIT ? OFFSET ?", + (device_serial, limit, offset) + ) + return [dict(r) for r in rows], total + + +async def get_latest_heartbeats() -> list: + db = await get_db() + rows = await db.execute_fetchall(""" + SELECT h.* FROM heartbeats h + INNER JOIN ( + SELECT device_serial, MAX(received_at) as max_time + FROM heartbeats GROUP BY device_serial + ) latest ON h.device_serial = latest.device_serial + AND h.received_at = latest.max_time + """) + return [dict(r) for r in rows] + + +async def get_pending_command(device_serial: str) -> dict | None: + db = await get_db() + rows = await db.execute_fetchall( + """SELECT * FROM commands WHERE device_serial = ? AND status = 'pending' + ORDER BY sent_at DESC LIMIT 1""", + (device_serial,) + ) + return dict(rows[0]) if rows else None + + +# --- Cleanup --- + +async def purge_old_data(retention_days: int | None = None): + days = retention_days or settings.mqtt_data_retention_days + cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() + db = await get_db() + await db.execute("DELETE FROM device_logs WHERE received_at < ?", (cutoff,)) + await db.execute("DELETE FROM heartbeats WHERE received_at < ?", (cutoff,)) + await db.execute("DELETE FROM commands WHERE sent_at < ?", (cutoff,)) + await db.commit() + logger.info(f"Purged MQTT data older than {days} days") + + +async def purge_loop(): + while True: + await asyncio.sleep(86400) + try: + await purge_old_data() + except Exception as e: + logger.error(f"Purge failed: {e}") diff --git a/backend/mqtt/logger.py b/backend/mqtt/logger.py index 432bce6..53f2019 100644 --- a/backend/mqtt/logger.py +++ b/backend/mqtt/logger.py @@ -1 +1,70 @@ -# TODO: Log storage and retrieval +import logging +from mqtt import database as db + +logger = logging.getLogger("mqtt.logger") + +LEVEL_MAP = { + "🟢 INFO": "INFO", + "🟡 WARN": "WARN", + "🔴 EROR": "ERROR", + "INFO": "INFO", + "WARN": "WARN", + "ERROR": "ERROR", + "EROR": "ERROR", +} + + +async def handle_message(serial: str, topic_type: str, payload: dict): + try: + if topic_type == "status/heartbeat": + await _handle_heartbeat(serial, payload) + elif topic_type == "logs": + await _handle_log(serial, payload) + elif topic_type == "data": + await _handle_data_response(serial, payload) + else: + logger.debug(f"Unhandled topic type: {topic_type} for {serial}") + except Exception as e: + logger.error(f"Error handling {topic_type} for {serial}: {e}") + + +async def _handle_heartbeat(serial: str, payload: dict): + inner = payload.get("payload", {}) + await db.insert_heartbeat( + device_serial=serial, + device_id=inner.get("device_id", ""), + firmware_version=inner.get("firmware_version", ""), + ip_address=inner.get("ip_address", ""), + gateway=inner.get("gateway", ""), + uptime_ms=inner.get("uptime_ms", 0), + uptime_display=inner.get("timestamp", ""), + ) + + +async def _handle_log(serial: str, payload: dict): + raw_level = payload.get("level", "INFO") + level = LEVEL_MAP.get(raw_level, "INFO") + message = payload.get("message", "") + device_timestamp = payload.get("timestamp") + + await db.insert_log( + device_serial=serial, + level=level, + message=message, + device_timestamp=device_timestamp, + ) + + +async def _handle_data_response(serial: str, payload: dict): + status = payload.get("status", "") + + pending = await db.get_pending_command(serial) + if pending: + cmd_status = "success" if status == "SUCCESS" else "error" + await db.update_command_response( + command_id=pending["id"], + status=cmd_status, + response_payload=payload, + ) + else: + logger.debug(f"Received data response for {serial} with no pending command") diff --git a/backend/mqtt/models.py b/backend/mqtt/models.py new file mode 100644 index 0000000..0336a26 --- /dev/null +++ b/backend/mqtt/models.py @@ -0,0 +1,86 @@ +from pydantic import BaseModel, Field +from typing import Optional, List, Any, Dict +from enum import Enum + + +class LogLevel(str, Enum): + INFO = "INFO" + WARN = "WARN" + ERROR = "ERROR" + + +class CommandStatus(str, Enum): + PENDING = "pending" + SUCCESS = "success" + ERROR = "error" + TIMEOUT = "timeout" + + +class MqttCommandRequest(BaseModel): + cmd: str = Field(..., description="Command name: ping, playback, file_manager, relay_setup, clock_setup, system_info, system") + contents: Dict[str, Any] = Field(default_factory=dict, description="Command payload contents") + + +class DeviceLogEntry(BaseModel): + id: int + device_serial: str + level: str + message: str + device_timestamp: Optional[int] = None + received_at: str + + +class HeartbeatEntry(BaseModel): + id: int + device_serial: str + device_id: Optional[str] = None + firmware_version: Optional[str] = None + ip_address: Optional[str] = None + gateway: Optional[str] = None + uptime_ms: Optional[int] = None + uptime_display: Optional[str] = None + received_at: str + + +class CommandEntry(BaseModel): + id: int + device_serial: str + command_name: str + command_payload: Optional[str] = None + status: str + response_payload: Optional[str] = None + sent_at: str + responded_at: Optional[str] = None + + +class DeviceMqttStatus(BaseModel): + device_serial: str + online: bool + last_heartbeat: Optional[HeartbeatEntry] = None + seconds_since_heartbeat: Optional[int] = None + + +class MqttStatusResponse(BaseModel): + devices: List[DeviceMqttStatus] + broker_connected: bool = False + + +class LogListResponse(BaseModel): + logs: List[DeviceLogEntry] + total: int + + +class HeartbeatListResponse(BaseModel): + heartbeats: List[HeartbeatEntry] + total: int + + +class CommandListResponse(BaseModel): + commands: List[CommandEntry] + total: int + + +class CommandSendResponse(BaseModel): + success: bool + command_id: int + message: str diff --git a/backend/mqtt/router.py b/backend/mqtt/router.py index b56e9d9..9f31786 100644 --- a/backend/mqtt/router.py +++ b/backend/mqtt/router.py @@ -1 +1,151 @@ -# TODO: Command endpoints + WebSocket for live data +from fastapi import APIRouter, Depends, Query, WebSocket, WebSocketDisconnect +from typing import Optional +from auth.models import TokenPayload +from auth.dependencies import require_device_access, require_viewer +from mqtt.models import ( + MqttCommandRequest, CommandSendResponse, MqttStatusResponse, + DeviceMqttStatus, LogListResponse, HeartbeatListResponse, + CommandListResponse, HeartbeatEntry, +) +from mqtt.client import mqtt_manager +from mqtt import database as db +from datetime import datetime, timezone + +router = APIRouter(prefix="/api/mqtt", tags=["mqtt"]) + + +@router.get("/status", response_model=MqttStatusResponse) +async def get_all_device_status( + _user: TokenPayload = Depends(require_viewer), +): + heartbeats = await db.get_latest_heartbeats() + now = datetime.now(timezone.utc) + devices = [] + for hb in heartbeats: + received_str = hb["received_at"] + try: + received = datetime.fromisoformat(received_str) + if received.tzinfo is None: + received = received.replace(tzinfo=timezone.utc) + seconds_ago = int((now - received).total_seconds()) + except (ValueError, TypeError): + seconds_ago = 9999 + + devices.append(DeviceMqttStatus( + device_serial=hb["device_serial"], + online=seconds_ago < 90, + last_heartbeat=HeartbeatEntry(**hb), + seconds_since_heartbeat=seconds_ago, + )) + return MqttStatusResponse( + devices=devices, + broker_connected=mqtt_manager.connected, + ) + + +@router.post("/command/{device_serial}", response_model=CommandSendResponse) +async def send_command( + device_serial: str, + body: MqttCommandRequest, + _user: TokenPayload = Depends(require_device_access), +): + command_id = await db.insert_command( + device_serial=device_serial, + command_name=body.cmd, + command_payload={"cmd": body.cmd, "contents": body.contents}, + ) + + success = mqtt_manager.publish_command( + device_serial=device_serial, + cmd=body.cmd, + contents=body.contents, + ) + + if not success: + await db.update_command_response( + command_id, "error", + {"error": "MQTT broker not connected"}, + ) + return CommandSendResponse( + success=False, command_id=command_id, + message="MQTT broker not connected", + ) + + return CommandSendResponse( + success=True, command_id=command_id, + message=f"Command '{body.cmd}' sent to {device_serial}", + ) + + +@router.get("/logs/{device_serial}", response_model=LogListResponse) +async def get_device_logs( + device_serial: str, + level: Optional[str] = Query(None, description="Filter: INFO, WARN, ERROR"), + search: Optional[str] = Query(None), + limit: int = Query(100, ge=1, le=1000), + offset: int = Query(0, ge=0), + _user: TokenPayload = Depends(require_viewer), +): + logs, total = await db.get_logs( + device_serial, level=level, search=search, + limit=limit, offset=offset, + ) + return LogListResponse(logs=logs, total=total) + + +@router.get("/heartbeats/{device_serial}", response_model=HeartbeatListResponse) +async def get_device_heartbeats( + device_serial: str, + limit: int = Query(100, ge=1, le=1000), + offset: int = Query(0, ge=0), + _user: TokenPayload = Depends(require_viewer), +): + heartbeats, total = await db.get_heartbeats( + device_serial, limit=limit, offset=offset, + ) + return HeartbeatListResponse(heartbeats=heartbeats, total=total) + + +@router.get("/commands/{device_serial}", response_model=CommandListResponse) +async def get_device_commands( + device_serial: str, + limit: int = Query(100, ge=1, le=1000), + offset: int = Query(0, ge=0), + _user: TokenPayload = Depends(require_viewer), +): + commands, total = await db.get_commands( + device_serial, limit=limit, offset=offset, + ) + return CommandListResponse(commands=commands, total=total) + + +@router.websocket("/ws") +async def mqtt_websocket(websocket: WebSocket): + """Live MQTT data stream. Auth via query param: ?token=JWT""" + token = websocket.query_params.get("token") + if not token: + await websocket.close(code=4001, reason="Missing token") + return + + try: + from auth.utils import decode_access_token + payload = decode_access_token(token) + role = payload.get("role", "") + allowed = {"superadmin", "device_manager", "viewer"} + if role not in allowed: + await websocket.close(code=4003, reason="Insufficient permissions") + return + except Exception: + await websocket.close(code=4001, reason="Invalid token") + return + + await websocket.accept() + mqtt_manager.add_ws_subscriber(websocket) + + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + pass + finally: + mqtt_manager.remove_ws_subscriber(websocket) diff --git a/backend/requirements.txt b/backend/requirements.txt index aa9b63e..7ea60ec 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -7,4 +7,5 @@ paho-mqtt==2.1.0 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4 python-multipart==0.0.20 -bcrypt==4.0.1 \ No newline at end of file +bcrypt==4.0.1 +aiosqlite==0.20.0 \ No newline at end of file diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 44672ef..8aa684a 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -12,6 +12,9 @@ import DeviceForm from "./devices/DeviceForm"; import UserList from "./users/UserList"; import UserDetail from "./users/UserDetail"; import UserForm from "./users/UserForm"; +import MqttDashboard from "./mqtt/MqttDashboard"; +import CommandPanel from "./mqtt/CommandPanel"; +import LogViewer from "./mqtt/LogViewer"; function ProtectedRoute({ children }) { const { user, loading } = useAuth(); @@ -69,9 +72,9 @@ export default function App() { } /> } /> } /> - {/* Phase 5+ routes: } /> - */} + } /> + } /> } /> diff --git a/frontend/src/layout/Sidebar.jsx b/frontend/src/layout/Sidebar.jsx index ab28c31..4d6d1b4 100644 --- a/frontend/src/layout/Sidebar.jsx +++ b/frontend/src/layout/Sidebar.jsx @@ -14,7 +14,15 @@ const navItems = [ }, { to: "/devices", label: "Devices", roles: ["superadmin", "device_manager", "viewer"] }, { to: "/users", label: "Users", roles: ["superadmin", "user_manager", "viewer"] }, - { to: "/mqtt", label: "MQTT", roles: ["superadmin", "device_manager", "viewer"] }, + { + label: "MQTT", + roles: ["superadmin", "device_manager", "viewer"], + children: [ + { to: "/mqtt", label: "Dashboard" }, + { to: "/mqtt/commands", label: "Commands" }, + { to: "/mqtt/logs", label: "Logs" }, + ], + }, ]; const linkClass = (isActive) => diff --git a/frontend/src/mqtt/CommandPanel.jsx b/frontend/src/mqtt/CommandPanel.jsx index 61bac87..a51b7ea 100644 --- a/frontend/src/mqtt/CommandPanel.jsx +++ b/frontend/src/mqtt/CommandPanel.jsx @@ -1 +1,350 @@ -// TODO: Send commands to devices +import { useState, useEffect } from "react"; +import { useSearchParams } from "react-router-dom"; +import api from "../api/client"; + +const QUICK_ACTIONS = [ + { label: "Ping", cmd: "ping", contents: {} }, + { label: "Report Status", cmd: "system_info", contents: { action: "report_status" } }, + { label: "Get Settings", cmd: "system_info", contents: { action: "get_full_settings" } }, + { label: "Network Info", cmd: "system_info", contents: { action: "network_info" } }, + { label: "Device Time", cmd: "system_info", contents: { action: "get_device_time" } }, + { label: "Clock Time", cmd: "system_info", contents: { action: "get_clock_time" } }, + { label: "Firmware Status", cmd: "system_info", contents: { action: "get_firmware_status" } }, + { label: "List Melodies", cmd: "file_manager", contents: { action: "list_melodies" } }, + { label: "Restart", cmd: "system", contents: { action: "restart" }, danger: true }, +]; + +const STATUS_STYLES = { + success: { bg: "var(--success-bg)", color: "var(--success-text)" }, + error: { bg: "var(--danger-bg)", color: "var(--danger-text)" }, + pending: { bg: "var(--badge-blue-bg)", color: "var(--badge-blue-text)" }, + timeout: { bg: "var(--danger-bg)", color: "var(--danger-text)" }, +}; + +export default function CommandPanel() { + const [searchParams] = useSearchParams(); + const [devices, setDevices] = useState([]); + const [selectedDevice, setSelectedDevice] = useState(searchParams.get("device") || ""); + const [customCmd, setCustomCmd] = useState(""); + const [customContents, setCustomContents] = useState("{}"); + const [commands, setCommands] = useState([]); + const [commandsTotal, setCommandsTotal] = useState(0); + const [loading, setLoading] = useState(false); + const [sending, setSending] = useState(false); + const [error, setError] = useState(""); + const [lastResponse, setLastResponse] = useState(null); + const [expandedCmd, setExpandedCmd] = useState(null); + + useEffect(() => { + api.get("/devices").then((data) => { + setDevices(data.devices || []); + if (!selectedDevice && data.devices?.length > 0) { + setSelectedDevice(data.devices[0].device_id || ""); + } + }).catch(() => {}); + }, []); + + useEffect(() => { + if (selectedDevice) fetchCommands(); + }, [selectedDevice]); + + const fetchCommands = async () => { + setLoading(true); + try { + const data = await api.get(`/mqtt/commands/${selectedDevice}?limit=50`); + setCommands(data.commands || []); + setCommandsTotal(data.total || 0); + } catch (err) { + setError(err.message); + } finally { + setLoading(false); + } + }; + + const sendCommand = async (cmd, contents) => { + if (!selectedDevice) { + setError("Select a device first"); + return; + } + setSending(true); + setError(""); + setLastResponse(null); + try { + const result = await api.post(`/mqtt/command/${selectedDevice}`, { cmd, contents }); + setLastResponse(result); + // Wait briefly then refresh commands to see response + setTimeout(fetchCommands, 2000); + } catch (err) { + setError(err.message); + } finally { + setSending(false); + } + }; + + const sendCustomCommand = () => { + if (!customCmd.trim()) { + setError("Enter a command name"); + return; + } + let parsed; + try { + parsed = JSON.parse(customContents); + } catch { + setError("Invalid JSON in contents"); + return; + } + sendCommand(customCmd.trim(), parsed); + }; + + const formatPayload = (jsonStr) => { + if (!jsonStr) return null; + try { + return JSON.stringify(JSON.parse(jsonStr), null, 2); + } catch { + return jsonStr; + } + }; + + return ( +
+

+ Command Panel +

+ + {/* Device Selector */} +
+ + +
+ + {error && ( +
+ {error} +
+ )} + + {/* Quick Actions */} +
+

+ Quick Actions +

+
+ {QUICK_ACTIONS.map((action) => ( + + ))} +
+
+ + {/* Custom Command */} +
+

+ Custom Command +

+
+
+ + setCustomCmd(e.target.value)} + placeholder="e.g. system_info" + className="w-full max-w-md px-3 py-2 rounded-md text-sm border" + /> +
+
+ +