From 4c2400b5964fa91f53be58066f3fa1a12b8c64ef Mon Sep 17 00:00:00 2001 From: bonamin Date: Fri, 17 Apr 2026 15:11:12 +0300 Subject: [PATCH] Phase 1 of Migration. Running Scripts --- backend/migration/__init__.py | 0 backend/migration/migrate_built_melodies.py | 65 ++++++++++ backend/migration/migrate_commands.py | 73 +++++++++++ backend/migration/migrate_crm_comms_log.py | 85 +++++++++++++ backend/migration/migrate_crm_media.py | 76 +++++++++++ .../migration/migrate_crm_quotation_items.py | 85 +++++++++++++ backend/migration/migrate_crm_quotations.py | 120 ++++++++++++++++++ backend/migration/migrate_crm_sync_state.py | 54 ++++++++ backend/migration/migrate_device_alerts.py | 69 ++++++++++ backend/migration/migrate_device_logs.py | 93 ++++++++++++++ backend/migration/migrate_heartbeats.py | 73 +++++++++++ backend/migration/migrate_melody_drafts.py | 66 ++++++++++ backend/migration/migrate_mfg_audit_log.py | 67 ++++++++++ backend/migration/utils.py | 116 +++++++++++++++++ strategies/DATABASE_MIGRATION.md | 54 +++++++- 15 files changed, 1094 insertions(+), 2 deletions(-) create mode 100644 backend/migration/__init__.py create mode 100644 backend/migration/migrate_built_melodies.py create mode 100644 backend/migration/migrate_commands.py create mode 100644 backend/migration/migrate_crm_comms_log.py create mode 100644 backend/migration/migrate_crm_media.py create mode 100644 backend/migration/migrate_crm_quotation_items.py create mode 100644 backend/migration/migrate_crm_quotations.py create mode 100644 backend/migration/migrate_crm_sync_state.py create mode 100644 backend/migration/migrate_device_alerts.py create mode 100644 backend/migration/migrate_device_logs.py create mode 100644 backend/migration/migrate_heartbeats.py create mode 100644 backend/migration/migrate_melody_drafts.py create mode 100644 backend/migration/migrate_mfg_audit_log.py create mode 100644 backend/migration/utils.py diff --git a/backend/migration/__init__.py b/backend/migration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/migration/migrate_built_melodies.py b/backend/migration/migrate_built_melodies.py new file mode 100644 index 0000000..722f01e --- /dev/null +++ b/backend/migration/migrate_built_melodies.py @@ -0,0 +1,65 @@ +""" +Phase 1 — Step 1.2: built_melodies (SQLite → Postgres) + +Run on VPS: + docker compose exec backend python -m migration.migrate_built_melodies +""" + +import asyncio +import sys + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from melodies.orm import BuiltMelody +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, parse_json, log_run, pg_count + +SCRIPT = "migrate_built_melodies" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM built_melodies") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} built_melodies rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + for r in rows: + records.append({ + "id": r["id"], + "name": r["name"], + "pid": r["pid"], + "steps": parse_json(r["steps"], default=[]), + "binary_path": r["binary_path"], + "progmem_code": r["progmem_code"], + "assigned_melody_ids": parse_json(r["assigned_melody_ids"], default=[]), + "is_builtin": bool(r["is_builtin"]) if r["is_builtin"] is not None else False, + "created_at": parse_dt(r["created_at"]), + "updated_at": parse_dt(r["updated_at"]), + }) + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(BuiltMelody).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "built_melodies") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_commands.py b/backend/migration/migrate_commands.py new file mode 100644 index 0000000..514d216 --- /dev/null +++ b/backend/migration/migrate_commands.py @@ -0,0 +1,73 @@ +""" +Phase 1 — Step 1.10: commands (SQLite → Postgres) + +commands is a raw-SQL table (no ORM model). BIGSERIAL PK — SQLite integer IDs +are NOT preserved; rows are inserted in sent_at order. + +Run on VPS: + docker compose exec backend python -m migration.migrate_commands +""" + +import asyncio +import sys + +from sqlalchemy import text + +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_commands" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM commands ORDER BY sent_at") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} commands rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [ + { + "device_serial": r["device_serial"], + "command_name": r["command_name"], + "command_payload": r["command_payload"], + "status": r["status"] or "pending", + "response_payload": r["response_payload"], + "sent_at": parse_dt(r["sent_at"]), + "responded_at": parse_dt(r["responded_at"]), + } + for r in rows + ] + + async with AsyncPgSession() as session: + async with session.begin(): + await session.execute( + text(""" + INSERT INTO commands + (device_serial, command_name, command_payload, status, + response_payload, sent_at, responded_at) + VALUES + (:device_serial, :command_name, :command_payload, :status, + :response_payload, :sent_at, :responded_at) + """), + records, + ) + dest_count = await pg_count(session, "commands") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_crm_comms_log.py b/backend/migration/migrate_crm_comms_log.py new file mode 100644 index 0000000..1bda412 --- /dev/null +++ b/backend/migration/migrate_crm_comms_log.py @@ -0,0 +1,85 @@ +""" +Phase 1 — Step 1.9: crm_comms_log (SQLite → Postgres) + +FK to crm_customers(id) (nullable, ON DELETE SET NULL) — FK enforcement +suppressed until Phase 2 populates crm_customers. + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_comms_log +""" + +import asyncio +import sys + +from sqlalchemy import text +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmCommsLog +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, parse_json, log_run, pg_count + +SCRIPT = "migrate_crm_comms_log" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM crm_comms_log ORDER BY occurred_at") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} crm_comms_log rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + for r in rows: + # attachments stored as JSON text in SQLite + attachments = parse_json(r["attachments"], default=[]) + + # is_important / is_read stored as INTEGER (0/1) in SQLite + is_important = bool(r["is_important"]) if r["is_important"] is not None else False + is_read = bool(r["is_read"]) if r["is_read"] is not None else True + + records.append({ + "id": r["id"], + "customer_id": r["customer_id"], + "type": r["type"], + "mail_account": r["mail_account"], + "direction": r["direction"], + "subject": r["subject"], + "body": r["body"], + "body_html": r["body_html"], + "attachments": attachments, + "ext_message_id": r["ext_message_id"], + "from_addr": r["from_addr"], + "to_addrs": r["to_addrs"], + "logged_by": r["logged_by"], + "is_important": is_important, + "is_read": is_read, + "occurred_at": parse_dt(r["occurred_at"]), + "created_at": parse_dt(r["created_at"]), + }) + + async with AsyncPgSession() as session: + await session.execute(text("SET session_replication_role = replica")) + async with session.begin(): + stmt = pg_insert(CrmCommsLog).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_comms_log") + await session.execute(text("SET session_replication_role = DEFAULT")) + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_crm_media.py b/backend/migration/migrate_crm_media.py new file mode 100644 index 0000000..114faae --- /dev/null +++ b/backend/migration/migrate_crm_media.py @@ -0,0 +1,76 @@ +""" +Phase 1 — Step 1.8: crm_media (SQLite → Postgres) + +FK to crm_customers(id) (nullable) — FK enforcement suppressed until Phase 2 +populates crm_customers. + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_media +""" + +import asyncio +import sys + +from sqlalchemy import text +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmMedia +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, parse_json, log_run, pg_count + +SCRIPT = "migrate_crm_media" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM crm_media ORDER BY created_at") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} crm_media rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + for r in rows: + # SQLite stores tags as JSON text; Postgres column is JSONB + tags_raw = r["tags"] + tags = parse_json(tags_raw, default=[]) + + records.append({ + "id": r["id"], + "customer_id": r["customer_id"], + "order_id": r["order_id"], + "filename": r["filename"], + "nextcloud_path": r["nextcloud_path"], + "thumbnail_path": r["thumbnail_path"], + "mime_type": r["mime_type"], + "direction": r["direction"], + "tags": tags, + "uploaded_by": r["uploaded_by"], + "created_at": parse_dt(r["created_at"]), + }) + + async with AsyncPgSession() as session: + await session.execute(text("SET session_replication_role = replica")) + async with session.begin(): + stmt = pg_insert(CrmMedia).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_media") + await session.execute(text("SET session_replication_role = DEFAULT")) + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_crm_quotation_items.py b/backend/migration/migrate_crm_quotation_items.py new file mode 100644 index 0000000..62cde37 --- /dev/null +++ b/backend/migration/migrate_crm_quotation_items.py @@ -0,0 +1,85 @@ +""" +Phase 1 — Step 1.7: crm_quotation_items (SQLite → Postgres) + +FK to crm_quotations(id) — quotations must be migrated first (step 1.6). +FK enforcement suppressed via session_replication_role for the same reason +as in migrate_crm_quotations (parent crm_customers not yet in PG). + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_quotation_items +""" + +import asyncio +import sys +from decimal import Decimal + +from sqlalchemy import text +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmQuotationItem +from migration.utils import open_sqlite, AsyncPgSession, log_run, pg_count + +SCRIPT = "migrate_crm_quotation_items" + + +def _dec(val, default="0") -> Decimal: + try: + return Decimal(str(val)) if val is not None else Decimal(default) + except Exception: + return Decimal(default) + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall( + "SELECT * FROM crm_quotation_items ORDER BY quotation_id, sort_order" + ) + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} crm_quotation_items rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + for r in rows: + records.append({ + "id": r["id"], + "quotation_id": r["quotation_id"], + "product_id": r["product_id"], + "description": r["description"], + "description_en": r["description_en"], + "description_gr": r["description_gr"], + "unit_type": r["unit_type"] or "pcs", + "unit_cost": _dec(r["unit_cost"]), + "discount_percent": _dec(r["discount_percent"]), + "vat_percent": _dec(r["vat_percent"], "24"), + "quantity": _dec(r["quantity"], "1"), + "line_total": _dec(r["line_total"]), + "sort_order": int(r["sort_order"]) if r["sort_order"] is not None else 0, + }) + + async with AsyncPgSession() as session: + await session.execute(text("SET session_replication_role = replica")) + async with session.begin(): + stmt = pg_insert(CrmQuotationItem).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_quotation_items") + await session.execute(text("SET session_replication_role = DEFAULT")) + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_crm_quotations.py b/backend/migration/migrate_crm_quotations.py new file mode 100644 index 0000000..97bd0eb --- /dev/null +++ b/backend/migration/migrate_crm_quotations.py @@ -0,0 +1,120 @@ +""" +Phase 1 — Step 1.6: crm_quotations (SQLite → Postgres) + +NOTE: crm_quotations has a FK to crm_customers(id). +The customer rows DO NOT exist in Postgres yet (they migrate in Phase 2). +To avoid FK violations, this script temporarily disables FK checks for the +session using SET CONSTRAINTS ALL DEFERRED — but since customer_id is a real +FK with ON DELETE CASCADE, we instead insert with the constraint deferred. + +Safer approach used here: insert with `customer_id` as-is and rely on the +fact that crm_customers will be populated in Phase 2 before any service +reads join across the two tables. The FK is not deferred — instead we disable +the FK constraint enforcement for this transaction only via a session-level +SET session_replication_role = replica; which suppresses FK checks in Postgres. +We restore it immediately after the transaction. + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_quotations +""" + +import asyncio +import sys +from decimal import Decimal + +from sqlalchemy import text +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmQuotation +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, parse_json, log_run, pg_count + +SCRIPT = "migrate_crm_quotations" + + +def _dec(val, default="0") -> Decimal: + try: + return Decimal(str(val)) if val is not None else Decimal(default) + except Exception: + return Decimal(default) + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM crm_quotations ORDER BY created_at") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} crm_quotations rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + for r in rows: + records.append({ + "id": r["id"], + "quotation_number": r["quotation_number"], + "title": r["title"], + "subtitle": r["subtitle"], + "customer_id": r["customer_id"], + "language": r["language"] or "en", + "status": r["status"] or "draft", + "order_type": r["order_type"], + "shipping_method": r["shipping_method"], + "estimated_shipping_date": r["estimated_shipping_date"], + "global_discount_label": r["global_discount_label"], + "global_discount_percent": _dec(r["global_discount_percent"]), + "vat_percent": _dec(r["vat_percent"], "24"), + "global_vat_percent": _dec(r["global_vat_percent"], "24"), + "shipping_cost": _dec(r["shipping_cost"]), + "shipping_cost_discount": _dec(r["shipping_cost_discount"]), + "install_cost": _dec(r["install_cost"]), + "install_cost_discount": _dec(r["install_cost_discount"]), + "extras_label": r["extras_label"], + "extras_cost": _dec(r["extras_cost"]), + "comments": parse_json(r["comments"], default=[]), + "quick_notes": parse_json(r["quick_notes"], default={}), + "subtotal_before_discount": _dec(r["subtotal_before_discount"]), + "global_discount_amount": _dec(r["global_discount_amount"]), + "new_subtotal": _dec(r["new_subtotal"]), + "vat_amount": _dec(r["vat_amount"]), + "final_total": _dec(r["final_total"]), + "nextcloud_pdf_path": r["nextcloud_pdf_path"], + "nextcloud_pdf_url": r["nextcloud_pdf_url"], + "client_org": r["client_org"], + "client_name": r["client_name"], + "client_location": r["client_location"], + "client_phone": r["client_phone"], + "client_email": r["client_email"], + "is_legacy": bool(r["is_legacy"]) if r["is_legacy"] is not None else False, + "legacy_date": r["legacy_date"], + "legacy_pdf_path": r["legacy_pdf_path"], + "created_at": parse_dt(r["created_at"]), + "updated_at": parse_dt(r["updated_at"]), + }) + + async with AsyncPgSession() as session: + # Disable FK enforcement for this session so we can insert quotations + # before the referenced crm_customers rows arrive in Phase 2. + await session.execute(text("SET session_replication_role = replica")) + async with session.begin(): + stmt = pg_insert(CrmQuotation).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_quotations") + await session.execute(text("SET session_replication_role = DEFAULT")) + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_crm_sync_state.py b/backend/migration/migrate_crm_sync_state.py new file mode 100644 index 0000000..dcebfb7 --- /dev/null +++ b/backend/migration/migrate_crm_sync_state.py @@ -0,0 +1,54 @@ +""" +Phase 1 — Step 1.5: crm_sync_state (SQLite → Postgres) + +Simple key/value table — small, no FK deps. + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_sync_state +""" + +import asyncio +import sys + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmSyncState +from migration.utils import open_sqlite, AsyncPgSession, log_run, pg_count + +SCRIPT = "migrate_crm_sync_state" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM crm_sync_state") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} crm_sync_state rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [{"key": r["key"], "value": r["value"]} for r in rows] + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(CrmSyncState).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["key"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_sync_state") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_device_alerts.py b/backend/migration/migrate_device_alerts.py new file mode 100644 index 0000000..d1e41f3 --- /dev/null +++ b/backend/migration/migrate_device_alerts.py @@ -0,0 +1,69 @@ +""" +Phase 1 — Step 1.4: device_alerts (SQLite → Postgres) + +device_alerts is a "current state" table — one row per (device_serial, subsystem). +The SQLite PK is (device_serial, subsystem); Postgres adds a BIGSERIAL surrogate PK +with a unique constraint on the pair. + +Run on VPS: + docker compose exec backend python -m migration.migrate_device_alerts +""" + +import asyncio +import sys + +from sqlalchemy import text + +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_device_alerts" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM device_alerts") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} device_alerts rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [ + { + "device_serial": r["device_serial"], + "subsystem": r["subsystem"], + "state": r["state"], + "message": r["message"], + "updated_at": parse_dt(r["updated_at"]), + } + for r in rows + ] + + async with AsyncPgSession() as session: + async with session.begin(): + await session.execute( + text(""" + INSERT INTO device_alerts (device_serial, subsystem, state, message, updated_at) + VALUES (:device_serial, :subsystem, :state, :message, :updated_at) + ON CONFLICT (device_serial, subsystem) DO NOTHING + """), + records, + ) + dest_count = await pg_count(session, "device_alerts") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_device_logs.py b/backend/migration/migrate_device_logs.py new file mode 100644 index 0000000..fd087e4 --- /dev/null +++ b/backend/migration/migrate_device_logs.py @@ -0,0 +1,93 @@ +""" +Phase 1 — Step 1.12: device_logs (SQLite → Postgres) + +Largest table — migrated in batches of 10,000 rows to avoid memory issues. +device_logs is a partitioned table; rows route automatically to the correct +monthly partition based on received_at. + +Run on VPS: + docker compose exec backend python -m migration.migrate_device_logs +""" + +import asyncio +import sys + +from sqlalchemy import text + +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_device_logs" +BATCH_SIZE = 10_000 + + +async def run() -> None: + sqlite = await open_sqlite() + + # Total count first + count_row = await sqlite.execute_fetchall("SELECT COUNT(*) FROM device_logs") + source_count = count_row[0][0] + print(f"Source (SQLite): {source_count} device_logs rows") + + if source_count == 0: + await sqlite.close() + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + offset = 0 + total_inserted = 0 + + while offset < source_count: + rows = await sqlite.execute_fetchall( + "SELECT * FROM device_logs ORDER BY received_at LIMIT ? OFFSET ?", + (BATCH_SIZE, offset), + ) + if not rows: + break + + records = [ + { + "device_serial": r["device_serial"], + "level": r["level"], + "message": r["message"], + "device_timestamp": r["device_timestamp"], + "received_at": parse_dt(r["received_at"]), + } + for r in rows + ] + + async with AsyncPgSession() as session: + async with session.begin(): + await session.execute( + text(""" + INSERT INTO device_logs + (device_serial, level, message, device_timestamp, received_at) + VALUES + (:device_serial, :level, :message, :device_timestamp, :received_at) + """), + records, + ) + + total_inserted += len(records) + offset += BATCH_SIZE + pct = min(100, int(total_inserted / source_count * 100)) + print(f" {total_inserted}/{source_count} rows inserted ({pct}%)") + + await sqlite.close() + + # Final count verify + async with AsyncPgSession() as session: + dest_count = await pg_count(session, "device_logs") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_heartbeats.py b/backend/migration/migrate_heartbeats.py new file mode 100644 index 0000000..cb1d254 --- /dev/null +++ b/backend/migration/migrate_heartbeats.py @@ -0,0 +1,73 @@ +""" +Phase 1 — Step 1.11: heartbeats (SQLite → Postgres) + +Raw-SQL table (no ORM model). BIGSERIAL PK — SQLite IDs not preserved. + +Run on VPS: + docker compose exec backend python -m migration.migrate_heartbeats +""" + +import asyncio +import sys + +from sqlalchemy import text + +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_heartbeats" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM heartbeats ORDER BY received_at") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} heartbeats rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [ + { + "device_serial": r["device_serial"], + "device_id": r["device_id"], + "firmware_version": r["firmware_version"], + "ip_address": r["ip_address"], + "gateway": r["gateway"], + "uptime_ms": r["uptime_ms"], + "uptime_display": r["uptime_display"], + "received_at": parse_dt(r["received_at"]), + } + for r in rows + ] + + async with AsyncPgSession() as session: + async with session.begin(): + await session.execute( + text(""" + INSERT INTO heartbeats + (device_serial, device_id, firmware_version, ip_address, + gateway, uptime_ms, uptime_display, received_at) + VALUES + (:device_serial, :device_id, :firmware_version, :ip_address, + :gateway, :uptime_ms, :uptime_display, :received_at) + """), + records, + ) + dest_count = await pg_count(session, "heartbeats") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_melody_drafts.py b/backend/migration/migrate_melody_drafts.py new file mode 100644 index 0000000..ff8f5f1 --- /dev/null +++ b/backend/migration/migrate_melody_drafts.py @@ -0,0 +1,66 @@ +""" +Phase 1 — Step 1.1: melody_drafts (SQLite → Postgres) + +Run on VPS: + docker compose exec backend python -m migration.migrate_melody_drafts +""" + +import asyncio +import json +import sys + +from sqlalchemy import text +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from melodies.orm import MelodyDraft +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, parse_json, log_run, pg_count + +SCRIPT = "migrate_melody_drafts" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM melody_drafts") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} melody_drafts rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + for r in rows: + data_raw = r["data"] + # SQLite stores data as JSON text; Postgres column is JSONB + data = parse_json(data_raw, default={}) + + records.append({ + "id": r["id"], + "status": r["status"] or "draft", + "data": data, + "created_at": parse_dt(r["created_at"]), + "updated_at": parse_dt(r["updated_at"]), + }) + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(MelodyDraft).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "melody_drafts") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_mfg_audit_log.py b/backend/migration/migrate_mfg_audit_log.py new file mode 100644 index 0000000..d488e52 --- /dev/null +++ b/backend/migration/migrate_mfg_audit_log.py @@ -0,0 +1,67 @@ +""" +Phase 1 — Step 1.3: mfg_audit_log (SQLite → Postgres) + +Run on VPS: + docker compose exec backend python -m migration.migrate_mfg_audit_log +""" + +import asyncio +import sys + +from sqlalchemy import text + +from migration.utils import open_sqlite, AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_mfg_audit_log" + + +async def run() -> None: + sqlite = await open_sqlite() + rows = await sqlite.execute_fetchall("SELECT * FROM mfg_audit_log ORDER BY id") + await sqlite.close() + + source_count = len(rows) + print(f"Source (SQLite): {source_count} mfg_audit_log rows") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + # mfg_audit_log uses a BIGSERIAL PK — we don't preserve SQLite integer IDs + # because the Postgres sequence will assign new ones. We insert in the same + # timestamp order so the audit trail remains coherent. + records = [ + { + "timestamp": parse_dt(r["timestamp"]), + "admin_user": r["admin_user"], + "action": r["action"], + "serial_number": r["serial_number"], + "detail": r["detail"], + } + for r in rows + ] + + async with AsyncPgSession() as session: + async with session.begin(): + await session.execute( + text(""" + INSERT INTO mfg_audit_log (timestamp, admin_user, action, serial_number, detail) + VALUES (:timestamp, :admin_user, :action, :serial_number, :detail) + """), + records, + ) + dest_count = await pg_count(session, "mfg_audit_log") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/utils.py b/backend/migration/utils.py new file mode 100644 index 0000000..ee3de4a --- /dev/null +++ b/backend/migration/utils.py @@ -0,0 +1,116 @@ +""" +Shared helpers for all Phase 1 SQLite → Postgres migration scripts. + +Usage in each script: + from migration.utils import open_sqlite, get_pg, log_run, parse_dt, parse_json +""" + +import json +import sys +from datetime import datetime, timezone +from pathlib import Path + +import aiosqlite +from sqlalchemy import text +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession + +from config import settings + + +# ── SQLite ──────────────────────────────────────────────────────────────────── + +async def open_sqlite() -> aiosqlite.Connection: + """Open the SQLite database (read-only; no writes during migration).""" + db_path = Path(settings.sqlite_db_path) + if not db_path.exists(): + print(f"ERROR: SQLite database not found at {db_path.resolve()}", file=sys.stderr) + sys.exit(1) + conn = await aiosqlite.connect(str(db_path)) + conn.row_factory = aiosqlite.Row + return conn + + +# ── Postgres ────────────────────────────────────────────────────────────────── + +def _make_pg_session() -> async_sessionmaker: + engine = create_async_engine(settings.database_url, pool_size=5, echo=False) + return async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + +AsyncPgSession = _make_pg_session() + + +# ── Type helpers ────────────────────────────────────────────────────────────── + +def parse_dt(value: str | None) -> datetime | None: + """Parse a SQLite TEXT timestamp → timezone-aware datetime (UTC).""" + if not value: + return None + for fmt in ( + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d %H:%M:%S.%f", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d", + ): + try: + dt = datetime.strptime(value, fmt) + return dt.replace(tzinfo=timezone.utc) + except ValueError: + continue + # ISO format with offset — let fromisoformat handle it + try: + dt = datetime.fromisoformat(value) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except ValueError: + pass + print(f"WARNING: could not parse timestamp {value!r} — using now()", file=sys.stderr) + return datetime.now(timezone.utc) + + +def parse_json(value: str | None, default=None): + """Parse a SQLite TEXT JSON column → Python object.""" + if value is None: + return default + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return default + + +# ── Migration run log ───────────────────────────────────────────────────────── + +async def log_run( + script_name: str, + source_rows: int, + dest_rows: int, + success: bool = True, + notes: str | None = None, +) -> None: + """Insert a row into _migration_runs recording this script's execution.""" + async with AsyncPgSession() as session: + await session.execute( + text(""" + INSERT INTO _migration_runs + (script_name, ran_at, source_rows, dest_rows, success, notes) + VALUES + (:script_name, now(), :source_rows, :dest_rows, :success, :notes) + """), + { + "script_name": script_name, + "source_rows": source_rows, + "dest_rows": dest_rows, + "success": "ok" if success else "error", + "notes": notes, + }, + ) + await session.commit() + + +# ── Count helper ────────────────────────────────────────────────────────────── + +async def pg_count(session: AsyncSession, table: str) -> int: + row = await session.execute(text(f"SELECT COUNT(*) FROM {table}")) + return row.scalar() diff --git a/strategies/DATABASE_MIGRATION.md b/strategies/DATABASE_MIGRATION.md index 6d90d3b..7e7206e 100644 --- a/strategies/DATABASE_MIGRATION.md +++ b/strategies/DATABASE_MIGRATION.md @@ -465,11 +465,61 @@ backend/ | Phase | Description | Status | |-------|-------------|--------| -| 0 | Schema foundation (all tables in Postgres) | **COMPLETE** (local) — run `alembic upgrade head` on VPS | -| 1 | SQLite → Postgres (data migration) | NOT STARTED | +| 0 | Schema foundation (all tables in Postgres) | **COMPLETE** — applied on VPS 2026-04-17 | +| 1 | SQLite → Postgres (data migration) | **IN PROGRESS** — scripts written, not yet run | | 2 | Firestore → Postgres (data migration) | NOT STARTED | | 3 | Staff auth cutover | NOT STARTED | | 4 | Audit log system | NOT STARTED | | 5 | MQTT live data cutover | NOT STARTED | Update this table as each phase completes. + +--- + +## Phase 1 — Run Order & Commands + +Run each command on the VPS **in order**. Verify the output of each before proceeding. + +```bash +# 1.1 +docker compose exec backend python -m migration.migrate_melody_drafts + +# 1.2 +docker compose exec backend python -m migration.migrate_built_melodies + +# 1.3 +docker compose exec backend python -m migration.migrate_mfg_audit_log + +# 1.4 +docker compose exec backend python -m migration.migrate_device_alerts + +# 1.5 +docker compose exec backend python -m migration.migrate_crm_sync_state + +# 1.6 (FK enforcement suppressed — crm_customers not in PG yet) +docker compose exec backend python -m migration.migrate_crm_quotations + +# 1.7 +docker compose exec backend python -m migration.migrate_crm_quotation_items + +# 1.8 +docker compose exec backend python -m migration.migrate_crm_media + +# 1.9 +docker compose exec backend python -m migration.migrate_crm_comms_log + +# 1.10 +docker compose exec backend python -m migration.migrate_commands + +# 1.11 +docker compose exec backend python -m migration.migrate_heartbeats + +# 1.12 (largest — batched, shows progress) +docker compose exec backend python -m migration.migrate_device_logs +``` + +After all scripts complete, verify the run log: +```bash +docker compose exec postgres psql -U bellsystems_user -d bellsystems_db \ + -c "SELECT script_name, ran_at, source_rows, dest_rows, success FROM _migration_runs ORDER BY ran_at;" +```