"""SQLite persistence for MQTT device data, device alerts, and CRM tables.

A single shared ``aiosqlite`` connection is opened lazily by :func:`get_db`
(or eagerly by :func:`init_db`) and closed by :func:`close_db`.  All
timestamps written by SQLite defaults use ``datetime('now')``, i.e. UTC text
in the form ``YYYY-MM-DD HH:MM:SS``.
"""

import asyncio
import json
import logging
import sqlite3
from datetime import datetime, timedelta, timezone

import aiosqlite

from config import settings

logger = logging.getLogger("mqtt.database")

_db: aiosqlite.Connection | None = None

# Text format produced by SQLite's datetime('now'); cutoffs compared against
# stored timestamps must use the same format because the comparison is textual.
_SQLITE_TS_FORMAT = "%Y-%m-%d %H:%M:%S"

SCHEMA_STATEMENTS = [
    """CREATE TABLE IF NOT EXISTS device_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_serial TEXT NOT NULL,
        level TEXT NOT NULL,
        message TEXT NOT NULL,
        device_timestamp INTEGER,
        received_at TEXT NOT NULL DEFAULT (datetime('now'))
    )""",
    """CREATE TABLE IF NOT EXISTS heartbeats (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_serial TEXT NOT NULL,
        device_id TEXT,
        firmware_version TEXT,
        ip_address TEXT,
        gateway TEXT,
        uptime_ms INTEGER,
        uptime_display TEXT,
        received_at TEXT NOT NULL DEFAULT (datetime('now'))
    )""",
    """CREATE TABLE IF NOT EXISTS commands (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_serial TEXT NOT NULL,
        command_name TEXT NOT NULL,
        command_payload TEXT,
        status TEXT NOT NULL DEFAULT 'pending',
        response_payload TEXT,
        sent_at TEXT NOT NULL DEFAULT (datetime('now')),
        responded_at TEXT
    )""",
    "CREATE INDEX IF NOT EXISTS idx_logs_serial_time ON device_logs(device_serial, received_at)",
    "CREATE INDEX IF NOT EXISTS idx_logs_level ON device_logs(level)",
    "CREATE INDEX IF NOT EXISTS idx_heartbeats_serial_time ON heartbeats(device_serial, received_at)",
    "CREATE INDEX IF NOT EXISTS idx_commands_serial_time ON commands(device_serial, sent_at)",
    "CREATE INDEX IF NOT EXISTS idx_commands_status ON commands(status)",
    # Melody drafts table
    """CREATE TABLE IF NOT EXISTS melody_drafts (
        id TEXT PRIMARY KEY,
        status TEXT NOT NULL DEFAULT 'draft',
        data TEXT NOT NULL,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
    )""",
    "CREATE INDEX IF NOT EXISTS idx_melody_drafts_status ON melody_drafts(status)",
    # Built melodies table (local melody builder)
    """CREATE TABLE IF NOT EXISTS built_melodies (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        pid TEXT NOT NULL,
        steps TEXT NOT NULL,
        binary_path TEXT,
        progmem_code TEXT,
        assigned_melody_ids TEXT NOT NULL DEFAULT '[]',
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
    )""",
    # Manufacturing audit log
    """CREATE TABLE IF NOT EXISTS mfg_audit_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL DEFAULT (datetime('now')),
        admin_user TEXT NOT NULL,
        action TEXT NOT NULL,
        serial_number TEXT,
        detail TEXT
    )""",
    "CREATE INDEX IF NOT EXISTS idx_mfg_audit_time ON mfg_audit_log(timestamp)",
    "CREATE INDEX IF NOT EXISTS idx_mfg_audit_action ON mfg_audit_log(action)",
    # Active device alerts (current state, not history)
    """CREATE TABLE IF NOT EXISTS device_alerts (
        device_serial TEXT NOT NULL,
        subsystem TEXT NOT NULL,
        state TEXT NOT NULL,
        message TEXT,
        updated_at TEXT NOT NULL DEFAULT (datetime('now')),
        PRIMARY KEY (device_serial, subsystem)
    )""",
    "CREATE INDEX IF NOT EXISTS idx_device_alerts_serial ON device_alerts(device_serial)",
    # CRM communications log
    """CREATE TABLE IF NOT EXISTS crm_comms_log (
        id TEXT PRIMARY KEY,
        customer_id TEXT,
        type TEXT NOT NULL,
        mail_account TEXT,
        direction TEXT NOT NULL,
        subject TEXT,
        body TEXT,
        body_html TEXT,
        attachments TEXT NOT NULL DEFAULT '[]',
        ext_message_id TEXT,
        from_addr TEXT,
        to_addrs TEXT,
        logged_by TEXT,
        occurred_at TEXT NOT NULL,
        created_at TEXT NOT NULL
    )""",
    "CREATE INDEX IF NOT EXISTS idx_crm_comms_customer ON crm_comms_log(customer_id, occurred_at)",
    # CRM media references
    """CREATE TABLE IF NOT EXISTS crm_media (
        id TEXT PRIMARY KEY,
        customer_id TEXT,
        order_id TEXT,
        filename TEXT NOT NULL,
        nextcloud_path TEXT NOT NULL,
        mime_type TEXT,
        direction TEXT,
        tags TEXT NOT NULL DEFAULT '[]',
        uploaded_by TEXT,
        created_at TEXT NOT NULL
    )""",
    "CREATE INDEX IF NOT EXISTS idx_crm_media_customer ON crm_media(customer_id)",
    "CREATE INDEX IF NOT EXISTS idx_crm_media_order ON crm_media(order_id)",
    # CRM sync state (last email sync timestamp, etc.)
    """CREATE TABLE IF NOT EXISTS crm_sync_state (
        key TEXT PRIMARY KEY,
        value TEXT
    )""",
    # CRM Quotations
    """CREATE TABLE IF NOT EXISTS crm_quotations (
        id TEXT PRIMARY KEY,
        quotation_number TEXT UNIQUE NOT NULL,
        title TEXT,
        subtitle TEXT,
        customer_id TEXT NOT NULL,
        language TEXT NOT NULL DEFAULT 'en',
        status TEXT NOT NULL DEFAULT 'draft',
        order_type TEXT,
        shipping_method TEXT,
        estimated_shipping_date TEXT,
        global_discount_label TEXT,
        global_discount_percent REAL NOT NULL DEFAULT 0,
        vat_percent REAL NOT NULL DEFAULT 24,
        shipping_cost REAL NOT NULL DEFAULT 0,
        shipping_cost_discount REAL NOT NULL DEFAULT 0,
        install_cost REAL NOT NULL DEFAULT 0,
        install_cost_discount REAL NOT NULL DEFAULT 0,
        extras_label TEXT,
        extras_cost REAL NOT NULL DEFAULT 0,
        comments TEXT NOT NULL DEFAULT '[]',
        subtotal_before_discount REAL NOT NULL DEFAULT 0,
        global_discount_amount REAL NOT NULL DEFAULT 0,
        new_subtotal REAL NOT NULL DEFAULT 0,
        vat_amount REAL NOT NULL DEFAULT 0,
        final_total REAL NOT NULL DEFAULT 0,
        nextcloud_pdf_path TEXT,
        nextcloud_pdf_url TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    )""",
    """CREATE TABLE IF NOT EXISTS crm_quotation_items (
        id TEXT PRIMARY KEY,
        quotation_id TEXT NOT NULL,
        product_id TEXT,
        description TEXT,
        unit_type TEXT NOT NULL DEFAULT 'pcs',
        unit_cost REAL NOT NULL DEFAULT 0,
        discount_percent REAL NOT NULL DEFAULT 0,
        quantity REAL NOT NULL DEFAULT 1,
        line_total REAL NOT NULL DEFAULT 0,
        sort_order INTEGER NOT NULL DEFAULT 0,
        FOREIGN KEY (quotation_id) REFERENCES crm_quotations(id)
    )""",
    "CREATE INDEX IF NOT EXISTS idx_crm_quotations_customer ON crm_quotations(customer_id)",
    "CREATE INDEX IF NOT EXISTS idx_crm_quotation_items_quotation ON crm_quotation_items(quotation_id, sort_order)",
]

