"""
Phase 3 — Step 3.1: admin_users (Firestore → Postgres staff table)

Reads every document in the 'admin_users' Firestore collection and inserts a
matching row into the Postgres 'staff' table.

Key transformations:
  - Legacy role names mapped to canonical roles (superadmin→sysadmin, etc.)
  - Unrecognised role values fall back to 'user' and are reported on stderr
  - permissions=None stored as JSONB null (sysadmin/admin have no permission map)
  - ui_prefs column NOT migrated (not part of the Postgres schema — dropped)
  - Firestore doc ID preserved as staff.id and staff.firestore_id
  - created_at/updated_at default to now() if missing from Firestore doc

Run on VPS:
    docker compose exec backend python -m migration.migrate_staff
"""

import asyncio
import sys
from datetime import datetime, timezone

from sqlalchemy.dialects.postgresql import insert as pg_insert

from staff.orm import Staff
from shared.firebase import init_firebase, get_db as get_firestore
from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count

SCRIPT = "migrate_staff"
COLLECTION = "admin_users"

# Role assigned when a document has no role, or a role this script does not know.
_DEFAULT_ROLE = "user"

_ROLE_MAP = {
    "superadmin": "sysadmin",
    "melody_editor": "editor",
    "device_manager": "editor",
    "user_manager": "editor",
    "viewer": "user",
    # canonical roles pass through unchanged
    "sysadmin": "sysadmin",
    "admin": "admin",
    "editor": "editor",
    "user": "user",
    "staff": "user",
}


def _now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _coerce_dt(val) -> datetime | None:
    """Convert a Firestore timestamp-ish value to a datetime.

    Returns None for None. A naive datetime is tagged as UTC; an aware
    datetime is returned untouched. Any other value is stringified and handed
    to ``parse_dt``.
    """
    if val is None:
        return None
    if not isinstance(val, datetime):
        return parse_dt(str(val))
    if val.tzinfo is None:
        return val.replace(tzinfo=timezone.utc)
    return val


def _resolve_role(doc_id: str, raw_role) -> str:
    """Map a Firestore role value to a canonical staff role.

    A missing/empty role silently becomes the default role. A value that is
    not a known role name (including non-string values, which could not be
    looked up in the map at all) also becomes the default role, but is
    reported on stderr so the downgrade can be reviewed after the run.
    """
    if not raw_role:
        return _DEFAULT_ROLE
    if isinstance(raw_role, str) and raw_role in _ROLE_MAP:
        return _ROLE_MAP[raw_role]
    print(
        f" WARNING: {doc_id} has unrecognised role {raw_role!r} — "
        f"defaulting to '{_DEFAULT_ROLE}'",
        file=sys.stderr,
    )
    return _DEFAULT_ROLE


def _build_record(doc_id: str, d: dict) -> dict | None:
    """Turn one admin_users document into a row dict for the staff insert.

    Args:
        doc_id: Firestore document ID; used for both ``id`` and
            ``firestore_id``.
        d: The document's field map.

    Returns:
        The row dict, or None when the document lacks a hashed password or an
        email. A warning is printed to stderr for every rejected document.
    """
    # Password is checked before email, so a document missing both is
    # reported as missing its password.
    hashed_password = d.get("hashed_password") or ""
    if not hashed_password:
        print(
            f" WARNING: {doc_id} ({d.get('email')}) has no hashed_password — skipping",
            file=sys.stderr,
        )
        return None

    email = d.get("email") or ""
    if not email:
        print(f" WARNING: {doc_id} has no email — skipping", file=sys.stderr)
        return None

    role = _resolve_role(doc_id, d.get("role"))

    # sysadmin/admin have no permission map
    permissions = None if role in ("sysadmin", "admin") else d.get("permissions")

    # One timestamp per record so defaulted created_at/updated_at agree.
    now = _now_utc()
    return {
        "id": doc_id,
        "firestore_id": doc_id,
        "email": email,
        "name": d.get("name") or "",
        "role": role,
        "permissions": permissions,
        "hashed_password": hashed_password,
        "is_active": bool(d.get("is_active", True)),
        "created_at": _coerce_dt(d.get("created_at")) or now,
        "updated_at": _coerce_dt(d.get("updated_at")) or now,
    }


async def run() -> None:
    """Migrate all admin_users documents into the staff table.

    Exits the process with status 1 when Firebase is not initialised, when
    every document is rejected, or when the staff row count after the insert
    is lower than the number of rows that were sent. Each outcome other than
    the Firebase failure is recorded through ``log_run``.
    """
    init_firebase()
    fs = get_firestore()
    if fs is None:
        print("ERROR: Firebase not initialised.", file=sys.stderr)
        sys.exit(1)

    # Materialised up front because the document count is reported before
    # any processing happens.
    docs = list(fs.collection(COLLECTION).stream())
    source_count = len(docs)
    print(f"Source (Firestore): {source_count} admin_users documents")

    if not docs:
        print("Nothing to migrate.")
        await log_run(SCRIPT, 0, 0, notes="source empty")
        return

    records = []
    for doc in docs:
        # to_dict() may yield None for a snapshot without data; treat that as
        # an empty document so it is skipped with a warning instead of crashing.
        record = _build_record(doc.id, doc.to_dict() or {})
        if record is not None:
            records.append(record)

    actual_source = len(records)
    skipped = source_count - actual_source
    print(f" {skipped} skipped (missing email or password), {actual_source} to insert")

    if not records:
        print("Nothing to insert after filtering.")
        await log_run(
            SCRIPT,
            source_count,
            0,
            success=False,
            notes="all docs skipped — missing required fields",
        )
        sys.exit(1)

    async with AsyncPgSession() as session:
        async with session.begin():
            # Rows whose id already exists are left untouched, which keeps
            # the script safe to re-run.
            stmt = pg_insert(Staff).values(records)
            stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
            await session.execute(stmt)

        # NOTE(review): counted after the insert transaction block has
        # exited — confirm this matches the intended placement.
        dest_count = await pg_count(session, "staff")

    # This compares against the whole table, so it only catches the case
    # where fewer rows exist than were sent.
    if dest_count < actual_source:
        msg = f"Count mismatch: expected>={actual_source} postgres={dest_count}"
        print(f"ERROR: {msg}", file=sys.stderr)
        await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
        sys.exit(1)

    print(f"Postgres: {dest_count} rows ✓")
    note = f"{skipped} skipped (missing fields)" if skipped else None
    await log_run(SCRIPT, source_count, dest_count, notes=note)


if __name__ == "__main__":
    asyncio.run(run())