179 lines
5.9 KiB
Python
179 lines
5.9 KiB
Python
from shared.firebase import get_db
|
|
from auth.utils import hash_password
|
|
from auth.models import default_permissions_for_role
|
|
from shared.exceptions import NotFoundError, AuthorizationError
|
|
import uuid
|
|
|
|
|
|
VALID_ROLES = ("sysadmin", "admin", "editor", "user")
|
|
|
|
|
|
def _staff_doc_to_response(doc_id: str, data: dict) -> dict:
|
|
return {
|
|
"id": doc_id,
|
|
"email": data.get("email", ""),
|
|
"name": data.get("name", ""),
|
|
"role": data.get("role", ""),
|
|
"is_active": data.get("is_active", True),
|
|
"permissions": data.get("permissions"),
|
|
}
|
|
|
|
|
|
async def list_staff(search: str = None, role_filter: str = None) -> dict:
|
|
db = get_db()
|
|
ref = db.collection("admin_users")
|
|
docs = ref.get()
|
|
|
|
staff = []
|
|
for doc in docs:
|
|
data = doc.to_dict()
|
|
if search:
|
|
s = search.lower()
|
|
if s not in (data.get("name", "").lower()) and s not in (data.get("email", "").lower()):
|
|
continue
|
|
if role_filter:
|
|
if data.get("role") != role_filter:
|
|
continue
|
|
staff.append(_staff_doc_to_response(doc.id, data))
|
|
|
|
return {"staff": staff, "total": len(staff)}
|
|
|
|
|
|
async def get_staff(staff_id: str) -> dict:
|
|
db = get_db()
|
|
doc = db.collection("admin_users").document(staff_id).get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Staff member not found")
|
|
return _staff_doc_to_response(doc.id, doc.to_dict())
|
|
|
|
|
|
async def get_staff_me(user_sub: str) -> dict:
|
|
db = get_db()
|
|
doc = db.collection("admin_users").document(user_sub).get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Staff member not found")
|
|
return _staff_doc_to_response(doc.id, doc.to_dict())
|
|
|
|
|
|
async def create_staff(data: dict, current_user_role: str) -> dict:
|
|
role = data.get("role", "user")
|
|
if role not in VALID_ROLES:
|
|
raise AuthorizationError(f"Invalid role: {role}")
|
|
|
|
# Admin cannot create sysadmin
|
|
if current_user_role == "admin" and role == "sysadmin":
|
|
raise AuthorizationError("Admin cannot create sysadmin accounts")
|
|
|
|
db = get_db()
|
|
|
|
# Check for duplicate email
|
|
existing = db.collection("admin_users").where("email", "==", data["email"]).limit(1).get()
|
|
if existing:
|
|
raise AuthorizationError("A staff member with this email already exists")
|
|
|
|
uid = str(uuid.uuid4())
|
|
hashed = hash_password(data["password"])
|
|
|
|
# Set default permissions for editor/user if not provided
|
|
permissions = data.get("permissions")
|
|
if permissions is None and role in ("editor", "user"):
|
|
permissions = default_permissions_for_role(role)
|
|
|
|
doc_data = {
|
|
"uid": uid,
|
|
"email": data["email"],
|
|
"hashed_password": hashed,
|
|
"name": data["name"],
|
|
"role": role,
|
|
"is_active": True,
|
|
"permissions": permissions,
|
|
}
|
|
|
|
doc_ref = db.collection("admin_users").document(uid)
|
|
doc_ref.set(doc_data)
|
|
|
|
return _staff_doc_to_response(uid, doc_data)
|
|
|
|
|
|
async def update_staff(staff_id: str, data: dict, current_user_role: str, current_user_id: str) -> dict:
|
|
db = get_db()
|
|
doc_ref = db.collection("admin_users").document(staff_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Staff member not found")
|
|
|
|
existing = doc.to_dict()
|
|
|
|
# Admin cannot edit sysadmin accounts
|
|
if current_user_role == "admin" and existing.get("role") == "sysadmin":
|
|
raise AuthorizationError("Admin cannot modify sysadmin accounts")
|
|
|
|
# Admin cannot promote to sysadmin
|
|
if current_user_role == "admin" and data.get("role") == "sysadmin":
|
|
raise AuthorizationError("Admin cannot promote to sysadmin")
|
|
|
|
update_data = {}
|
|
if data.get("email") is not None:
|
|
# Check for duplicate email
|
|
others = db.collection("admin_users").where("email", "==", data["email"]).limit(1).get()
|
|
for other in others:
|
|
if other.id != staff_id:
|
|
raise AuthorizationError("A staff member with this email already exists")
|
|
update_data["email"] = data["email"]
|
|
if data.get("name") is not None:
|
|
update_data["name"] = data["name"]
|
|
if data.get("role") is not None:
|
|
if data["role"] not in VALID_ROLES:
|
|
raise AuthorizationError(f"Invalid role: {data['role']}")
|
|
update_data["role"] = data["role"]
|
|
if data.get("is_active") is not None:
|
|
update_data["is_active"] = data["is_active"]
|
|
if "permissions" in data:
|
|
update_data["permissions"] = data["permissions"]
|
|
|
|
if update_data:
|
|
doc_ref.update(update_data)
|
|
|
|
updated = {**existing, **update_data}
|
|
return _staff_doc_to_response(staff_id, updated)
|
|
|
|
|
|
async def update_staff_password(staff_id: str, new_password: str, current_user_role: str) -> dict:
|
|
db = get_db()
|
|
doc_ref = db.collection("admin_users").document(staff_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Staff member not found")
|
|
|
|
existing = doc.to_dict()
|
|
|
|
# Admin cannot change sysadmin password
|
|
if current_user_role == "admin" and existing.get("role") == "sysadmin":
|
|
raise AuthorizationError("Admin cannot modify sysadmin accounts")
|
|
|
|
hashed = hash_password(new_password)
|
|
doc_ref.update({"hashed_password": hashed})
|
|
|
|
return {"message": "Password updated successfully"}
|
|
|
|
|
|
async def delete_staff(staff_id: str, current_user_role: str, current_user_id: str) -> dict:
|
|
db = get_db()
|
|
doc_ref = db.collection("admin_users").document(staff_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Staff member not found")
|
|
|
|
existing = doc.to_dict()
|
|
|
|
# Cannot delete self
|
|
if staff_id == current_user_id:
|
|
raise AuthorizationError("Cannot delete your own account")
|
|
|
|
# Admin cannot delete sysadmin
|
|
if current_user_role == "admin" and existing.get("role") == "sysadmin":
|
|
raise AuthorizationError("Admin cannot delete sysadmin accounts")
|
|
|
|
doc_ref.delete()
|
|
return {"message": "Staff member deleted"}
|