# Source file: bellsystems-cp/backend/migration/migrate_crm_customers.py
# Snapshot: 2026-04-17 15:25:58 +03:00 (173 lines, 6.0 KiB, Python)

"""
Phase 2 — Step 2.4: crm_customers (Firestore → Postgres)

Reads the 'crm_customers' Firestore collection.
  - Strips legacy fields: 'negotiating', 'has_problem'
  - Skips documents that have no 'folder_id'
  - Normalises unknown 'relationship_status' values to 'lead'
  - Converts Firestore DatetimeWithNanoseconds → UTC datetime
  - Converts nested dicts/lists → JSONB-ready Python objects

After this runs, the FK constraints on crm_quotations, crm_comms_log,
crm_media, and crm_orders (all inserted in Phase 1 with FK enforcement
suppressed) become valid.

Run on VPS:
    docker compose exec backend python -m migration.migrate_crm_customers
"""
import asyncio
import sys
from datetime import datetime, timezone
from sqlalchemy.dialects.postgresql import insert as pg_insert
from crm.orm import CrmCustomer
from shared.firebase import init_firebase, get_db as get_firestore
from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count
# Script name passed to log_run() for every run of this migration.
SCRIPT = "migrate_crm_customers"
# Firestore collection the customer documents are streamed from.
COLLECTION = "crm_customers"
# Obsolete document fields that are removed from each document before it is
# mapped onto a row.
_LEGACY_FIELDS = {"negotiating", "has_problem"}
# Accepted relationship_status values; any other value is replaced by "lead".
_VALID_STATUSES = {
"lead", "active", "inactive", "archived",
"prospect", "churned", "vip",
}
def _now_utc() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now
def _coerce_dt(val) -> datetime | None:
"""Handle both Firestore DatetimeWithNanoseconds and ISO strings."""
if val is None:
return None
if isinstance(val, datetime):
return val.replace(tzinfo=timezone.utc) if val.tzinfo is None else val
return parse_dt(str(val))
def _coerce_list(val, default=None) -> list:
    """Return *val* when it is a list, otherwise a fallback.

    The fallback is *default* when one is supplied (the same object, not a
    copy) and a fresh empty list when *default* is ``None``.
    """
    if not isinstance(val, list):
        return [] if default is None else default
    return val
# Rows sent per INSERT statement. A multi-row INSERT binds one parameter per
# column per row, so batching keeps every statement's parameter count bounded
# regardless of how many documents the collection holds.
_INSERT_BATCH_SIZE = 500


def _as_plain_dict(obj):
    """Return *obj* itself when it is a dict, otherwise ``vars(obj)``."""
    return obj if isinstance(obj, dict) else vars(obj)


