# BellSystems CP — Automation & Notification Engine Strategy ## Overview This document defines the architecture and implementation plan for a three-tier intelligence layer built on top of the existing BellSystems Control Panel. The system consists of: 1. **Event Logs** — passive, timestamped record of notable system events 2. **Notifications** — real-time or near-real-time alerts surfaced in the UI 3. **Automation Rules** — trigger → condition → action pipelines, configurable via UI The existing tech stack is unchanged: **FastAPI + SQLite (aiosqlite) + Firestore + React**. Everything new slots in as additional tables in `mqtt_data.db`, new backend modules, and new frontend pages/components. --- ## Architecture ``` ┌──────────────────────────────────────────────────┐ │ Scheduler Loop (runs inside existing FastAPI │ │ startup, alongside email_sync_loop) │ │ │ │ Every 60s: evaluate_rules() │ │ ↓ │ │ Rules Engine │ │ → loads enabled rules from DB │ │ → evaluates conditions against live data │ │ → fires Action Executor on match │ │ │ │ Action Executor │ │ → create_event_log() │ │ → create_notification() │ │ → send_email() (existing) │ │ → mqtt_publish_command() (existing) │ │ → update_field() │ └──────────────────────────────────────────────────┘ ↕ REST / WebSocket ┌──────────────────────────────────────────────────┐ │ React Frontend │ │ - Bell icon in Header (unread count badge) │ │ - Notifications dropdown/panel │ │ - /automations page (rule CRUD) │ │ - Event Log viewer (filterable) │ └──────────────────────────────────────────────────┘ ``` --- ## Database Schema (additions to `mqtt_data.db`) ### `event_log` Permanent, append-only record of things that happened. ```sql CREATE TABLE IF NOT EXISTS event_log ( id TEXT PRIMARY KEY, category TEXT NOT NULL, -- 'device' | 'crm' | 'quotation' | 'user' | 'system' entity_type TEXT, -- 'device' | 'customer' | 'quotation' | 'user' entity_id TEXT, -- the ID of the affected record title TEXT NOT NULL, detail TEXT, severity TEXT NOT NULL DEFAULT 'info', -- 'info' | 'warning' | 'error' rule_id TEXT, -- which automation rule triggered this (nullable) created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_event_log_category ON event_log(category, created_at); CREATE INDEX IF NOT EXISTS idx_event_log_entity ON event_log(entity_type, entity_id); ``` ### `notifications` Short-lived, user-facing alerts. Cleared once read or after TTL. ```sql CREATE TABLE IF NOT EXISTS notifications ( id TEXT PRIMARY KEY, title TEXT NOT NULL, body TEXT, link TEXT, -- optional frontend route, e.g. "/crm/customers/abc123" severity TEXT NOT NULL DEFAULT 'info', -- 'info' | 'warning' | 'error' | 'success' is_read INTEGER NOT NULL DEFAULT 0, rule_id TEXT, entity_type TEXT, entity_id TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_notifications_read ON notifications(is_read, created_at); ``` ### `automation_rules` Stores user-defined rules. Evaluated by the scheduler. ```sql CREATE TABLE IF NOT EXISTS automation_rules ( id TEXT PRIMARY KEY, name TEXT NOT NULL, description TEXT, enabled INTEGER NOT NULL DEFAULT 1, trigger_type TEXT NOT NULL, -- 'schedule' | 'mqtt_alert' | 'email_received' trigger_config TEXT NOT NULL DEFAULT '{}', -- JSON conditions TEXT NOT NULL DEFAULT '[]', -- JSON array of condition objects actions TEXT NOT NULL DEFAULT '[]', -- JSON array of action objects cooldown_hours REAL NOT NULL DEFAULT 0, -- min hours between firing on same entity last_run_at TEXT, run_count INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); ``` ### `automation_run_log` Deduplication and audit trail for rule executions. ```sql CREATE TABLE IF NOT EXISTS automation_run_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, rule_id TEXT NOT NULL, entity_type TEXT, entity_id TEXT, status TEXT NOT NULL, -- 'fired' | 'skipped_cooldown' | 'error' detail TEXT, fired_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_run_log_rule ON automation_run_log(rule_id, fired_at); CREATE INDEX IF NOT EXISTS idx_run_log_entity ON automation_run_log(entity_type, entity_id, fired_at); ``` --- ## Backend Module: `automation/` New module at `backend/automation/`, registered in `main.py`. ``` backend/automation/ ├── __init__.py ├── router.py # CRUD for rules, event_log GET, notifications GET/PATCH ├── models.py # Pydantic schemas for rules, conditions, actions ├── engine.py # evaluate_rules(), condition evaluators, action executors ├── scheduler.py # automation_loop() async task, wired into main.py startup └── database.py # DB helpers for all 4 new tables ``` ### Wiring into `main.py` ```python from automation.router import router as automation_router from automation.scheduler import automation_loop app.include_router(automation_router) # In startup(): asyncio.create_task(automation_loop()) ``` --- ## Rule Object Structure (JSON, stored in DB) ```json { "id": "rule_abc123", "name": "Quotation follow-up after 7 days", "enabled": true, "trigger_type": "schedule", "trigger_config": { "interval_hours": 24 }, "conditions": [ { "entity": "quotation", "field": "status", "op": "eq", "value": "sent" }, { "entity": "quotation", "field": "days_since_updated", "op": "gte", "value": 7 }, { "entity": "quotation", "field": "has_reply", "op": "eq", "value": false } ], "actions": [ { "type": "send_email", "template_key": "quotation_followup", "to": "{{quotation.client_email}}", "subject": "Following up on Quotation {{quotation.quotation_number}}", "body": "Hi {{customer.name}}, did you have a chance to review our quotation?" }, { "type": "create_notification", "title": "Follow-up sent to {{customer.name}}", "link": "/crm/customers/{{quotation.customer_id}}" }, { "type": "create_event_log", "category": "quotation", "severity": "info", "title": "Auto follow-up sent for {{quotation.quotation_number}}" } ], "cooldown_hours": 168 } ``` --- ## Supported Trigger Types | Trigger | How it works | |---|---| | `schedule` | Evaluated every N hours by the background loop | | `mqtt_alert` | Fires immediately when `_handle_alerts()` in `mqtt/logger.py` upserts an alert — hook into that function | | `email_received` | Fires inside `sync_emails()` in `crm/email_sync.py` after a new inbound email is stored | > **Note:** `mqtt_alert` and `email_received` triggers bypass the scheduler loop — they are called directly from the relevant handler functions, giving near-real-time response. --- ## Supported Condition Operators | op | Meaning | |---|---| | `eq` | equals | | `neq` | not equals | | `gt` / `gte` / `lt` / `lte` | numeric comparisons | | `contains` | string contains | | `is_null` / `not_null` | field presence | | `days_since` | computed: (now - field_datetime) in days | --- ## Supported Action Types | Action | What it does | Notes | |---|---|---| | `create_event_log` | Writes to `event_log` table | Always safe to fire | | `create_notification` | Writes to `notifications` table | Surfaces in UI bell icon | | `send_email` | Calls existing `crm.email_sync.send_email()` | Uses existing mail accounts | | `update_field` | Updates a field on an entity in DB/Firestore | Use carefully — define allowed fields explicitly | | `mqtt_publish` | Calls `mqtt_manager.publish_command()` | For device auto-actions | | `webhook` | HTTP POST to an external URL | Future / optional | --- ## Notification System (Frontend) ### Bell Icon in Header - Polling endpoint: `GET /api/notifications?unread=true&limit=20` - Poll interval: 30 seconds (or switch to WebSocket push — the WS infrastructure already exists via `mqtt_manager`) - Badge shows unread count - Click opens a dropdown panel listing recent notifications with title, time, severity color, and optional link ### Notification Panel - Mark as read: `PATCH /api/notifications/{id}/read` - Mark all read: `PATCH /api/notifications/read-all` - Link field navigates to the relevant page on click ### Toast Popups (optional, Phase 3 polish) - Triggered by polling detecting new unread notifications since last check - Use an existing toast component if one exists, otherwise add a lightweight one --- ## Automation Rules UI (`/automations`) A new sidebar entry under Settings (sysadmin/admin only). ### Rule List Page - Table: name, enabled toggle, trigger type, last run, run count, edit/delete - "New Rule" button ### Rule Editor (modal or full page) - **Name & description** — free text - **Trigger** — dropdown: Schedule / MQTT Alert / Email Received - Schedule: interval hours input - MQTT Alert: subsystem filter (optional) - Email Received: from address filter (optional) - **Conditions** — dynamic list, each row: - Entity selector (Quotation / Device / Customer / User) - Field selector (populated based on entity) - Operator dropdown - Value input - **Actions** — dynamic list, each row: - Action type dropdown - Type-specific fields (to address, subject, body for email; notification title/body; etc.) - Template variables hint: `{{quotation.quotation_number}}`, `{{customer.name}}`, etc. - **Cooldown** — hours between firings on the same entity - **Enabled** toggle ### Rule Run History - Per-rule log: when it fired, on which entity, success/error --- ## Event Log UI Accessible from `/event-log` route, linked from Dashboard. - Filterable by: category, severity, entity type, date range - Columns: time, category, severity badge, title, entity link - Append-only (no deletion from UI) - Retention: purge entries older than configurable days (e.g. 180 days) via the existing `purge_loop` pattern in `mqtt/database.py` --- ## Pre-Built Rules (Seeded on First Run, All Disabled) These are created on first startup — the admin enables and customizes them. | Rule | Trigger | Condition | Action | |---|---|---|---| | Quotation follow-up | Schedule 24h | status=sent AND days_since_updated ≥ 7 AND no reply | Send follow-up email + notify | | Device offline warning | Schedule 1h | no heartbeat for > 2h | Create notification + event log | | New unknown email | email_received | customer_id IS NULL | Create notification | | Subscription expiring soon | Schedule 24h | subscription.expiry_date within 7 days | Notify + send email | | Device critical alert | mqtt_alert | state = CRITICAL | Notify + event log + optional MQTT restart | | Quotation expired | Schedule 24h | status=sent AND days_since_updated ≥ 30 | Update status → expired + notify | --- ## Implementation Phases ### Phase 1 — Foundation (DB + API) - [ ] Add 4 new tables to `mqtt/database.py` schema + migrations - [ ] Create `automation/database.py` with all DB helpers - [ ] Create `automation/models.py` — Pydantic schemas for rules, conditions, actions, notifications, event_log - [ ] Create `automation/router.py` — CRUD for rules, GET event_log, GET/PATCH notifications - [ ] Wire router into `main.py` ### Phase 2 — Rules Engine + Scheduler - [ ] Create `automation/engine.py` — condition evaluator, template renderer, action executor - [ ] Create `automation/scheduler.py` — `automation_loop()` async task - [ ] Hook `email_received` trigger into `crm/email_sync.sync_emails()` - [ ] Hook `mqtt_alert` trigger into `mqtt/logger._handle_alerts()` - [ ] Seed pre-built (disabled) rules on first startup - [ ] Wire `automation_loop()` into `main.py` startup ### Phase 3 — Notification UI - [ ] Bell icon with unread badge in `Header.jsx` - [ ] Notifications dropdown panel component - [ ] 30s polling hook in React - [ ] Mark read / mark all read ### Phase 4 — Automation Rules UI - [ ] `/automations` route and rule list page - [ ] Rule editor form (conditions + actions dynamic builder) - [ ] Enable/disable toggle - [ ] Run history per rule - [ ] Add "Automations" entry to Sidebar under Settings ### Phase 5 — Event Log UI - [ ] `/event-log` route with filterable table - [ ] Purge policy wired into existing `purge_loop` - [ ] Dashboard widget showing recent high-severity events ### Phase 6 — Polish - [ ] Toast notifications on new unread detection - [ ] Template variable previewer in rule editor - [ ] "Run now" button per rule (for testing without waiting for scheduler) - [ ] Named email templates stored in DB (reusable across rules) --- ## Key Design Decisions | Decision | Choice | Reason | |---|---|---| | Storage | SQLite (same `mqtt_data.db`) | Consistent with existing pattern; no new infra | | Scheduler | `asyncio` task in FastAPI startup | Same pattern as `email_sync_loop` and `purge_loop` already in `main.py` | | Rule format | JSON columns in DB | Flexible, UI-editable, no schema migrations per new rule type | | Template variables | `{{entity.field}}` string interpolation | Simple to implement, readable in UI | | Cooldown dedup | `automation_run_log` per (rule_id, entity_id) | Prevents repeat firing on same quotation/device within cooldown window | | Notification delivery | DB polling (30s) initially | The WS infra exists (`mqtt_manager._ws_subscribers`) — easy to upgrade later | | Pre-built rules | Seeded as disabled | Non-intrusive — admin must consciously enable each one | | `update_field` safety | Explicit allowlist of permitted fields | Prevents accidental data corruption from misconfigured rules | --- ## Template Variables Reference Available inside action `body`, `subject`, `title`, `link` fields: | Variable | Source | |---|---| | `{{customer.name}}` | Firestore `crm_customers` | | `{{customer.organization}}` | Firestore `crm_customers` | | `{{quotation.quotation_number}}` | SQLite `crm_quotations` | | `{{quotation.final_total}}` | SQLite `crm_quotations` | | `{{quotation.status}}` | SQLite `crm_quotations` | | `{{quotation.client_email}}` | SQLite `crm_quotations` | | `{{device.serial}}` | Firestore `devices` | | `{{device.label}}` | Firestore `devices` | | `{{alert.subsystem}}` | MQTT alert payload | | `{{alert.state}}` | MQTT alert payload | | `{{user.email}}` | Firestore `users` | --- ## Notes - `crm/email_sync.send_email()` is reused as-is for the `send_email` action type. The engine constructs the call parameters. - `update_field` actions start with an allowlist of: `quotation.status`, `user.status`. Expand deliberately. - For MQTT auto-restart, `mqtt_manager.publish_command(serial, "restart", {})` already works — the engine just calls it. - Firestore is read-only from the automation engine (for customer/device lookups). All writes go to SQLite, consistent with the existing architecture. - The `has_reply` condition on quotations is computed by checking whether any `crm_comms_log` entry exists with `direction='inbound'` and `customer_id` matching the quotation's customer, dated after the quotation's `updated_at`.