"""
Phase 2 — Step 2.5: crm_orders (Firestore → Postgres)

Orders are stored as a subcollection under each customer:
    crm_customers/{customer_id}/orders/{order_id}

Uses collection_group("orders") to fetch all orders in one pass, then inserts
into crm_orders. crm_customers MUST already be in Postgres (step 2.4) so the
FK constraint is satisfied.

Rows are inserted in fixed-size batches inside a single transaction, so a
large order set neither exceeds the driver's per-statement bind-parameter
limit nor leaves a partially migrated table behind on failure.

Run on VPS:
    docker compose exec backend python -m migration.migrate_crm_orders
"""
import asyncio
import sys
from datetime import datetime, timezone

from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert as pg_insert

from crm.orm import CrmOrder
from shared.firebase import init_firebase, get_db as get_firestore
from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count

SCRIPT = "migrate_crm_orders"

# Rows per INSERT statement. Each record binds ~20 parameters, and drivers cap
# the number of bind parameters in a single statement (asyncpg: 32767), so a
# single multi-row INSERT of every order fails once there are ~1,600 orders.
_INSERT_BATCH_SIZE = 500


def _now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _coerce_dt(val) -> datetime | None:
    """Convert a Firestore timestamp value to an aware datetime.

    None stays None; naive datetimes are tagged as UTC; aware datetimes are
    returned unchanged; anything else is stringified and passed to parse_dt.
    """
    if val is None:
        return None
    if isinstance(val, datetime):
        return val.replace(tzinfo=timezone.utc) if val.tzinfo is None else val
    return parse_dt(str(val))


def _coerce_list(val) -> list:
    """Return val if it is a list, otherwise an empty list."""
    return val if isinstance(val, list) else []


def _coerce_dict(val) -> dict:
    """Return val if it is a dict, otherwise an empty dict."""
    return val if isinstance(val, dict) else {}


def _customer_id_from_path(path: str) -> str | None:
    """Extract customer_id from ``crm_customers/{customer_id}/orders/{order_id}``.

    collection_group("orders") matches every subcollection named "orders"
    anywhere in the database, so the full path shape is checked instead of
    blindly trusting the second segment. Returns None when it does not match.
    """
    parts = path.split("/")
    if len(parts) != 4 or parts[0] != "crm_customers" or parts[2] != "orders":
        return None
    return parts[1] or None


def _unique_order_number(order_number: str, doc_id: str, seen: set[str]) -> str:
    """Return an order_number not yet in ``seen`` and record it there.

    A duplicate gets the first 8 chars of the Firestore doc ID appended; if
    that suffixed value is itself already taken, a counter is appended until
    the result is unique.
    """
    if order_number in seen:
        base = f"{order_number}-{doc_id[:8]}"
        candidate = base
        n = 2
        while candidate in seen:
            candidate = f"{base}-{n}"
            n += 1
        order_number = candidate
        print(f" INFO: duplicate order_number — renamed to {order_number}")
    seen.add(order_number)
    return order_number


def _build_record(doc_id: str, customer_id: str, order_number: str, d: dict) -> dict:
    """Map one Firestore order document onto a crm_orders row dict."""
    return {
        "id": doc_id,
        "customer_id": customer_id,
        "order_number": order_number,
        "title": d.get("title"),
        "created_by": d.get("created_by"),
        "status": d.get("status") or "negotiating",
        "status_updated_date": _coerce_dt(d.get("status_updated_date")),
        "status_updated_by": d.get("status_updated_by"),
        "items": _coerce_list(d.get("items")),
        "subtotal": float(d.get("subtotal") or 0),
        "discount": d.get("discount") if isinstance(d.get("discount"), dict) else None,
        "total_price": float(d.get("total_price") or 0),
        "currency": d.get("currency") or "EUR",
        "shipping": d.get("shipping") if isinstance(d.get("shipping"), dict) else None,
        "payment_status": _coerce_dict(d.get("payment_status")),
        "invoice_path": d.get("invoice_path"),
        "notes": d.get("notes") if isinstance(d.get("notes"), str) else None,
        "timeline": _coerce_list(d.get("timeline")),
        # Missing timestamps fall back to "now" so NOT NULL columns are satisfied.
        "created_at": _coerce_dt(d.get("created_at")) or _now_utc(),
        "updated_at": _coerce_dt(d.get("updated_at")) or _now_utc(),
    }


