""" Phase 2 — Step 2.3: crm_products (Firestore → Postgres) Reads the 'crm_products' Firestore collection. The Firestore schema is richer than the Postgres target (has costs, stock, name_en, etc.) — we extract only what the Postgres ORM model covers. The rest stays in Firestore until the service is fully cut over. Run on VPS: docker compose exec backend python -m migration.migrate_crm_products """ import asyncio import sys from datetime import datetime, timezone from sqlalchemy.dialects.postgresql import insert as pg_insert from crm.orm import CrmProduct from shared.firebase import init_firebase, get_db as get_firestore from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count SCRIPT = "migrate_crm_products" COLLECTION = "crm_products" _LEGACY_STATUS_MAP = { "active": True, "discontinued": False, "planned": True, } def _now_utc() -> datetime: return datetime.now(timezone.utc) async def run() -> None: init_firebase() fs = get_firestore() if fs is None: print("ERROR: Firebase not initialised.", file=sys.stderr) sys.exit(1) docs = list(fs.collection(COLLECTION).stream()) source_count = len(docs) print(f"Source (Firestore): {source_count} crm_products documents") if source_count == 0: print("Nothing to migrate.") await log_run(SCRIPT, 0, 0, notes="source empty") return records = [] for doc in docs: d = doc.to_dict() # is_active: prefer 'active' bool field, fall back to 'status' string if "active" in d: is_active = bool(d["active"]) else: is_active = _LEGACY_STATUS_MAP.get(d.get("status", "active"), True) # unit_cost: Firestore uses 'price' unit_cost = d.get("unit_cost") or d.get("price") or 0 created_at = parse_dt(d.get("created_at")) or _now_utc() updated_at = parse_dt(d.get("updated_at")) or _now_utc() records.append({ "id": doc.id, "firestore_id": doc.id, "name": d.get("name") or d.get("name_en") or "", "sku": d.get("sku"), "category": d.get("category"), "description": d.get("description") or d.get("description_en"), "unit_cost": unit_cost, "currency": d.get("currency") or "EUR", "unit_type": d.get("unit_type") or "pcs", "is_active": is_active, "created_at": created_at, "updated_at": updated_at, }) async with AsyncPgSession() as session: async with session.begin(): stmt = pg_insert(CrmProduct).values(records) stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) await session.execute(stmt) dest_count = await pg_count(session, "crm_products") if dest_count < source_count: msg = f"Count mismatch: source={source_count} postgres={dest_count}" print(f"ERROR: {msg}", file=sys.stderr) await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg) sys.exit(1) print(f"Postgres: {dest_count} rows ✓") await log_run(SCRIPT, source_count, dest_count) if __name__ == "__main__": asyncio.run(run())