Files
bellsystems-cp/backend/staff/service.py

179 lines
5.9 KiB
Python

from shared.firebase import get_db
from auth.utils import hash_password
from auth.models import default_permissions_for_role
from shared.exceptions import NotFoundError, AuthorizationError
import uuid
# Role names accepted by create_staff and update_staff; any other value is
# rejected there with an AuthorizationError.
VALID_ROLES = ("sysadmin", "admin", "editor", "user")
def _staff_doc_to_response(doc_id: str, data: dict) -> dict:
    """Project a stored staff document onto the public response shape.

    Only the whitelisted fields below are copied, so anything else present in
    ``data`` (for example a password hash) never reaches the response.

    Args:
        doc_id: Document id, returned under the ``"id"`` key.
        data: Raw document fields.

    Returns:
        A dict with the keys ``id``, ``email``, ``name``, ``role``,
        ``is_active`` and ``permissions``; missing fields take the defaults
        listed in the table below.
    """
    # (field name, value used when the field is absent from ``data``)
    field_defaults = (
        ("email", ""),
        ("name", ""),
        ("role", ""),
        ("is_active", True),
        ("permissions", None),
    )
    response = {"id": doc_id}
    for field, fallback in field_defaults:
        response[field] = data.get(field, fallback)
    return response
async def list_staff(search: str = None, role_filter: str = None) -> dict:
    """Return the staff records that match the optional filters.

    Every document of the ``admin_users`` collection is fetched and the
    filters are applied in Python.

    NOTE(review): the Firestore calls are made without ``await`` inside an
    ``async`` function; if ``get_db()`` returns a synchronous client they
    block the event loop while they run -- confirm.

    Args:
        search: Case-insensitive substring. A record is kept when the text
            occurs in its name or its email. A falsy value disables the
            filter.
        role_filter: Exact role a record must have. A falsy value disables
            the filter.

    Returns:
        ``{"staff": [...], "total": <count>}`` where each entry is built by
        ``_staff_doc_to_response`` and ``total`` is the number of matches.
    """
    # Lower-case the search text once instead of on every iteration.
    needle = search.lower() if search else None

    staff = []
    for doc in get_db().collection("admin_users").get():
        data = doc.to_dict()
        if needle is not None:
            # ``or ""`` covers fields stored as an explicit null: ``dict.get``
            # returns that None rather than the default, and None has no
            # ``lower`` method.
            name = (data.get("name") or "").lower()
            email = (data.get("email") or "").lower()
            if needle not in name and needle not in email:
                continue
        if role_filter and data.get("role") != role_filter:
            continue
        staff.append(_staff_doc_to_response(doc.id, data))
    return {"staff": staff, "total": len(staff)}
async def get_staff(staff_id: str) -> dict:
    """Fetch a single staff record by its document id.

    Args:
        staff_id: Id of the document in the ``admin_users`` collection.

    Returns:
        The record formatted by ``_staff_doc_to_response``.

    Raises:
        NotFoundError: If no document with that id exists.
    """
    snapshot = get_db().collection("admin_users").document(staff_id).get()
    if snapshot.exists:
        return _staff_doc_to_response(snapshot.id, snapshot.to_dict())
    raise NotFoundError("Staff member not found")
async def get_staff_me(user_sub: str) -> dict:
    """Fetch the staff record whose document id equals ``user_sub``.

    Args:
        user_sub: Value used directly as the document id in the
            ``admin_users`` collection.

    Returns:
        The record formatted by ``_staff_doc_to_response``.

    Raises:
        NotFoundError: If no document with that id exists.
    """
    collection = get_db().collection("admin_users")
    snapshot = collection.document(user_sub).get()
    if not snapshot.exists:
        raise NotFoundError("Staff member not found")
    record = snapshot.to_dict()
    return _staff_doc_to_response(snapshot.id, record)
async def create_staff(data: dict, current_user_role: str) -> dict:
    """Create a new staff record in the ``admin_users`` collection.

    The document id is a freshly generated UUID4 string, and the password is
    stored only as the result of ``hash_password``.

    Args:
        data: Must contain ``email``, ``password`` and ``name``. May contain
            ``role`` (defaults to ``"user"``) and ``permissions``.
        current_user_role: Role of the caller performing the creation.

    Returns:
        The new record formatted by ``_staff_doc_to_response``.

    Raises:
        AuthorizationError: If the role is not in ``VALID_ROLES``, if an
            ``admin`` caller asks for a ``sysadmin`` account, or if a record
            with the same email already exists.
        KeyError: If ``email``, ``password`` or ``name`` is missing.
    """
    requested_role = data.get("role", "user")
    if requested_role not in VALID_ROLES:
        raise AuthorizationError(f"Invalid role: {requested_role}")
    # Only the "admin" caller role is barred from creating sysadmin accounts.
    if requested_role == "sysadmin" and current_user_role == "admin":
        raise AuthorizationError("Admin cannot create sysadmin accounts")

    users = get_db().collection("admin_users")

    # Reject the request when any record already carries this email.
    duplicates = users.where("email", "==", data["email"]).limit(1).get()
    if duplicates:
        raise AuthorizationError("A staff member with this email already exists")

    new_id = str(uuid.uuid4())
    password_hash = hash_password(data["password"])

    # Editors and users get role defaults when no permissions were supplied;
    # for other roles a missing value is stored as None.
    granted = data.get("permissions")
    needs_defaults = granted is None and requested_role in ("editor", "user")
    if needs_defaults:
        granted = default_permissions_for_role(requested_role)

    record = {
        "uid": new_id,
        "email": data["email"],
        "hashed_password": password_hash,
        "name": data["name"],
        "role": requested_role,
        "is_active": True,
        "permissions": granted,
    }
    users.document(new_id).set(record)
    return _staff_doc_to_response(new_id, record)
