Added Roles and Permissions. Some minor UI fixes
This commit is contained in:
@@ -4,6 +4,7 @@ from jose import JWTError
|
||||
from auth.utils import decode_access_token
|
||||
from auth.models import TokenPayload, Role
|
||||
from shared.exceptions import AuthenticationError, AuthorizationError
|
||||
from shared.firebase import get_db
|
||||
|
||||
security = HTTPBearer()
|
||||
|
||||
@@ -28,7 +29,7 @@ def require_roles(*allowed_roles: Role):
|
||||
async def role_checker(
|
||||
current_user: TokenPayload = Depends(get_current_user),
|
||||
) -> TokenPayload:
|
||||
if current_user.role == Role.superadmin:
|
||||
if current_user.role == Role.sysadmin:
|
||||
return current_user
|
||||
if current_user.role not in [r.value for r in allowed_roles]:
|
||||
raise AuthorizationError()
|
||||
@@ -36,12 +37,63 @@ def require_roles(*allowed_roles: Role):
|
||||
return role_checker
|
||||
|
||||
|
||||
async def _get_user_permissions(user: TokenPayload) -> dict:
|
||||
"""Fetch permissions from Firestore for the given user."""
|
||||
if user.role in (Role.sysadmin, Role.admin):
|
||||
return None # Full access
|
||||
db = get_db()
|
||||
if not db:
|
||||
raise AuthorizationError()
|
||||
doc = db.collection("admin_users").document(user.sub).get()
|
||||
if not doc.exists:
|
||||
raise AuthorizationError()
|
||||
data = doc.to_dict()
|
||||
return data.get("permissions")
|
||||
|
||||
|
||||
def require_permission(section: str, action: str):
|
||||
"""Check granular permission for a section and action.
|
||||
section: 'melodies', 'devices', 'app_users', 'equipment', 'mqtt'
|
||||
action: 'view', 'add', 'edit', 'delete' (or ignored for mqtt)
|
||||
"""
|
||||
async def permission_checker(
|
||||
current_user: TokenPayload = Depends(get_current_user),
|
||||
) -> TokenPayload:
|
||||
# sysadmin and admin have full access
|
||||
if current_user.role in (Role.sysadmin, Role.admin):
|
||||
return current_user
|
||||
|
||||
permissions = await _get_user_permissions(current_user)
|
||||
if not permissions:
|
||||
raise AuthorizationError()
|
||||
|
||||
if section == "mqtt":
|
||||
if not permissions.get("mqtt", False):
|
||||
raise AuthorizationError()
|
||||
return current_user
|
||||
|
||||
section_perms = permissions.get(section)
|
||||
if not section_perms:
|
||||
raise AuthorizationError()
|
||||
|
||||
if isinstance(section_perms, dict):
|
||||
if not section_perms.get(action, False):
|
||||
raise AuthorizationError()
|
||||
else:
|
||||
raise AuthorizationError()
|
||||
|
||||
return current_user
|
||||
return permission_checker
|
||||
|
||||
|
||||
# Pre-built convenience dependencies
|
||||
require_superadmin = require_roles(Role.superadmin)
|
||||
require_melody_access = require_roles(Role.superadmin, Role.melody_editor)
|
||||
require_device_access = require_roles(Role.superadmin, Role.device_manager)
|
||||
require_user_access = require_roles(Role.superadmin, Role.user_manager)
|
||||
require_viewer = require_roles(
|
||||
Role.superadmin, Role.melody_editor, Role.device_manager,
|
||||
Role.user_manager, Role.viewer,
|
||||
require_sysadmin = require_roles(Role.sysadmin)
|
||||
require_admin_or_above = require_roles(Role.sysadmin, Role.admin)
|
||||
|
||||
# Staff management: only sysadmin and admin
|
||||
require_staff_management = require_roles(Role.sysadmin, Role.admin)
|
||||
|
||||
# Viewer-level: any authenticated user (actual permission check per-action)
|
||||
require_any_authenticated = require_roles(
|
||||
Role.sysadmin, Role.admin, Role.editor, Role.user,
|
||||
)
|
||||
|
||||
@@ -4,11 +4,49 @@ from enum import Enum
|
||||
|
||||
|
||||
class Role(str, Enum):
|
||||
superadmin = "superadmin"
|
||||
melody_editor = "melody_editor"
|
||||
device_manager = "device_manager"
|
||||
user_manager = "user_manager"
|
||||
viewer = "viewer"
|
||||
sysadmin = "sysadmin"
|
||||
admin = "admin"
|
||||
editor = "editor"
|
||||
user = "user"
|
||||
|
||||
|
||||
class SectionPermissions(BaseModel):
|
||||
view: bool = False
|
||||
add: bool = False
|
||||
edit: bool = False
|
||||
delete: bool = False
|
||||
|
||||
|
||||
class StaffPermissions(BaseModel):
|
||||
melodies: SectionPermissions = SectionPermissions()
|
||||
devices: SectionPermissions = SectionPermissions()
|
||||
app_users: SectionPermissions = SectionPermissions()
|
||||
equipment: SectionPermissions = SectionPermissions()
|
||||
mqtt: bool = False
|
||||
|
||||
|
||||
# Default permissions per role
|
||||
def default_permissions_for_role(role: str) -> Optional[dict]:
|
||||
if role in ("sysadmin", "admin"):
|
||||
return None # Full access, permissions field not used
|
||||
full = {"view": True, "add": True, "edit": True, "delete": True}
|
||||
view_only = {"view": True, "add": False, "edit": False, "delete": False}
|
||||
if role == "editor":
|
||||
return {
|
||||
"melodies": full,
|
||||
"devices": full,
|
||||
"app_users": full,
|
||||
"equipment": full,
|
||||
"mqtt": True,
|
||||
}
|
||||
# user role - view only
|
||||
return {
|
||||
"melodies": view_only,
|
||||
"devices": view_only,
|
||||
"app_users": view_only,
|
||||
"equipment": view_only,
|
||||
"mqtt": False,
|
||||
}
|
||||
|
||||
|
||||
class AdminUserInDB(BaseModel):
|
||||
@@ -18,6 +56,7 @@ class AdminUserInDB(BaseModel):
|
||||
name: str
|
||||
role: Role
|
||||
is_active: bool = True
|
||||
permissions: Optional[StaffPermissions] = None
|
||||
|
||||
|
||||
class LoginRequest(BaseModel):
|
||||
@@ -30,6 +69,7 @@ class TokenResponse(BaseModel):
|
||||
token_type: str = "bearer"
|
||||
role: str
|
||||
name: str
|
||||
permissions: Optional[dict] = None
|
||||
|
||||
|
||||
class TokenPayload(BaseModel):
|
||||
|
||||
@@ -28,15 +28,32 @@ async def login(body: LoginRequest):
|
||||
if not verify_password(body.password, user_data["hashed_password"]):
|
||||
raise AuthenticationError("Invalid email or password")
|
||||
|
||||
role = user_data["role"]
|
||||
# Map legacy roles to new roles
|
||||
role_mapping = {
|
||||
"superadmin": "sysadmin",
|
||||
"melody_editor": "editor",
|
||||
"device_manager": "editor",
|
||||
"user_manager": "editor",
|
||||
"viewer": "user",
|
||||
}
|
||||
role = role_mapping.get(role, role)
|
||||
|
||||
token = create_access_token({
|
||||
"sub": doc.id,
|
||||
"email": user_data["email"],
|
||||
"role": user_data["role"],
|
||||
"role": role,
|
||||
"name": user_data["name"],
|
||||
})
|
||||
|
||||
# Get permissions for editor/user roles
|
||||
permissions = None
|
||||
if role in ("editor", "user"):
|
||||
permissions = user_data.get("permissions")
|
||||
|
||||
return TokenResponse(
|
||||
access_token=token,
|
||||
role=user_data["role"],
|
||||
role=role,
|
||||
name=user_data["name"],
|
||||
permissions=permissions,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user