# Additive migrations: columns that may be missing in databases created by
# older versions.  Each statement fails with "duplicate column name" when the
# column is already present, which is the expected steady state.
_COLUMN_MIGRATIONS = [
    "ALTER TABLE crm_comms_log ADD COLUMN body_html TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN mail_account TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN from_addr TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN to_addrs TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN is_important INTEGER NOT NULL DEFAULT 0",
    "ALTER TABLE crm_comms_log ADD COLUMN is_read INTEGER NOT NULL DEFAULT 0",
    "ALTER TABLE crm_quotation_items ADD COLUMN vat_percent REAL NOT NULL DEFAULT 24",
    "ALTER TABLE crm_quotations ADD COLUMN quick_notes TEXT NOT NULL DEFAULT '{}'",
    "ALTER TABLE crm_quotations ADD COLUMN client_org TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_name TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_location TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_phone TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_email TEXT",
]

# Full, current shape of crm_comms_log used when the table has to be rebuilt.
# It includes the columns added by _COLUMN_MIGRATIONS so that a rebuild never
# drops them.  Keep _CRM_COMMS_LOG_COLUMNS in sync with this DDL.
_CRM_COMMS_LOG_REBUILD_DDL = """CREATE TABLE crm_comms_log (
    id TEXT PRIMARY KEY,
    customer_id TEXT,
    type TEXT NOT NULL,
    mail_account TEXT,
    direction TEXT NOT NULL,
    subject TEXT,
    body TEXT,
    body_html TEXT,
    attachments TEXT NOT NULL DEFAULT '[]',
    ext_message_id TEXT,
    from_addr TEXT,
    to_addrs TEXT,
    logged_by TEXT,
    occurred_at TEXT NOT NULL,
    created_at TEXT NOT NULL,
    is_important INTEGER NOT NULL DEFAULT 0,
    is_read INTEGER NOT NULL DEFAULT 0
)"""

