diff --git a/backend/migration/migrate_crm_customers.py b/backend/migration/migrate_crm_customers.py new file mode 100644 index 0000000..59209f2 --- /dev/null +++ b/backend/migration/migrate_crm_customers.py @@ -0,0 +1,172 @@ +""" +Phase 2 — Step 2.4: crm_customers (Firestore → Postgres) + +Reads the 'crm_customers' Firestore collection. +- Strips legacy fields: 'negotiating', 'has_problem' +- Converts Firestore DatetimeWithNanoseconds → UTC datetime +- Converts nested dicts/lists → JSONB-ready Python objects + +After this runs, the FK constraints on crm_quotations, crm_comms_log, +crm_media, and crm_orders (all inserted in Phase 1 with FK enforcement +suppressed) become valid. + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_customers +""" + +import asyncio +import sys +from datetime import datetime, timezone + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmCustomer +from shared.firebase import init_firebase, get_db as get_firestore +from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_crm_customers" +COLLECTION = "crm_customers" + +_LEGACY_FIELDS = {"negotiating", "has_problem"} + +_VALID_STATUSES = { + "lead", "active", "inactive", "archived", + "prospect", "churned", "vip", +} + + +def _now_utc() -> datetime: + return datetime.now(timezone.utc) + + +def _coerce_dt(val) -> datetime | None: + """Handle both Firestore DatetimeWithNanoseconds and ISO strings.""" + if val is None: + return None + if isinstance(val, datetime): + return val.replace(tzinfo=timezone.utc) if val.tzinfo is None else val + return parse_dt(str(val)) + + +def _coerce_list(val, default=None) -> list: + if isinstance(val, list): + return val + return default if default is not None else [] + + +async def run() -> None: + init_firebase() + fs = get_firestore() + if fs is None: + print("ERROR: Firebase not initialised.", file=sys.stderr) + sys.exit(1) + + docs = list(fs.collection(COLLECTION).stream()) + source_count = len(docs) + print(f"Source (Firestore): {source_count} crm_customers documents") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + skipped = 0 + for doc in docs: + d = doc.to_dict() + + # Strip legacy fields + for f in _LEGACY_FIELDS: + d.pop(f, None) + + # folder_id is NOT NULL UNIQUE — skip docs missing it + folder_id = d.get("folder_id") or "" + if not folder_id: + print(f" WARNING: customer {doc.id} has no folder_id — skipping", file=sys.stderr) + skipped += 1 + continue + + # relationship_status — normalise unknown values to 'lead' + rel_status = d.get("relationship_status") or "lead" + if rel_status not in _VALID_STATUSES: + rel_status = "lead" + + # contacts / notes — Firestore stores as list of maps + contacts = _coerce_list(d.get("contacts")) + # Serialise nested Pydantic-style objects to plain dicts + contacts = [c if isinstance(c, dict) else vars(c) for c in contacts] + + notes = _coerce_list(d.get("notes")) + notes = [n if isinstance(n, dict) else vars(n) for n in notes] + + # location — may be a map or None + location = d.get("location") + if location and not isinstance(location, dict): + location = vars(location) + + tags = _coerce_list(d.get("tags")) + owned_items = _coerce_list(d.get("owned_items")) + owned_items = [o if isinstance(o, dict) else vars(o) for o in owned_items] + + linked_user_ids = _coerce_list(d.get("linked_user_ids")) + + technical_issues = _coerce_list(d.get("technical_issues")) + install_support = _coerce_list(d.get("install_support")) + transaction_history = _coerce_list(d.get("transaction_history")) + + crm_summary = d.get("crm_summary") + if crm_summary and not isinstance(crm_summary, dict): + crm_summary = vars(crm_summary) + + created_at = _coerce_dt(d.get("created_at")) or _now_utc() + updated_at = _coerce_dt(d.get("updated_at")) or _now_utc() + + records.append({ + "id": doc.id, + "firestore_id": doc.id, + "title": d.get("title"), + "name": d.get("name") or "", + "surname": d.get("surname"), + "organization": d.get("organization"), + "religion": d.get("religion"), + "language": d.get("language") or "el", + "folder_id": folder_id, + "relationship_status": rel_status, + "nextcloud_folder": d.get("nextcloud_folder"), + "contacts": contacts, + "notes": notes, + "location": location, + "tags": tags, + "owned_items": owned_items, + "linked_user_ids": linked_user_ids, + "technical_issues": technical_issues, + "install_support": install_support, + "transaction_history": transaction_history, + "crm_summary": crm_summary, + "created_at": created_at, + "updated_at": updated_at, + }) + + actual_source = source_count - skipped + print(f" {skipped} skipped (missing folder_id), {actual_source} to insert") + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(CrmCustomer).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_customers") + + if dest_count < actual_source: + msg = f"Count mismatch: expected>={actual_source} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count, + notes=f"{skipped} skipped (no folder_id)" if skipped else None) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_crm_orders.py b/backend/migration/migrate_crm_orders.py new file mode 100644 index 0000000..7f67355 --- /dev/null +++ b/backend/migration/migrate_crm_orders.py @@ -0,0 +1,149 @@ +""" +Phase 2 — Step 2.5: crm_orders (Firestore → Postgres) + +Orders are stored as a subcollection under each customer: + crm_customers/{customer_id}/orders/{order_id} + +Uses collection_group("orders") to fetch all orders in one pass, +then inserts into crm_orders. crm_customers MUST already be in Postgres +(step 2.4) so the FK constraint is satisfied. + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_orders +""" + +import asyncio +import sys +from datetime import datetime, timezone + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmOrder +from shared.firebase import init_firebase, get_db as get_firestore +from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_crm_orders" + + +def _now_utc() -> datetime: + return datetime.now(timezone.utc) + + +def _coerce_dt(val) -> datetime | None: + if val is None: + return None + if isinstance(val, datetime): + return val.replace(tzinfo=timezone.utc) if val.tzinfo is None else val + return parse_dt(str(val)) + + +def _coerce_list(val) -> list: + return val if isinstance(val, list) else [] + + +def _coerce_dict(val) -> dict: + return val if isinstance(val, dict) else {} + + +async def run() -> None: + init_firebase() + fs = get_firestore() + if fs is None: + print("ERROR: Firebase not initialised.", file=sys.stderr) + sys.exit(1) + + # collection_group fetches from ALL customers' 'orders' subcollections + docs = list(fs.collection_group("orders").stream()) + source_count = len(docs) + print(f"Source (Firestore): {source_count} order documents (via collection_group)") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + # First, collect all customer IDs already in Postgres so we can skip + # orphaned orders whose customer didn't migrate (missing folder_id edge case) + async with AsyncPgSession() as session: + from sqlalchemy import text + result = await session.execute(text("SELECT id FROM crm_customers")) + valid_customer_ids = {row[0] for row in result.fetchall()} + + records = [] + skipped = 0 + for doc in docs: + d = doc.to_dict() + + # Extract customer_id from the document path: + # crm_customers/{customer_id}/orders/{order_id} + path_parts = doc.reference.path.split("/") + # path: crm_customers / / orders / + try: + customer_id = path_parts[1] + except IndexError: + print(f" WARNING: cannot parse customer_id from path {doc.reference.path} — skipping") + skipped += 1 + continue + + if customer_id not in valid_customer_ids: + print(f" WARNING: order {doc.id} references unknown customer {customer_id} — skipping") + skipped += 1 + continue + + order_number = d.get("order_number") or f"ORD-LEGACY-{doc.id[:8]}" + + created_at = _coerce_dt(d.get("created_at")) or _now_utc() + updated_at = _coerce_dt(d.get("updated_at")) or _now_utc() + status_updated_date = _coerce_dt(d.get("status_updated_date")) + + records.append({ + "id": doc.id, + "customer_id": customer_id, + "order_number": order_number, + "title": d.get("title"), + "created_by": d.get("created_by"), + "status": d.get("status") or "negotiating", + "status_updated_date": status_updated_date, + "status_updated_by": d.get("status_updated_by"), + "items": _coerce_list(d.get("items")), + "subtotal": float(d.get("subtotal") or 0), + "discount": d.get("discount") if isinstance(d.get("discount"), dict) else None, + "total_price": float(d.get("total_price") or 0), + "currency": d.get("currency") or "EUR", + "shipping": d.get("shipping") if isinstance(d.get("shipping"), dict) else None, + "payment_status": _coerce_dict(d.get("payment_status")), + "invoice_path": d.get("invoice_path"), + "notes": d.get("notes") if isinstance(d.get("notes"), str) else None, + "timeline": _coerce_list(d.get("timeline")), + "created_at": created_at, + "updated_at": updated_at, + }) + + actual_source = source_count - skipped + print(f" {skipped} skipped (orphaned/bad path), {actual_source} to insert") + + if not records: + print("Nothing valid to insert.") + await log_run(SCRIPT, source_count, 0, notes=f"{skipped} all skipped") + return + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(CrmOrder).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_orders") + + if dest_count < actual_source: + msg = f"Count mismatch: expected>={actual_source} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count, + notes=f"{skipped} skipped" if skipped else None) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_crm_products.py b/backend/migration/migrate_crm_products.py new file mode 100644 index 0000000..473e859 --- /dev/null +++ b/backend/migration/migrate_crm_products.py @@ -0,0 +1,102 @@ +""" +Phase 2 — Step 2.3: crm_products (Firestore → Postgres) + +Reads the 'crm_products' Firestore collection. The Firestore schema is richer +than the Postgres target (has costs, stock, name_en, etc.) — we extract only +what the Postgres ORM model covers. The rest stays in Firestore until the +service is fully cut over. + +Run on VPS: + docker compose exec backend python -m migration.migrate_crm_products +""" + +import asyncio +import sys +from datetime import datetime, timezone + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from crm.orm import CrmProduct +from shared.firebase import init_firebase, get_db as get_firestore +from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count + +SCRIPT = "migrate_crm_products" +COLLECTION = "crm_products" + +_LEGACY_STATUS_MAP = { + "active": True, + "discontinued": False, + "planned": True, +} + + +def _now_utc() -> datetime: + return datetime.now(timezone.utc) + + +async def run() -> None: + init_firebase() + fs = get_firestore() + if fs is None: + print("ERROR: Firebase not initialised.", file=sys.stderr) + sys.exit(1) + + docs = list(fs.collection(COLLECTION).stream()) + source_count = len(docs) + print(f"Source (Firestore): {source_count} crm_products documents") + + if source_count == 0: + print("Nothing to migrate.") + await log_run(SCRIPT, 0, 0, notes="source empty") + return + + records = [] + for doc in docs: + d = doc.to_dict() + + # is_active: prefer 'active' bool field, fall back to 'status' string + if "active" in d: + is_active = bool(d["active"]) + else: + is_active = _LEGACY_STATUS_MAP.get(d.get("status", "active"), True) + + # unit_cost: Firestore uses 'price' + unit_cost = d.get("unit_cost") or d.get("price") or 0 + + created_at = parse_dt(d.get("created_at")) or _now_utc() + updated_at = parse_dt(d.get("updated_at")) or _now_utc() + + records.append({ + "id": doc.id, + "firestore_id": doc.id, + "name": d.get("name") or d.get("name_en") or "", + "sku": d.get("sku"), + "category": d.get("category"), + "description": d.get("description") or d.get("description_en"), + "unit_cost": unit_cost, + "currency": d.get("currency") or "EUR", + "unit_type": d.get("unit_type") or "pcs", + "is_active": is_active, + "created_at": created_at, + "updated_at": updated_at, + }) + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(CrmProduct).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + await session.execute(stmt) + dest_count = await pg_count(session, "crm_products") + + if dest_count < source_count: + msg = f"Count mismatch: source={source_count} postgres={dest_count}" + print(f"ERROR: {msg}", file=sys.stderr) + await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) + sys.exit(1) + + print(f"Postgres: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_public_features.py b/backend/migration/migrate_public_features.py new file mode 100644 index 0000000..54f5712 --- /dev/null +++ b/backend/migration/migrate_public_features.py @@ -0,0 +1,56 @@ +""" +Phase 2 — Step 2.2: public_features (Firestore → Postgres) + +Reads the single 'admin_settings/public_features' doc from Firestore and +flattens each field into a key/value row in public_features. + +Run on VPS: + docker compose exec backend python -m migration.migrate_public_features +""" + +import asyncio +import sys + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from settings.orm import PublicFeature +from shared.firebase import init_firebase, get_db as get_firestore +from migration.utils import AsyncPgSession, log_run, pg_count + +SCRIPT = "migrate_public_features" +COLLECTION = "admin_settings" +DOC_ID = "public_features" + + +async def run() -> None: + init_firebase() + fs = get_firestore() + if fs is None: + print("ERROR: Firebase not initialised — check service account path.", file=sys.stderr) + sys.exit(1) + + doc = fs.collection(COLLECTION).document(DOC_ID).get() + if not doc.exists: + print("No public_features document found in Firestore — skipping.") + await log_run(SCRIPT, 0, 0, notes="source doc not found") + return + + data = doc.to_dict() + source_count = len(data) + print(f"Source (Firestore): {source_count} fields in {COLLECTION}/{DOC_ID}") + + records = [{"key": k, "value": v} for k, v in data.items()] + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(PublicFeature).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["key"]) + await session.execute(stmt) + dest_count = await pg_count(session, "public_features") + + print(f"Postgres public_features: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/migration/migrate_settings.py b/backend/migration/migrate_settings.py new file mode 100644 index 0000000..8937b7b --- /dev/null +++ b/backend/migration/migrate_settings.py @@ -0,0 +1,56 @@ +""" +Phase 2 — Step 2.1: console_settings (Firestore → Postgres) + +Reads the single 'admin_settings/melody_settings' doc from Firestore and +flattens each field into a key/value row in console_settings. + +Run on VPS: + docker compose exec backend python -m migration.migrate_settings +""" + +import asyncio +import sys + +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from settings.orm import ConsoleSetting +from shared.firebase import init_firebase, get_db as get_firestore +from migration.utils import AsyncPgSession, log_run, pg_count + +SCRIPT = "migrate_settings" +COLLECTION = "admin_settings" +DOC_ID = "melody_settings" + + +async def run() -> None: + init_firebase() + fs = get_firestore() + if fs is None: + print("ERROR: Firebase not initialised — check service account path.", file=sys.stderr) + sys.exit(1) + + doc = fs.collection(COLLECTION).document(DOC_ID).get() + if not doc.exists: + print("No melody_settings document found in Firestore — skipping.") + await log_run(SCRIPT, 0, 0, notes="source doc not found") + return + + data = doc.to_dict() + source_count = len(data) + print(f"Source (Firestore): {source_count} fields in {COLLECTION}/{DOC_ID}") + + records = [{"key": k, "value": v} for k, v in data.items()] + + async with AsyncPgSession() as session: + async with session.begin(): + stmt = pg_insert(ConsoleSetting).values(records) + stmt = stmt.on_conflict_do_nothing(index_elements=["key"]) + await session.execute(stmt) + dest_count = await pg_count(session, "console_settings") + + print(f"Postgres console_settings: {dest_count} rows ✓") + await log_run(SCRIPT, source_count, dest_count) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/strategies/DATABASE_MIGRATION.md b/strategies/DATABASE_MIGRATION.md index 7e7206e..fcec0d1 100644 --- a/strategies/DATABASE_MIGRATION.md +++ b/strategies/DATABASE_MIGRATION.md @@ -466,8 +466,8 @@ backend/ | Phase | Description | Status | |-------|-------------|--------| | 0 | Schema foundation (all tables in Postgres) | **COMPLETE** — applied on VPS 2026-04-17 | -| 1 | SQLite → Postgres (data migration) | **IN PROGRESS** — scripts written, not yet run | -| 2 | Firestore → Postgres (data migration) | NOT STARTED | +| 1 | SQLite → Postgres (data migration) | **COMPLETE** — all 12 scripts ran successfully on VPS 2026-04-17 | +| 2 | Firestore → Postgres (data migration) | **IN PROGRESS** — scripts written, not yet run | | 3 | Staff auth cutover | NOT STARTED | | 4 | Audit log system | NOT STARTED | | 5 | MQTT live data cutover | NOT STARTED | @@ -523,3 +523,26 @@ After all scripts complete, verify the run log: docker compose exec postgres psql -U bellsystems_user -d bellsystems_db \ -c "SELECT script_name, ran_at, source_rows, dest_rows, success FROM _migration_runs ORDER BY ran_at;" ``` + +--- + +## Phase 2 — Run Order & Commands + +crm_customers MUST run before crm_orders (FK dependency). + +```bash +# 2.1 +docker compose exec backend python -m migration.migrate_settings + +# 2.2 +docker compose exec backend python -m migration.migrate_public_features + +# 2.3 +docker compose exec backend python -m migration.migrate_crm_products + +# 2.4 (required before 2.5) +docker compose exec backend python -m migration.migrate_crm_customers + +# 2.5 (depends on 2.4) +docker compose exec backend python -m migration.migrate_crm_orders +```