Phase 2 of Migration
This commit is contained in:
102
backend/migration/migrate_crm_products.py
Normal file
102
backend/migration/migrate_crm_products.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""
|
||||
Phase 2 — Step 2.3: crm_products (Firestore → Postgres)
|
||||
|
||||
Reads the 'crm_products' Firestore collection. The Firestore schema is richer
|
||||
than the Postgres target (has costs, stock, name_en, etc.) — we extract only
|
||||
what the Postgres ORM model covers. The rest stays in Firestore until the
|
||||
service is fully cut over.
|
||||
|
||||
Run on VPS:
|
||||
docker compose exec backend python -m migration.migrate_crm_products
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
|
||||
from crm.orm import CrmProduct
|
||||
from shared.firebase import init_firebase, get_db as get_firestore
|
||||
from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count
|
||||
|
||||
SCRIPT = "migrate_crm_products"
|
||||
COLLECTION = "crm_products"
|
||||
|
||||
_LEGACY_STATUS_MAP = {
|
||||
"active": True,
|
||||
"discontinued": False,
|
||||
"planned": True,
|
||||
}
|
||||
|
||||
|
||||
def _now_utc() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
init_firebase()
|
||||
fs = get_firestore()
|
||||
if fs is None:
|
||||
print("ERROR: Firebase not initialised.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
docs = list(fs.collection(COLLECTION).stream())
|
||||
source_count = len(docs)
|
||||
print(f"Source (Firestore): {source_count} crm_products documents")
|
||||
|
||||
if source_count == 0:
|
||||
print("Nothing to migrate.")
|
||||
await log_run(SCRIPT, 0, 0, notes="source empty")
|
||||
return
|
||||
|
||||
records = []
|
||||
for doc in docs:
|
||||
d = doc.to_dict()
|
||||
|
||||
# is_active: prefer 'active' bool field, fall back to 'status' string
|
||||
if "active" in d:
|
||||
is_active = bool(d["active"])
|
||||
else:
|
||||
is_active = _LEGACY_STATUS_MAP.get(d.get("status", "active"), True)
|
||||
|
||||
# unit_cost: Firestore uses 'price'
|
||||
unit_cost = d.get("unit_cost") or d.get("price") or 0
|
||||
|
||||
created_at = parse_dt(d.get("created_at")) or _now_utc()
|
||||
updated_at = parse_dt(d.get("updated_at")) or _now_utc()
|
||||
|
||||
records.append({
|
||||
"id": doc.id,
|
||||
"firestore_id": doc.id,
|
||||
"name": d.get("name") or d.get("name_en") or "",
|
||||
"sku": d.get("sku"),
|
||||
"category": d.get("category"),
|
||||
"description": d.get("description") or d.get("description_en"),
|
||||
"unit_cost": unit_cost,
|
||||
"currency": d.get("currency") or "EUR",
|
||||
"unit_type": d.get("unit_type") or "pcs",
|
||||
"is_active": is_active,
|
||||
"created_at": created_at,
|
||||
"updated_at": updated_at,
|
||||
})
|
||||
|
||||
async with AsyncPgSession() as session:
|
||||
async with session.begin():
|
||||
stmt = pg_insert(CrmProduct).values(records)
|
||||
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
|
||||
await session.execute(stmt)
|
||||
dest_count = await pg_count(session, "crm_products")
|
||||
|
||||
if dest_count < source_count:
|
||||
msg = f"Count mismatch: source={source_count} postgres={dest_count}"
|
||||
print(f"ERROR: {msg}", file=sys.stderr)
|
||||
await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Postgres: {dest_count} rows ✓")
|
||||
await log_run(SCRIPT, source_count, dest_count)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run())
|
||||
Reference in New Issue
Block a user