"""Async SQLite persistence layer: device logs, heartbeats, commands, alerts, and CRM records."""
import asyncio
import json
import logging
import os
import sqlite3
from datetime import datetime, timedelta, timezone

import aiosqlite

from config import settings
|
|
|
|
logger = logging.getLogger("database")
|
|
|
|
_db: aiosqlite.Connection | None = None
|
|
|
|
SCHEMA_STATEMENTS = [
|
|
"""CREATE TABLE IF NOT EXISTS device_logs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
device_serial TEXT NOT NULL,
|
|
level TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
device_timestamp INTEGER,
|
|
received_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
)""",
|
|
"""CREATE TABLE IF NOT EXISTS heartbeats (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
device_serial TEXT NOT NULL,
|
|
device_id TEXT,
|
|
firmware_version TEXT,
|
|
ip_address TEXT,
|
|
gateway TEXT,
|
|
uptime_ms INTEGER,
|
|
uptime_display TEXT,
|
|
received_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
)""",
|
|
"""CREATE TABLE IF NOT EXISTS commands (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
device_serial TEXT NOT NULL,
|
|
command_name TEXT NOT NULL,
|
|
command_payload TEXT,
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
|
response_payload TEXT,
|
|
sent_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
responded_at TEXT
|
|
)""",
|
|
"CREATE INDEX IF NOT EXISTS idx_logs_serial_time ON device_logs(device_serial, received_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_logs_level ON device_logs(level)",
|
|
"CREATE INDEX IF NOT EXISTS idx_heartbeats_serial_time ON heartbeats(device_serial, received_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_commands_serial_time ON commands(device_serial, sent_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_commands_status ON commands(status)",
|
|
# Melody drafts table
|
|
"""CREATE TABLE IF NOT EXISTS melody_drafts (
|
|
id TEXT PRIMARY KEY,
|
|
status TEXT NOT NULL DEFAULT 'draft',
|
|
data TEXT NOT NULL,
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
)""",
|
|
"CREATE INDEX IF NOT EXISTS idx_melody_drafts_status ON melody_drafts(status)",
|
|
# Built melodies table (local melody builder)
|
|
"""CREATE TABLE IF NOT EXISTS built_melodies (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
pid TEXT NOT NULL,
|
|
steps TEXT NOT NULL,
|
|
binary_path TEXT,
|
|
progmem_code TEXT,
|
|
assigned_melody_ids TEXT NOT NULL DEFAULT '[]',
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
)""",
|
|
# Manufacturing audit log
|
|
"""CREATE TABLE IF NOT EXISTS mfg_audit_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
|
|
admin_user TEXT NOT NULL,
|
|
action TEXT NOT NULL,
|
|
serial_number TEXT,
|
|
detail TEXT
|
|
)""",
|
|
"CREATE INDEX IF NOT EXISTS idx_mfg_audit_time ON mfg_audit_log(timestamp)",
|
|
"CREATE INDEX IF NOT EXISTS idx_mfg_audit_action ON mfg_audit_log(action)",
|
|
# Active device alerts (current state, not history)
|
|
"""CREATE TABLE IF NOT EXISTS device_alerts (
|
|
device_serial TEXT NOT NULL,
|
|
subsystem TEXT NOT NULL,
|
|
state TEXT NOT NULL,
|
|
message TEXT,
|
|
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
PRIMARY KEY (device_serial, subsystem)
|
|
)""",
|
|
"CREATE INDEX IF NOT EXISTS idx_device_alerts_serial ON device_alerts(device_serial)",
|
|
# CRM communications log
|
|
"""CREATE TABLE IF NOT EXISTS crm_comms_log (
|
|
id TEXT PRIMARY KEY,
|
|
customer_id TEXT,
|
|
type TEXT NOT NULL,
|
|
mail_account TEXT,
|
|
direction TEXT NOT NULL,
|
|
subject TEXT,
|
|
body TEXT,
|
|
body_html TEXT,
|
|
attachments TEXT NOT NULL DEFAULT '[]',
|
|
ext_message_id TEXT,
|
|
from_addr TEXT,
|
|
to_addrs TEXT,
|
|
logged_by TEXT,
|
|
occurred_at TEXT NOT NULL,
|
|
created_at TEXT NOT NULL
|
|
)""",
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_comms_customer ON crm_comms_log(customer_id, occurred_at)",
|
|
# CRM media references
|
|
"""CREATE TABLE IF NOT EXISTS crm_media (
|
|
id TEXT PRIMARY KEY,
|
|
customer_id TEXT,
|
|
order_id TEXT,
|
|
filename TEXT NOT NULL,
|
|
nextcloud_path TEXT NOT NULL,
|
|
mime_type TEXT,
|
|
direction TEXT,
|
|
tags TEXT NOT NULL DEFAULT '[]',
|
|
uploaded_by TEXT,
|
|
created_at TEXT NOT NULL
|
|
)""",
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_media_customer ON crm_media(customer_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_media_order ON crm_media(order_id)",
|
|
# CRM sync state (last email sync timestamp, etc.)
|
|
"""CREATE TABLE IF NOT EXISTS crm_sync_state (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT
|
|
)""",
|
|
# CRM Quotations
|
|
"""CREATE TABLE IF NOT EXISTS crm_quotations (
|
|
id TEXT PRIMARY KEY,
|
|
quotation_number TEXT UNIQUE NOT NULL,
|
|
title TEXT,
|
|
subtitle TEXT,
|
|
customer_id TEXT NOT NULL,
|
|
language TEXT NOT NULL DEFAULT 'en',
|
|
status TEXT NOT NULL DEFAULT 'draft',
|
|
order_type TEXT,
|
|
shipping_method TEXT,
|
|
estimated_shipping_date TEXT,
|
|
global_discount_label TEXT,
|
|
global_discount_percent REAL NOT NULL DEFAULT 0,
|
|
vat_percent REAL NOT NULL DEFAULT 24,
|
|
shipping_cost REAL NOT NULL DEFAULT 0,
|
|
shipping_cost_discount REAL NOT NULL DEFAULT 0,
|
|
install_cost REAL NOT NULL DEFAULT 0,
|
|
install_cost_discount REAL NOT NULL DEFAULT 0,
|
|
extras_label TEXT,
|
|
extras_cost REAL NOT NULL DEFAULT 0,
|
|
comments TEXT NOT NULL DEFAULT '[]',
|
|
subtotal_before_discount REAL NOT NULL DEFAULT 0,
|
|
global_discount_amount REAL NOT NULL DEFAULT 0,
|
|
new_subtotal REAL NOT NULL DEFAULT 0,
|
|
vat_amount REAL NOT NULL DEFAULT 0,
|
|
final_total REAL NOT NULL DEFAULT 0,
|
|
nextcloud_pdf_path TEXT,
|
|
nextcloud_pdf_url TEXT,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
)""",
|
|
"""CREATE TABLE IF NOT EXISTS crm_quotation_items (
|
|
id TEXT PRIMARY KEY,
|
|
quotation_id TEXT NOT NULL,
|
|
product_id TEXT,
|
|
description TEXT,
|
|
description_en TEXT,
|
|
description_gr TEXT,
|
|
unit_type TEXT NOT NULL DEFAULT 'pcs',
|
|
unit_cost REAL NOT NULL DEFAULT 0,
|
|
discount_percent REAL NOT NULL DEFAULT 0,
|
|
quantity REAL NOT NULL DEFAULT 1,
|
|
line_total REAL NOT NULL DEFAULT 0,
|
|
sort_order INTEGER NOT NULL DEFAULT 0,
|
|
FOREIGN KEY (quotation_id) REFERENCES crm_quotations(id)
|
|
)""",
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_quotations_customer ON crm_quotations(customer_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_crm_quotation_items_quotation ON crm_quotation_items(quotation_id, sort_order)",
|
|
]
|
|
|
|
|
|
async def init_db():
    """Open the module-wide SQLite connection and bring the schema up to date.

    Ensures the database directory exists, connects, applies the idempotent
    DDL in SCHEMA_STATEMENTS, then runs incremental migrations needed by
    databases created by older versions of this module.
    """
    global _db
    os.makedirs(os.path.dirname(os.path.abspath(settings.sqlite_db_path)), exist_ok=True)
    _db = await aiosqlite.connect(settings.sqlite_db_path)
    # Rows support column-name access everywhere below (dict(row) works).
    _db.row_factory = aiosqlite.Row
    for stmt in SCHEMA_STATEMENTS:
        await _db.execute(stmt)
    await _db.commit()
    await _apply_column_migrations(_db)
    await _rebuild_comms_log_if_needed(_db)
    logger.info(f"SQLite database initialized at {settings.sqlite_db_path}")


# Columns added after the tables first shipped.  On databases that already
# have the column, each ALTER fails with "duplicate column name", which is
# expected and ignored.
_COLUMN_MIGRATIONS = [
    "ALTER TABLE crm_comms_log ADD COLUMN body_html TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN mail_account TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN from_addr TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN to_addrs TEXT",
    "ALTER TABLE crm_comms_log ADD COLUMN is_important INTEGER NOT NULL DEFAULT 0",
    "ALTER TABLE crm_comms_log ADD COLUMN is_read INTEGER NOT NULL DEFAULT 0",
    "ALTER TABLE crm_quotation_items ADD COLUMN vat_percent REAL NOT NULL DEFAULT 24",
    "ALTER TABLE crm_quotations ADD COLUMN quick_notes TEXT NOT NULL DEFAULT '{}'",
    "ALTER TABLE crm_quotations ADD COLUMN client_org TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_name TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_location TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_phone TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN client_email TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN is_legacy INTEGER NOT NULL DEFAULT 0",
    "ALTER TABLE crm_quotations ADD COLUMN legacy_date TEXT",
    "ALTER TABLE crm_quotations ADD COLUMN legacy_pdf_path TEXT",
    "ALTER TABLE crm_media ADD COLUMN thumbnail_path TEXT",
    "ALTER TABLE crm_quotation_items ADD COLUMN description_en TEXT",
    "ALTER TABLE crm_quotation_items ADD COLUMN description_gr TEXT",
]


async def _apply_column_migrations(db: aiosqlite.Connection) -> None:
    """Run each ADD COLUMN migration, skipping ones that already applied.

    Previously every exception was swallowed, which hid real failures
    (locked database, syntax errors, missing table).  Only the expected
    "duplicate column name" error is silenced now; anything else is logged.
    """
    for stmt in _COLUMN_MIGRATIONS:
        try:
            await db.execute(stmt)
            await db.commit()
        except sqlite3.OperationalError as e:
            if "duplicate column" not in str(e).lower():
                logger.warning(f"Schema migration failed: {stmt} ({e})")


async def _rebuild_comms_log_if_needed(db: aiosqlite.Connection) -> None:
    """Drop NOT NULL on crm_comms_log.customer_id for legacy databases.

    SQLite doesn't support ALTER COLUMN, so we check via table_info and,
    if the constraint is present, rename the table, recreate it with the
    relaxed constraint, and copy the data across.
    """
    rows = await db.execute_fetchall("PRAGMA table_info(crm_comms_log)")
    for row in rows:
        # table_info row layout: (cid, name, type, notnull, dflt_value, pk)
        if row[1] == "customer_id" and row[3] == 1:  # notnull=1
            logger.info("Migrating crm_comms_log: removing NOT NULL from customer_id")
            # NOTE(review): the rebuilt table omits later-migrated columns
            # (is_important, is_read) and fills mail_account with NULL, so a
            # database that takes this path loses any data in those columns.
            # Preserved as-is from the original migration — confirm intended.
            await db.execute("ALTER TABLE crm_comms_log RENAME TO crm_comms_log_old")
            await db.execute("""CREATE TABLE crm_comms_log (
                id TEXT PRIMARY KEY,
                customer_id TEXT,
                type TEXT NOT NULL,
                mail_account TEXT,
                direction TEXT NOT NULL,
                subject TEXT,
                body TEXT,
                body_html TEXT,
                attachments TEXT NOT NULL DEFAULT '[]',
                ext_message_id TEXT,
                from_addr TEXT,
                to_addrs TEXT,
                logged_by TEXT,
                occurred_at TEXT NOT NULL,
                created_at TEXT NOT NULL
            )""")
            await db.execute("""INSERT INTO crm_comms_log
                SELECT id, customer_id, type, NULL, direction, subject, body, body_html,
                       attachments, ext_message_id, from_addr, to_addrs, logged_by,
                       occurred_at, created_at
                FROM crm_comms_log_old""")
            await db.execute("DROP TABLE crm_comms_log_old")
            await db.execute("CREATE INDEX IF NOT EXISTS idx_crm_comms_customer ON crm_comms_log(customer_id, occurred_at)")
            await db.commit()
            logger.info("Migration complete: crm_comms_log.customer_id is now nullable")
            break
|
|
|
|
|
|
async def close_db():
    """Close the shared connection and reset it; no-op if never opened."""
    global _db
    if _db is None:
        return
    await _db.close()
    _db = None
|
|
|
|
|
|
async def get_db() -> aiosqlite.Connection:
    """Return the shared connection, lazily initializing it on first use."""
    if _db is not None:
        return _db
    await init_db()
    return _db
|
|
|
|
|
|
# --- Insert Operations ---
|
|
|
|
async def insert_log(device_serial: str, level: str, message: str,
                     device_timestamp: int | None = None):
    """Persist one device log line; returns the new row id."""
    conn = await get_db()
    row = (device_serial, level, message, device_timestamp)
    cur = await conn.execute(
        "INSERT INTO device_logs (device_serial, level, message, device_timestamp) VALUES (?, ?, ?, ?)",
        row,
    )
    await conn.commit()
    return cur.lastrowid
|
|
|
|
|
|
async def insert_heartbeat(device_serial: str, device_id: str,
                           firmware_version: str, ip_address: str,
                           gateway: str, uptime_ms: int, uptime_display: str):
    """Record one heartbeat snapshot for a device; returns the new row id."""
    conn = await get_db()
    values = (device_serial, device_id, firmware_version, ip_address,
              gateway, uptime_ms, uptime_display)
    cur = await conn.execute(
        """INSERT INTO heartbeats
           (device_serial, device_id, firmware_version, ip_address, gateway, uptime_ms, uptime_display)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        values,
    )
    await conn.commit()
    return cur.lastrowid
|
|
|
|
|
|
async def insert_command(device_serial: str, command_name: str,
                         command_payload: dict) -> int:
    """Queue a command for a device (payload stored as JSON); returns row id."""
    conn = await get_db()
    payload_json = json.dumps(command_payload)
    cur = await conn.execute(
        "INSERT INTO commands (device_serial, command_name, command_payload) VALUES (?, ?, ?)",
        (device_serial, command_name, payload_json),
    )
    await conn.commit()
    return cur.lastrowid
|
|
|
|
|
|
async def update_command_response(command_id: int, status: str,
                                  response_payload: dict | None = None):
    """Record a device's response to a previously queued command.

    Args:
        command_id: Row id returned by insert_command().
        status: New command status string.
        response_payload: Optional response body, serialized to JSON.

    Note: the check is `is not None`, not truthiness — an empty-dict
    response must be stored as '{}', not silently collapsed to NULL.
    """
    db = await get_db()
    payload = json.dumps(response_payload) if response_payload is not None else None
    await db.execute(
        """UPDATE commands SET status = ?, response_payload = ?,
           responded_at = datetime('now') WHERE id = ?""",
        (status, payload, command_id)
    )
    await db.commit()
|
|
|
|
|
|
# --- Query Operations ---
|
|
|
|
async def get_logs(device_serial: str, level: str | None = None,
                   search: str | None = None,
                   limit: int = 100, offset: int = 0) -> tuple[list, int]:
    """Page through one device's logs, optionally filtered.

    Args:
        device_serial: Device to query.
        level: If given, only logs at this level.
        search: If given, only messages containing this substring.
        limit/offset: Standard pagination window.

    Returns:
        (rows as dicts, total count matching the filters).
    """
    conn = await get_db()

    # Filters are fixed SQL fragments; only values travel as parameters.
    filters = ["device_serial = ?"]
    args: list = [device_serial]
    if level:
        filters.append("level = ?")
        args.append(level)
    if search:
        filters.append("message LIKE ?")
        args.append(f"%{search}%")
    where = " AND ".join(filters)

    counted = await conn.execute_fetchall(
        f"SELECT COUNT(*) as cnt FROM device_logs WHERE {where}", args
    )
    total = counted[0][0]

    page = await conn.execute_fetchall(
        f"SELECT * FROM device_logs WHERE {where} ORDER BY received_at DESC LIMIT ? OFFSET ?",
        args + [limit, offset],
    )
    return [dict(r) for r in page], total
|
|
|
|
|
|
async def get_heartbeats(device_serial: str, limit: int = 100,
                         offset: int = 0) -> tuple[list, int]:
    """Page through one device's heartbeats, newest first.

    Returns (rows as dicts, total heartbeat count for the device).
    """
    conn = await get_db()
    counted = await conn.execute_fetchall(
        "SELECT COUNT(*) FROM heartbeats WHERE device_serial = ?", (device_serial,)
    )
    page = await conn.execute_fetchall(
        "SELECT * FROM heartbeats WHERE device_serial = ? ORDER BY received_at DESC LIMIT ? OFFSET ?",
        (device_serial, limit, offset),
    )
    return [dict(r) for r in page], counted[0][0]
|
|
|
|
|
|
async def get_commands(device_serial: str, limit: int = 100,
                       offset: int = 0) -> tuple[list, int]:
    """Page through one device's command history, newest first.

    Returns (rows as dicts, total command count for the device).
    """
    conn = await get_db()
    counted = await conn.execute_fetchall(
        "SELECT COUNT(*) FROM commands WHERE device_serial = ?", (device_serial,)
    )
    page = await conn.execute_fetchall(
        "SELECT * FROM commands WHERE device_serial = ? ORDER BY sent_at DESC LIMIT ? OFFSET ?",
        (device_serial, limit, offset),
    )
    return [dict(r) for r in page], counted[0][0]
|
|
|
|
|
|
async def get_latest_heartbeats() -> list:
    """Return the most recent heartbeat row for every known device."""
    conn = await get_db()
    # Self-join against each device's max received_at to pick one row per device.
    latest_per_device = """
        SELECT h.* FROM heartbeats h
        INNER JOIN (
            SELECT device_serial, MAX(received_at) as max_time
            FROM heartbeats GROUP BY device_serial
        ) latest ON h.device_serial = latest.device_serial
            AND h.received_at = latest.max_time
    """
    rows = await conn.execute_fetchall(latest_per_device)
    return [dict(r) for r in rows]
|
|
|
|
|
|
async def get_pending_command(device_serial: str) -> dict | None:
    """Fetch the newest still-pending command for a device, or None.

    NOTE(review): ordering is DESC (latest pending wins), not FIFO —
    presumably intentional; confirm against the device polling protocol.
    """
    conn = await get_db()
    found = await conn.execute_fetchall(
        """SELECT * FROM commands WHERE device_serial = ? AND status = 'pending'
           ORDER BY sent_at DESC LIMIT 1""",
        (device_serial,),
    )
    if not found:
        return None
    return dict(found[0])
|
|
|
|
|
|
# --- Cleanup ---
|
|
|
|
async def purge_old_data(retention_days: int | None = None):
    """Delete device logs, heartbeats, and commands older than the retention window.

    Args:
        retention_days: Override for settings.mqtt_data_retention_days.

    The *_at columns are populated by SQLite's datetime('now'), which stores
    'YYYY-MM-DD HH:MM:SS' (UTC, space separator, no offset).  The cutoff must
    be rendered in the same format: the previous isoformat() cutoff produced
    '...T...+00:00', which compares incorrectly as a string against rows
    received on the cutoff day.
    """
    days = retention_days or settings.mqtt_data_retention_days
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
    db = await get_db()
    await db.execute("DELETE FROM device_logs WHERE received_at < ?", (cutoff,))
    await db.execute("DELETE FROM heartbeats WHERE received_at < ?", (cutoff,))
    await db.execute("DELETE FROM commands WHERE sent_at < ?", (cutoff,))
    await db.commit()
    logger.info(f"Purged MQTT data older than {days} days")
|
|
|
|
|
|
async def purge_loop():
    """Background task: purge aged telemetry once every 24 hours, forever.

    A failed purge is logged (with traceback — logger.exception, so the
    cause is diagnosable) and the loop keeps running.
    """
    while True:
        await asyncio.sleep(86400)  # 24 h
        try:
            await purge_old_data()
        except Exception:
            logger.exception("Purge failed")
|
|
|
|
|
|
# --- Device Alerts ---
|
|
|
|
async def upsert_alert(device_serial: str, subsystem: str, state: str,
                       message: str | None = None):
    """Insert or refresh the current alert state for one device subsystem.

    One row per (device_serial, subsystem): a conflicting insert updates
    state, message, and updated_at in place.
    """
    conn = await get_db()
    upsert_sql = """INSERT INTO device_alerts (device_serial, subsystem, state, message, updated_at)
        VALUES (?, ?, ?, ?, datetime('now'))
        ON CONFLICT(device_serial, subsystem)
        DO UPDATE SET state=excluded.state, message=excluded.message,
            updated_at=excluded.updated_at"""
    await conn.execute(upsert_sql, (device_serial, subsystem, state, message))
    await conn.commit()
|
|
|
|
|
|
async def delete_alert(device_serial: str, subsystem: str):
    """Clear the alert row for one device subsystem, if present."""
    conn = await get_db()
    await conn.execute(
        "DELETE FROM device_alerts WHERE device_serial = ? AND subsystem = ?",
        (device_serial, subsystem),
    )
    await conn.commit()
|
|
|
|
|
|
async def get_alerts(device_serial: str) -> list:
    """List a device's active alerts as dicts, most recently updated first."""
    conn = await get_db()
    found = await conn.execute_fetchall(
        "SELECT * FROM device_alerts WHERE device_serial = ? ORDER BY updated_at DESC",
        (device_serial,),
    )
    return [dict(r) for r in found]
|