async def update_staff(staff_id: str, data: dict, current_user_role: str, current_user_id: str) -> dict:
    """Apply a partial update to an existing staff record.

    Only the fields ``email``, ``name``, ``role``, ``is_active`` and
    ``permissions`` are considered. The first four are skipped when absent
    or None; ``permissions`` is written whenever the key is present.

    Args:
        staff_id: Id of the document in the ``admin_users`` collection.
        data: Fields to change.
        current_user_role: Role of the caller performing the update.
        current_user_id: Id of the caller. Not used by this function;
            kept so the signature stays stable for callers.

    Returns:
        The stored record merged with the applied changes, formatted by
        ``_staff_doc_to_response``.

    Raises:
        NotFoundError: If no document with ``staff_id`` exists.
        AuthorizationError: If an ``admin`` caller targets a ``sysadmin``
            record or tries to assign the ``sysadmin`` role, if the new
            email belongs to a different record, or if the new role is not
            in ``VALID_ROLES``.
    """
    db = get_db()
    doc_ref = db.collection("admin_users").document(staff_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Staff member not found")
    existing = doc.to_dict()

    if current_user_role == "admin":
        # Admins may neither touch sysadmin records nor hand out that role.
        if existing.get("role") == "sysadmin":
            raise AuthorizationError("Admin cannot modify sysadmin accounts")
        if data.get("role") == "sysadmin":
            raise AuthorizationError("Admin cannot promote to sysadmin")

    update_data = {}

    new_email = data.get("email")
    if new_email is not None:
        # Ask for up to two matches: with a limit of one the query could
        # return only the record being updated and thereby hide another
        # record that already uses this email.
        matches = (
            db.collection("admin_users")
            .where("email", "==", new_email)
            .limit(2)
            .get()
        )
        if any(match.id != staff_id for match in matches):
            raise AuthorizationError("A staff member with this email already exists")
        update_data["email"] = new_email

    if data.get("name") is not None:
        update_data["name"] = data["name"]

    new_role = data.get("role")
    if new_role is not None:
        if new_role not in VALID_ROLES:
            raise AuthorizationError(f"Invalid role: {new_role}")
        update_data["role"] = new_role

    if data.get("is_active") is not None:
        update_data["is_active"] = data["is_active"]

    # Key presence (not value) decides here, so an explicit None is written
    # through and replaces the stored permissions.
    if "permissions" in data:
        update_data["permissions"] = data["permissions"]

    # Skip the write entirely when nothing changed.
    if update_data:
        doc_ref.update(update_data)

    return _staff_doc_to_response(staff_id, {**existing, **update_data})
async def update_staff_password(staff_id: str, new_password: str, current_user_role: str) -> dict:
    """Replace the stored password hash of a staff record.

    Args:
        staff_id: Id of the document in the ``admin_users`` collection.
        new_password: New password; only the result of ``hash_password`` is
            written to the document.
        current_user_role: Role of the caller performing the change.

    Returns:
        ``{"message": "Password updated successfully"}``.

    Raises:
        NotFoundError: If no document with ``staff_id`` exists.
        AuthorizationError: If an ``admin`` caller targets a ``sysadmin``
            record.
    """
    target = get_db().collection("admin_users").document(staff_id)
    snapshot = target.get()
    if not snapshot.exists:
        raise NotFoundError("Staff member not found")

    # Admins are not allowed to reset a sysadmin's password.
    target_role = snapshot.to_dict().get("role")
    if target_role == "sysadmin" and current_user_role == "admin":
        raise AuthorizationError("Admin cannot modify sysadmin accounts")

    target.update({"hashed_password": hash_password(new_password)})
    return {"message": "Password updated successfully"}
async def delete_staff(staff_id: str, current_user_role: str, current_user_id: str) -> dict:
    """Delete a staff record from the ``admin_users`` collection.

    Args:
        staff_id: Id of the document to delete.
        current_user_role: Role of the caller performing the deletion.
        current_user_id: Id of the caller, compared with ``staff_id`` to
            block self-deletion.

    Returns:
        ``{"message": "Staff member deleted"}``.

    Raises:
        NotFoundError: If no document with ``staff_id`` exists.
        AuthorizationError: If the caller targets their own record, or an
            ``admin`` caller targets a ``sysadmin`` record.
    """
    target = get_db().collection("admin_users").document(staff_id)
    snapshot = target.get()
    if not snapshot.exists:
        raise NotFoundError("Staff member not found")

    # Existence is checked first, so a missing id reports "not found" even
    # when it equals the caller's own id.
    if current_user_id == staff_id:
        raise AuthorizationError("Cannot delete your own account")

    targets_sysadmin = snapshot.to_dict().get("role") == "sysadmin"
    if current_user_role == "admin" and targets_sysadmin:
        raise AuthorizationError("Admin cannot delete sysadmin accounts")

    target.delete()
    return {"message": "Staff member deleted"}