def _build_record(doc_id, data) -> dict | None:
    """Map one Firestore customer document onto a ``crm_customers`` row.

    Args:
        doc_id: Firestore document id; stored as both ``id`` and
            ``firestore_id``.
        data: The document's field mapping. It is not modified.

    Returns:
        The row as a plain dict, or ``None`` when the document has no
        non-empty ``folder_id`` and therefore has to be skipped.
    """
    # Work on a copy without the legacy fields so the caller's dict is untouched.
    d = {key: value for key, value in data.items() if key not in _LEGACY_FIELDS}

    # folder_id is NOT NULL UNIQUE — documents without one cannot be inserted.
    folder_id = d.get("folder_id") or ""
    if not folder_id:
        return None

    # relationship_status — missing or unknown values are normalised to 'lead'.
    rel_status = d.get("relationship_status") or "lead"
    if rel_status not in _VALID_STATUSES:
        rel_status = "lead"

    # List-of-map fields: non-list values become [], non-dict entries are
    # flattened to their attribute dict.
    contacts = [_as_plain_dict(c) for c in _coerce_list(d.get("contacts"))]
    notes = [_as_plain_dict(n) for n in _coerce_list(d.get("notes"))]
    owned_items = [_as_plain_dict(o) for o in _coerce_list(d.get("owned_items"))]

    # Single-map fields: falsy values (None, {}) are stored unchanged.
    location = d.get("location")
    if location:
        location = _as_plain_dict(location)
    crm_summary = d.get("crm_summary")
    if crm_summary:
        crm_summary = _as_plain_dict(crm_summary)

    return {
        "id": doc_id,
        "firestore_id": doc_id,
        "title": d.get("title"),
        "name": d.get("name") or "",
        "surname": d.get("surname"),
        "organization": d.get("organization"),
        "religion": d.get("religion"),
        "language": d.get("language") or "el",
        "folder_id": folder_id,
        "relationship_status": rel_status,
        "nextcloud_folder": d.get("nextcloud_folder"),
        "contacts": contacts,
        "notes": notes,
        "location": location,
        "tags": _coerce_list(d.get("tags")),
        "owned_items": owned_items,
        "linked_user_ids": _coerce_list(d.get("linked_user_ids")),
        "technical_issues": _coerce_list(d.get("technical_issues")),
        "install_support": _coerce_list(d.get("install_support")),
        "transaction_history": _coerce_list(d.get("transaction_history")),
        "crm_summary": crm_summary,
        # Missing timestamps fall back to the time of the migration run.
        "created_at": _coerce_dt(d.get("created_at")) or _now_utc(),
        "updated_at": _coerce_dt(d.get("updated_at")) or _now_utc(),
    }


async def run() -> None:
    """Copy every ``crm_customers`` Firestore document into Postgres.

    Steps:
      1. Initialise Firebase and load the whole collection into memory.
      2. Convert each document to a row; documents without a ``folder_id``
         are reported on stderr and skipped.
      3. Insert the rows in batches inside one transaction, ignoring rows
         whose ``id`` already exists (so the script can be re-run).
      4. Compare the resulting table row count with the number of rows that
         were eligible for insertion and record the outcome via ``log_run``.

    Exits the process with status 1 when Firebase is not initialised or when
    the table ends up with fewer rows than were eligible for insertion.
    """
    init_firebase()
    fs = get_firestore()
    if fs is None:
        print("ERROR: Firebase not initialised.", file=sys.stderr)
        sys.exit(1)

    docs = list(fs.collection(COLLECTION).stream())
    source_count = len(docs)
    print(f"Source (Firestore): {source_count} crm_customers documents")

    if source_count == 0:
        print("Nothing to migrate.")
        await log_run(SCRIPT, 0, 0, notes="source empty")
        return

    records = []
    skipped = 0
    for doc in docs:
        # A document without data is treated like one without folder_id.
        record = _build_record(doc.id, doc.to_dict() or {})
        if record is None:
            print(
                f" WARNING: customer {doc.id} has no folder_id — skipping",
                file=sys.stderr,
            )
            skipped += 1
            continue
        records.append(record)

    actual_source = source_count - skipped
    print(f" {skipped} skipped (missing folder_id), {actual_source} to insert")

    async with AsyncPgSession() as session:
        async with session.begin():
            # When every document was skipped the range is empty, so no
            # INSERT is issued at all instead of one with an empty row list.
            for start in range(0, len(records), _INSERT_BATCH_SIZE):
                batch = records[start:start + _INSERT_BATCH_SIZE]
                stmt = pg_insert(CrmCustomer).values(batch)
                stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
                await session.execute(stmt)

        # Counted after the insert transaction has been committed.
        dest_count = await pg_count(session, "crm_customers")

    if dest_count < actual_source:
        msg = f"Count mismatch: expected>={actual_source} postgres={dest_count}"
        print(f"ERROR: {msg}", file=sys.stderr)
        await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
        sys.exit(1)

    print(f"Postgres: {dest_count} rows ✓")
    await log_run(
        SCRIPT,
        source_count,
        dest_count,
        notes=f"{skipped} skipped (no folder_id)" if skipped else None,
    )
if __name__ == "__main__":
    # Executed as a script / ``python -m``: drive the async migration to completion.
    asyncio.run(run())