from shared.firebase import get_db from auth.utils import hash_password from auth.models import default_permissions_for_role from shared.exceptions import NotFoundError, AuthorizationError import uuid VALID_ROLES = ("sysadmin", "admin", "editor", "user") def _staff_doc_to_response(doc_id: str, data: dict) -> dict: return { "id": doc_id, "email": data.get("email", ""), "name": data.get("name", ""), "role": data.get("role", ""), "is_active": data.get("is_active", True), "permissions": data.get("permissions"), } async def list_staff(search: str = None, role_filter: str = None) -> dict: db = get_db() ref = db.collection("admin_users") docs = ref.get() staff = [] for doc in docs: data = doc.to_dict() if search: s = search.lower() if s not in (data.get("name", "").lower()) and s not in (data.get("email", "").lower()): continue if role_filter: if data.get("role") != role_filter: continue staff.append(_staff_doc_to_response(doc.id, data)) return {"staff": staff, "total": len(staff)} async def get_staff(staff_id: str) -> dict: db = get_db() doc = db.collection("admin_users").document(staff_id).get() if not doc.exists: raise NotFoundError("Staff member not found") return _staff_doc_to_response(doc.id, doc.to_dict()) async def get_staff_me(user_sub: str) -> dict: db = get_db() doc = db.collection("admin_users").document(user_sub).get() if not doc.exists: raise NotFoundError("Staff member not found") return _staff_doc_to_response(doc.id, doc.to_dict()) async def create_staff(data: dict, current_user_role: str) -> dict: role = data.get("role", "user") if role not in VALID_ROLES: raise AuthorizationError(f"Invalid role: {role}") # Admin cannot create sysadmin if current_user_role == "admin" and role == "sysadmin": raise AuthorizationError("Admin cannot create sysadmin accounts") db = get_db() # Check for duplicate email existing = db.collection("admin_users").where("email", "==", data["email"]).limit(1).get() if existing: raise AuthorizationError("A staff member with this email already exists") uid = str(uuid.uuid4()) hashed = hash_password(data["password"]) # Set default permissions for editor/user if not provided permissions = data.get("permissions") if permissions is None and role in ("editor", "user"): permissions = default_permissions_for_role(role) doc_data = { "uid": uid, "email": data["email"], "hashed_password": hashed, "name": data["name"], "role": role, "is_active": True, "permissions": permissions, } doc_ref = db.collection("admin_users").document(uid) doc_ref.set(doc_data) return _staff_doc_to_response(uid, doc_data) async def update_staff(staff_id: str, data: dict, current_user_role: str, current_user_id: str) -> dict: db = get_db() doc_ref = db.collection("admin_users").document(staff_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Staff member not found") existing = doc.to_dict() # Admin cannot edit sysadmin accounts if current_user_role == "admin" and existing.get("role") == "sysadmin": raise AuthorizationError("Admin cannot modify sysadmin accounts") # Admin cannot promote to sysadmin if current_user_role == "admin" and data.get("role") == "sysadmin": raise AuthorizationError("Admin cannot promote to sysadmin") update_data = {} if data.get("email") is not None: # Check for duplicate email others = db.collection("admin_users").where("email", "==", data["email"]).limit(1).get() for other in others: if other.id != staff_id: raise AuthorizationError("A staff member with this email already exists") update_data["email"] = data["email"] if data.get("name") is not None: update_data["name"] = data["name"] if data.get("role") is not None: if data["role"] not in VALID_ROLES: raise AuthorizationError(f"Invalid role: {data['role']}") update_data["role"] = data["role"] if data.get("is_active") is not None: update_data["is_active"] = data["is_active"] if "permissions" in data: update_data["permissions"] = data["permissions"] if update_data: doc_ref.update(update_data) updated = {**existing, **update_data} return _staff_doc_to_response(staff_id, updated) async def update_staff_password(staff_id: str, new_password: str, current_user_role: str) -> dict: db = get_db() doc_ref = db.collection("admin_users").document(staff_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Staff member not found") existing = doc.to_dict() # Admin cannot change sysadmin password if current_user_role == "admin" and existing.get("role") == "sysadmin": raise AuthorizationError("Admin cannot modify sysadmin accounts") hashed = hash_password(new_password) doc_ref.update({"hashed_password": hashed}) return {"message": "Password updated successfully"} async def delete_staff(staff_id: str, current_user_role: str, current_user_id: str) -> dict: db = get_db() doc_ref = db.collection("admin_users").document(staff_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Staff member not found") existing = doc.to_dict() # Cannot delete self if staff_id == current_user_id: raise AuthorizationError("Cannot delete your own account") # Admin cannot delete sysadmin if current_user_role == "admin" and existing.get("role") == "sysadmin": raise AuthorizationError("Admin cannot delete sysadmin accounts") doc_ref.delete() return {"message": "Staff member deleted"}