_CRM_COMMS_LOG_COLUMNS = (
    "id",
    "customer_id",
    "type",
    "mail_account",
    "direction",
    "subject",
    "body",
    "body_html",
    "attachments",
    "ext_message_id",
    "from_addr",
    "to_addrs",
    "logged_by",
    "occurred_at",
    "created_at",
    "is_important",
    "is_read",
)


async def _apply_column_migrations(db: aiosqlite.Connection) -> None:
    """Run the additive ``ALTER TABLE ... ADD COLUMN`` migrations.

    A "duplicate column name" error means the column already exists and is
    skipped silently.  Any other SQLite operational error is logged and the
    remaining migrations still run, so startup stays best-effort.
    """
    for statement in _COLUMN_MIGRATIONS:
        try:
            await db.execute(statement)
            await db.commit()
        except sqlite3.OperationalError as exc:
            if "duplicate column name" in str(exc).lower():
                continue  # column already exists
            logger.warning("Column migration failed (%s): %s", statement, exc)


async def _make_comms_customer_id_nullable(db: aiosqlite.Connection) -> None:
    """Rebuild ``crm_comms_log`` if ``customer_id`` is still ``NOT NULL``.

    SQLite cannot alter a column constraint in place, so the table is renamed,
    recreated with the current schema, repopulated, and the old copy dropped.
    Every column present in both the old and the new table is copied, so data
    in migrated columns (``mail_account``, ``is_important``, ``is_read``) is
    preserved.  The whole rebuild runs in one transaction and is rolled back
    if any step fails.
    """
    # table_info rows are (cid, name, type, notnull, dflt_value, pk).
    info_rows = await db.execute_fetchall("PRAGMA table_info(crm_comms_log)")
    not_null_by_column = {row[1]: row[3] for row in info_rows}
    if not_null_by_column.get("customer_id") != 1:
        return

    logger.info("Migrating crm_comms_log: removing NOT NULL from customer_id")
    # Column names come from the fixed tuple above, never from user input.
    shared = ", ".join(
        name for name in _CRM_COMMS_LOG_COLUMNS if name in not_null_by_column
    )

    await db.execute("BEGIN")
    try:
        await db.execute("ALTER TABLE crm_comms_log RENAME TO crm_comms_log_old")
        await db.execute(_CRM_COMMS_LOG_REBUILD_DDL)
        await db.execute(
            f"INSERT INTO crm_comms_log ({shared}) "
            f"SELECT {shared} FROM crm_comms_log_old"
        )
        # Dropping the old table also drops the index that followed the rename,
        # so the index is recreated afterwards on the new table.
        await db.execute("DROP TABLE crm_comms_log_old")
        await db.execute(
            "CREATE INDEX IF NOT EXISTS idx_crm_comms_customer "
            "ON crm_comms_log(customer_id, occurred_at)"
        )
    except Exception:
        await db.rollback()
        raise
    await db.commit()
    logger.info("Migration complete: crm_comms_log.customer_id is now nullable")


