"""
Phase 2 — Step 2.5: crm_orders (Firestore → Postgres)

Orders are stored as a subcollection under each customer:

    crm_customers/{customer_id}/orders/{order_id}

Uses collection_group("orders") to fetch all orders in one pass, then
inserts them into crm_orders. crm_customers MUST already be in Postgres
(step 2.4) so that the foreign-key constraint is satisfied.

Run on VPS:

    docker compose exec backend python -m migration.migrate_crm_orders
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
|
|
from crm.orm import CrmOrder
|
|
from shared.firebase import init_firebase, get_db as get_firestore
|
|
from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count
|
|
|
|
# Script identifier passed as the first argument to every log_run() call below.
SCRIPT = "migrate_crm_orders"
|
|
|
|
|
|
def _now_utc() -> datetime:
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def _coerce_dt(val) -> datetime | None:
|
|
if val is None:
|
|
return None
|
|
if isinstance(val, datetime):
|
|
return val.replace(tzinfo=timezone.utc) if val.tzinfo is None else val
|
|
return parse_dt(str(val))
|
|
|
|
|
|
def _coerce_list(val) -> list:
|
|
return val if isinstance(val, list) else []
|
|
|
|
|
|
def _coerce_dict(val) -> dict:
|
|
return val if isinstance(val, dict) else {}
|
|
|
|
|
|
# Rows per INSERT statement. Each crm_orders row binds 20 values, and a single
# PostgreSQL statement can carry at most 65535 bind parameters (asyncpg caps it
# at 32767). One multi-row INSERT for every order therefore breaks once the
# source grows past a few thousand orders; 500 rows stays far below both limits.
_INSERT_BATCH_SIZE = 500


def _customer_id_from_path(path: str) -> str | None:
    """Return the customer ID encoded in an order document path, or None.

    Only paths of the exact form ``crm_customers/{customer_id}/orders/{order_id}``
    are accepted. ``collection_group("orders")`` matches *every* collection
    named ``orders`` at any depth. That includes a top-level ``orders``
    collection, where the second path segment is the order's own ID. Any other
    shape therefore returns ``None`` instead of being misattributed to a
    customer.
    """
    parts = path.split("/")
    if len(parts) != 4 or parts[0] != "crm_customers" or parts[2] != "orders":
        return None
    return parts[1] or None


def _order_record(doc_id: str, customer_id: str, order_number: str, d: dict) -> dict:
    """Map one Firestore order document's fields onto a ``crm_orders`` row dict.

    Missing or wrongly-typed fields fall back to defaults: status
    "negotiating", currency "EUR", 0 for money fields, empty list/dict for
    JSON collections, and None for optional dict/str fields.
    """
    created_at = _coerce_dt(d.get("created_at")) or _now_utc()
    # A missing updated_at defaults to created_at, so it is never earlier
    # than created_at (two separate now() calls could differ).
    updated_at = _coerce_dt(d.get("updated_at")) or created_at
    discount = d.get("discount")
    shipping = d.get("shipping")
    notes = d.get("notes")
    return {
        "id": doc_id,
        "customer_id": customer_id,
        "order_number": order_number,
        "title": d.get("title"),
        "created_by": d.get("created_by"),
        "status": d.get("status") or "negotiating",
        "status_updated_date": _coerce_dt(d.get("status_updated_date")),
        "status_updated_by": d.get("status_updated_by"),
        "items": _coerce_list(d.get("items")),
        "subtotal": float(d.get("subtotal") or 0),
        "discount": discount if isinstance(discount, dict) else None,
        "total_price": float(d.get("total_price") or 0),
        "currency": d.get("currency") or "EUR",
        "shipping": shipping if isinstance(shipping, dict) else None,
        "payment_status": _coerce_dict(d.get("payment_status")),
        "invoice_path": d.get("invoice_path"),
        "notes": notes if isinstance(notes, str) else None,
        "timeline": _coerce_list(d.get("timeline")),
        "created_at": created_at,
        "updated_at": updated_at,
    }


async def run() -> None:
    """Copy all Firestore CRM orders into the Postgres ``crm_orders`` table.

    Steps:
    1. Stream every ``orders`` document via ``collection_group``.
    2. Skip documents that are not under ``crm_customers/{cid}/orders``, and
       those whose customer is not already in Postgres, so the FK holds.
    3. De-duplicate ``order_number`` values within this batch.
    4. Insert in batches inside one transaction, ignoring rows whose ``id``
       already exists (safe to re-run).
    5. Verify the table holds at least as many rows as were prepared, and
       record the outcome with ``log_run``.

    Exits the process with status 1 if Firebase is not initialised or if the
    post-insert count check fails.
    """
    from sqlalchemy import text

    init_firebase()
    fs = get_firestore()
    if fs is None:
        print("ERROR: Firebase not initialised.", file=sys.stderr)
        sys.exit(1)

    # collection_group fetches from ALL collections named 'orders', at any
    # depth; _customer_id_from_path filters out the ones that are not CRM orders.
    docs = list(fs.collection_group("orders").stream())
    source_count = len(docs)
    print(f"Source (Firestore): {source_count} order documents (via collection_group)")

    if source_count == 0:
        print("Nothing to migrate.")
        await log_run(SCRIPT, 0, 0, notes="source empty")
        return

    # First, collect all customer IDs already in Postgres so we can skip
    # orphaned orders whose customer didn't migrate (missing folder_id edge case).
    async with AsyncPgSession() as session:
        result = await session.execute(text("SELECT id FROM crm_customers"))
        # Firestore IDs are strings; normalise in case the driver returns
        # another type for the id column.
        valid_customer_ids = {str(row[0]) for row in result.fetchall()}

    records: list[dict] = []
    skipped = 0
    seen_order_numbers: set[str] = set()

    for doc in docs:
        path = doc.reference.path
        customer_id = _customer_id_from_path(path)
        if customer_id is None:
            print(f" WARNING: cannot parse customer_id from path {path} — skipping")
            skipped += 1
            continue

        if customer_id not in valid_customer_ids:
            print(f" WARNING: order {doc.id} references unknown customer {customer_id} — skipping")
            skipped += 1
            continue

        d = doc.to_dict() or {}

        order_number = d.get("order_number") or f"ORD-LEGACY-{doc.id}"
        # Deduplicate: if this order_number was already seen in this batch,
        # make it unique with a doc-ID suffix (the full ID if the short
        # suffix also collides).
        if order_number in seen_order_numbers:
            renamed = f"{order_number}-{doc.id[:8]}"
            if renamed in seen_order_numbers:
                renamed = f"{order_number}-{doc.id}"
            order_number = renamed
            print(f" INFO: duplicate order_number — renamed to {order_number}")
        seen_order_numbers.add(order_number)

        records.append(_order_record(doc.id, customer_id, order_number, d))

    actual_source = len(records)  # == source_count - skipped
    print(f" {skipped} skipped (orphaned/bad path), {actual_source} to insert")

    if not records:
        print("Nothing valid to insert.")
        await log_run(SCRIPT, source_count, 0, notes=f"{skipped} all skipped")
        return

    async with AsyncPgSession() as session:
        # One transaction for all batches, so a failure leaves no partial import.
        async with session.begin():
            for start in range(0, len(records), _INSERT_BATCH_SIZE):
                batch = records[start:start + _INSERT_BATCH_SIZE]
                stmt = (
                    pg_insert(CrmOrder)
                    .values(batch)
                    .on_conflict_do_nothing(index_elements=["id"])
                )
                await session.execute(stmt)
        dest_count = await pg_count(session, "crm_orders")

    # dest_count is the whole table, so it is only a lower-bound check.
    if dest_count < actual_source:
        msg = f"Count mismatch: expected>={actual_source} postgres={dest_count}"
        print(f"ERROR: {msg}", file=sys.stderr)
        await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
        sys.exit(1)

    print(f"Postgres: {dest_count} rows ✓")
    await log_run(SCRIPT, source_count, dest_count,
                  notes=f"{skipped} skipped" if skipped else None)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run())
|