async def run() -> None:
    """Copy every Firestore order into crm_orders and verify the row count.

    Orders whose path is malformed, whose customer is not in Postgres, or
    whose Firestore ID repeats an earlier order are skipped with a warning.
    Exits with status 1 if Firebase is unavailable or Postgres ends up with
    fewer crm_orders rows than valid source orders.
    """
    init_firebase()
    fs = get_firestore()
    if fs is None:
        print("ERROR: Firebase not initialised.", file=sys.stderr)
        sys.exit(1)

    # collection_group fetches from ALL customers' 'orders' subcollections
    docs = list(fs.collection_group("orders").stream())
    source_count = len(docs)
    print(f"Source (Firestore): {source_count} order documents (via collection_group)")

    if source_count == 0:
        print("Nothing to migrate.")
        await log_run(SCRIPT, 0, 0, notes="source empty")
        return

    # Collect all customer IDs already in Postgres so we can skip orphaned
    # orders whose customer didn't migrate (missing folder_id edge case).
    async with AsyncPgSession() as session:
        result = await session.execute(text("SELECT id FROM crm_customers"))
        valid_customer_ids = {row[0] for row in result.fetchall()}

    records = []
    skipped = 0
    seen_ids: set[str] = set()
    seen_order_numbers: set[str] = set()

    for doc in docs:
        customer_id = _customer_id_from_path(doc.reference.path)
        if customer_id is None:
            print(f" WARNING: cannot parse customer_id from path {doc.reference.path} — skipping")
            skipped += 1
            continue

        if customer_id not in valid_customer_ids:
            print(f" WARNING: order {doc.id} references unknown customer {customer_id} — skipping")
            skipped += 1
            continue

        # Firestore IDs are only unique within one subcollection; a repeat
        # would otherwise be dropped silently by ON CONFLICT (id) DO NOTHING.
        if doc.id in seen_ids:
            print(f" WARNING: duplicate order id {doc.id} (customer {customer_id}) — skipping")
            skipped += 1
            continue
        seen_ids.add(doc.id)

        d = doc.to_dict() or {}
        order_number = _unique_order_number(
            d.get("order_number") or f"ORD-LEGACY-{doc.id}",
            doc.id,
            seen_order_numbers,
        )
        records.append(_build_record(doc.id, customer_id, order_number, d))

    actual_source = source_count - skipped
    print(f" {skipped} skipped (orphaned/bad path/duplicate id), {actual_source} to insert")

    if not records:
        print("Nothing valid to insert.")
        await log_run(SCRIPT, source_count, 0, notes=f"{skipped} all skipped")
        return

    async with AsyncPgSession() as session:
        # One transaction, many statements: stays atomic while keeping each
        # INSERT under the bind-parameter limit.
        async with session.begin():
            for start in range(0, len(records), _INSERT_BATCH_SIZE):
                batch = records[start:start + _INSERT_BATCH_SIZE]
                stmt = pg_insert(CrmOrder).values(batch)
                stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
                await session.execute(stmt)

        dest_count = await pg_count(session, "crm_orders")

    # dest_count covers the whole table (including rows from earlier runs),
    # so this only guarantees nothing valid went missing.
    if dest_count < actual_source:
        msg = f"Count mismatch: expected>={actual_source} postgres={dest_count}"
        print(f"ERROR: {msg}", file=sys.stderr)
        await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
        sys.exit(1)

    print(f"Postgres: {dest_count} rows ✓")
    await log_run(SCRIPT, source_count, dest_count, notes=f"{skipped} skipped" if skipped else None)


if __name__ == "__main__":
    asyncio.run(run())