async def init_db():
    """Open the shared connection, create the schema, and apply migrations."""
    global _db
    _db = await aiosqlite.connect(settings.sqlite_db_path)
    _db.row_factory = aiosqlite.Row

    for stmt in SCHEMA_STATEMENTS:
        await _db.execute(stmt)
    await _db.commit()

    # Migrations: add columns that may not exist in older DBs
    await _apply_column_migrations(_db)

    # Migration: drop NOT NULL on crm_comms_log.customer_id if it exists.
    await _make_comms_customer_id_nullable(_db)

    logger.info("SQLite database initialized at %s", settings.sqlite_db_path)


async def close_db():
    """Close the shared connection if one is open."""
    global _db
    if _db:
        await _db.close()
        _db = None


async def get_db() -> aiosqlite.Connection:
    """Return the shared connection, initializing the database on first use."""
    if _db is None:
        await init_db()
    return _db


# --- Insert Operations ---

async def insert_log(device_serial: str, level: str, message: str,
                     device_timestamp: int | None = None):
    """Insert one device log row and return its row id."""
    db = await get_db()
    cursor = await db.execute(
        "INSERT INTO device_logs (device_serial, level, message, device_timestamp) VALUES (?, ?, ?, ?)",
        (device_serial, level, message, device_timestamp),
    )
    await db.commit()
    return cursor.lastrowid


async def insert_heartbeat(device_serial: str, device_id: str,
                           firmware_version: str, ip_address: str,
                           gateway: str, uptime_ms: int, uptime_display: str):
    """Insert one heartbeat row and return its row id."""
    db = await get_db()
    cursor = await db.execute(
        """INSERT INTO heartbeats
           (device_serial, device_id, firmware_version, ip_address, gateway, uptime_ms, uptime_display)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        (device_serial, device_id, firmware_version, ip_address, gateway,
         uptime_ms, uptime_display),
    )
    await db.commit()
    return cursor.lastrowid


async def insert_command(device_serial: str, command_name: str,
                         command_payload: dict) -> int:
    """Record a command (payload stored as JSON text) and return its row id.

    The row's ``status`` takes the table default, ``'pending'``.
    """
    db = await get_db()
    cursor = await db.execute(
        "INSERT INTO commands (device_serial, command_name, command_payload) VALUES (?, ?, ?)",
        (device_serial, command_name, json.dumps(command_payload)),
    )
    await db.commit()
    return cursor.lastrowid


async def update_command_response(command_id: int, status: str,
                                  response_payload: dict | None = None):
    """Set a command's status and response, stamping ``responded_at`` with now.

    A falsy ``response_payload`` (``None`` or an empty dict) is stored as NULL;
    anything else is stored as JSON text.
    """
    db = await get_db()
    encoded = json.dumps(response_payload) if response_payload else None
    await db.execute(
        """UPDATE commands SET status = ?, response_payload = ?,
           responded_at = datetime('now') WHERE id = ?""",
        (status, encoded, command_id),
    )
    await db.commit()


# --- Query Operations ---

async def _fetch_page(table: str, order_column: str, where: str, params: list,
                      limit: int, offset: int) -> tuple[list, int]:
    """Return ``(rows_as_dicts, total_matching)`` for one page of ``table``.

    ``table``, ``order_column`` and ``where`` are internal constants and are
    interpolated into the SQL; all caller-supplied values go through ``params``.
    Rows are ordered newest first, with ``id`` as a tie-breaker so that rows
    sharing a timestamp keep a stable order across pages.
    """
    db = await get_db()
    count_rows = await db.execute_fetchall(
        f"SELECT COUNT(*) FROM {table} WHERE {where}", params
    )
    total = count_rows[0][0]
    rows = await db.execute_fetchall(
        f"SELECT * FROM {table} WHERE {where} "
        f"ORDER BY {order_column} DESC, id DESC LIMIT ? OFFSET ?",
        [*params, limit, offset],
    )
    return [dict(r) for r in rows], total


async def get_logs(device_serial: str, level: str | None = None,
                   search: str | None = None,
                   limit: int = 100, offset: int = 0) -> tuple[list, int]:
    """Return one page of a device's logs plus the total match count.

    ``level`` filters on an exact level; ``search`` is wrapped in ``%...%`` and
    matched with ``LIKE`` against the message, so ``%`` and ``_`` inside
    ``search`` act as wildcards.
    """
    where_clauses = ["device_serial = ?"]
    params: list = [device_serial]

    if level:
        where_clauses.append("level = ?")
        params.append(level)
    if search:
        where_clauses.append("message LIKE ?")
        params.append(f"%{search}%")

    return await _fetch_page(
        "device_logs", "received_at", " AND ".join(where_clauses), params,
        limit, offset,
    )


async def get_heartbeats(device_serial: str, limit: int = 100,
                         offset: int = 0) -> tuple[list, int]:
    """Return one page of a device's heartbeats plus the total count."""
    return await _fetch_page(
        "heartbeats", "received_at", "device_serial = ?", [device_serial],
        limit, offset,
    )


async def get_commands(device_serial: str, limit: int = 100,
                       offset: int = 0) -> tuple[list, int]:
    """Return one page of a device's commands plus the total count."""
    return await _fetch_page(
        "commands", "sent_at", "device_serial = ?", [device_serial],
        limit, offset,
    )


async def get_latest_heartbeats() -> list:
    """Return the most recent heartbeat of every device, one row per device."""
    db = await get_db()
    rows = await db.execute_fetchall("""
        SELECT h.* FROM heartbeats h
        INNER JOIN (
            SELECT device_serial, MAX(received_at) as max_time
            FROM heartbeats GROUP BY device_serial
        ) latest ON h.device_serial = latest.device_serial
            AND h.received_at = latest.max_time
    """)
    # received_at has one-second resolution, so the join can return several
    # rows for a device; keep only the one inserted last (highest id).
    newest: dict = {}
    for row in rows:
        record = dict(row)
        current = newest.get(record["device_serial"])
        if current is None or record["id"] > current["id"]:
            newest[record["device_serial"]] = record
    return list(newest.values())


async def get_pending_command(device_serial: str) -> dict | None:
    """Return the newest ``'pending'`` command for a device, or ``None``."""
    db = await get_db()
    rows = await db.execute_fetchall(
        """SELECT * FROM commands WHERE device_serial = ? AND status = 'pending'
           ORDER BY sent_at DESC, id DESC LIMIT 1""",
        (device_serial,),
    )
    return dict(rows[0]) if rows else None


# --- Cleanup ---

async def purge_old_data(retention_days: int | None = None):
    """Delete logs, heartbeats and commands older than the retention window.

    ``retention_days`` falls back to ``settings.mqtt_data_retention_days`` when
    it is ``None`` or 0.
    """
    days = retention_days or settings.mqtt_data_retention_days
    # Stored timestamps are text from datetime('now') (UTC, space-separated,
    # no offset).  The cutoff must use the same format: an ISO "T" separator
    # sorts after a space and would purge the whole cutoff day.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime(
        _SQLITE_TS_FORMAT
    )
    db = await get_db()
    await db.execute("DELETE FROM device_logs WHERE received_at < ?", (cutoff,))
    await db.execute("DELETE FROM heartbeats WHERE received_at < ?", (cutoff,))
    await db.execute("DELETE FROM commands WHERE sent_at < ?", (cutoff,))
    await db.commit()
    logger.info("Purged MQTT data older than %s days", days)


