Phase 3 of Migration
This commit is contained in:
143
backend/migration/migrate_staff.py
Normal file
143
backend/migration/migrate_staff.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""
|
||||
Phase 3 — Step 3.1: admin_users (Firestore → Postgres staff table)
|
||||
|
||||
Reads every document in the 'admin_users' Firestore collection and inserts
|
||||
a matching row into the Postgres 'staff' table.
|
||||
|
||||
Key transformations:
|
||||
- Legacy role names mapped to canonical roles (superadmin→sysadmin, etc.)
|
||||
- permissions=None stored as JSONB null (sysadmin/admin have no permission map)
|
||||
- ui_prefs column NOT migrated (not part of the Postgres schema — dropped)
|
||||
- Firestore doc ID preserved as staff.id and staff.firestore_id
|
||||
- created_at/updated_at default to now() if missing from Firestore doc
|
||||
|
||||
Run on VPS:
|
||||
docker compose exec backend python -m migration.migrate_staff
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
|
||||
from staff.orm import Staff
|
||||
from shared.firebase import init_firebase, get_db as get_firestore
|
||||
from migration.utils import AsyncPgSession, parse_dt, log_run, pg_count
|
||||
|
||||
SCRIPT = "migrate_staff"
|
||||
COLLECTION = "admin_users"
|
||||
|
||||
_ROLE_MAP = {
|
||||
"superadmin": "sysadmin",
|
||||
"melody_editor": "editor",
|
||||
"device_manager": "editor",
|
||||
"user_manager": "editor",
|
||||
"viewer": "user",
|
||||
# canonical roles pass through unchanged
|
||||
"sysadmin": "sysadmin",
|
||||
"admin": "admin",
|
||||
"editor": "editor",
|
||||
"user": "user",
|
||||
"staff": "user",
|
||||
}
|
||||
|
||||
|
||||
def _now_utc() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _coerce_dt(val) -> datetime | None:
|
||||
if val is None:
|
||||
return None
|
||||
if isinstance(val, datetime):
|
||||
return val.replace(tzinfo=timezone.utc) if val.tzinfo is None else val
|
||||
return parse_dt(str(val))
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
init_firebase()
|
||||
fs = get_firestore()
|
||||
if fs is None:
|
||||
print("ERROR: Firebase not initialised.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
docs = list(fs.collection(COLLECTION).stream())
|
||||
source_count = len(docs)
|
||||
print(f"Source (Firestore): {source_count} admin_users documents")
|
||||
|
||||
if source_count == 0:
|
||||
print("Nothing to migrate.")
|
||||
await log_run(SCRIPT, 0, 0, notes="source empty")
|
||||
return
|
||||
|
||||
records = []
|
||||
skipped = 0
|
||||
|
||||
for doc in docs:
|
||||
d = doc.to_dict()
|
||||
|
||||
hashed_password = d.get("hashed_password") or ""
|
||||
if not hashed_password:
|
||||
print(f" WARNING: {doc.id} ({d.get('email')}) has no hashed_password — skipping",
|
||||
file=sys.stderr)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
email = d.get("email") or ""
|
||||
if not email:
|
||||
print(f" WARNING: {doc.id} has no email — skipping", file=sys.stderr)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
raw_role = d.get("role") or "user"
|
||||
role = _ROLE_MAP.get(raw_role, "user")
|
||||
|
||||
# sysadmin/admin have no permission map
|
||||
permissions = d.get("permissions")
|
||||
if role in ("sysadmin", "admin"):
|
||||
permissions = None
|
||||
|
||||
now = _now_utc()
|
||||
records.append({
|
||||
"id": doc.id,
|
||||
"firestore_id": doc.id,
|
||||
"email": email,
|
||||
"name": d.get("name") or "",
|
||||
"role": role,
|
||||
"permissions": permissions,
|
||||
"hashed_password": hashed_password,
|
||||
"is_active": bool(d.get("is_active", True)),
|
||||
"created_at": _coerce_dt(d.get("created_at")) or now,
|
||||
"updated_at": _coerce_dt(d.get("updated_at")) or now,
|
||||
})
|
||||
|
||||
actual_source = source_count - skipped
|
||||
print(f" {skipped} skipped (missing email or password), {actual_source} to insert")
|
||||
|
||||
if not records:
|
||||
print("Nothing to insert after filtering.")
|
||||
await log_run(SCRIPT, source_count, 0, success=False,
|
||||
notes="all docs skipped — missing required fields")
|
||||
sys.exit(1)
|
||||
|
||||
async with AsyncPgSession() as session:
|
||||
async with session.begin():
|
||||
stmt = pg_insert(Staff).values(records)
|
||||
stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
|
||||
await session.execute(stmt)
|
||||
dest_count = await pg_count(session, "staff")
|
||||
|
||||
if dest_count < actual_source:
|
||||
msg = f"Count mismatch: expected>={actual_source} postgres={dest_count}"
|
||||
print(f"ERROR: {msg}", file=sys.stderr)
|
||||
await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Postgres: {dest_count} rows ✓")
|
||||
note = f"{skipped} skipped (missing fields)" if skipped else None
|
||||
await log_run(SCRIPT, source_count, dest_count, notes=note)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run())
|
||||
Reference in New Issue
Block a user