async def purge_loop():
    """Run :func:`purge_old_data` once every 24 hours, forever.

    Failures are logged and the loop keeps running.
    """
    while True:
        await asyncio.sleep(86400)
        try:
            await purge_old_data()
        except Exception as e:
            logger.exception("Purge failed: %s", e)


# --- Device Alerts ---

async def upsert_alert(device_serial: str, subsystem: str, state: str,
                       message: str | None = None):
    """Insert or replace the alert for ``(device_serial, subsystem)``."""
    db = await get_db()
    await db.execute(
        """INSERT INTO device_alerts (device_serial, subsystem, state, message, updated_at)
           VALUES (?, ?, ?, ?, datetime('now'))
           ON CONFLICT(device_serial, subsystem)
           DO UPDATE SET state=excluded.state, message=excluded.message,
                         updated_at=excluded.updated_at""",
        (device_serial, subsystem, state, message),
    )
    await db.commit()


async def delete_alert(device_serial: str, subsystem: str):
    """Delete the alert for ``(device_serial, subsystem)`` if it exists."""
    db = await get_db()
    await db.execute(
        "DELETE FROM device_alerts WHERE device_serial = ? AND subsystem = ?",
        (device_serial, subsystem),
    )
    await db.commit()


async def get_alerts(device_serial: str) -> list:
    """Return a device's alerts as dicts, most recently updated first."""
    db = await get_db()
    rows = await db.execute_fetchall(
        "SELECT * FROM device_alerts WHERE device_serial = ? ORDER BY updated_at DESC",
        (device_serial,),
    )
    return [dict(r) for r in rows]