100 lines
3.2 KiB
Python
100 lines
3.2 KiB
Python
from fastapi import Depends
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from jose import JWTError
|
|
from auth.utils import decode_access_token
|
|
from auth.models import TokenPayload, Role
|
|
from shared.exceptions import AuthenticationError, AuthorizationError
|
|
from shared.firebase import get_db
|
|
|
|
security = HTTPBearer()
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
) -> TokenPayload:
|
|
try:
|
|
payload = decode_access_token(credentials.credentials)
|
|
token_data = TokenPayload(
|
|
sub=payload["sub"],
|
|
email=payload["email"],
|
|
role=payload["role"],
|
|
name=payload["name"],
|
|
)
|
|
except (JWTError, KeyError):
|
|
raise AuthenticationError()
|
|
return token_data
|
|
|
|
|
|
def require_roles(*allowed_roles: Role):
|
|
async def role_checker(
|
|
current_user: TokenPayload = Depends(get_current_user),
|
|
) -> TokenPayload:
|
|
if current_user.role == Role.sysadmin:
|
|
return current_user
|
|
if current_user.role not in [r.value for r in allowed_roles]:
|
|
raise AuthorizationError()
|
|
return current_user
|
|
return role_checker
|
|
|
|
|
|
async def _get_user_permissions(user: TokenPayload) -> dict:
|
|
"""Fetch permissions from Firestore for the given user."""
|
|
if user.role in (Role.sysadmin, Role.admin):
|
|
return None # Full access
|
|
db = get_db()
|
|
if not db:
|
|
raise AuthorizationError()
|
|
doc = db.collection("admin_users").document(user.sub).get()
|
|
if not doc.exists:
|
|
raise AuthorizationError()
|
|
data = doc.to_dict()
|
|
return data.get("permissions")
|
|
|
|
|
|
def require_permission(section: str, action: str):
|
|
"""Check granular permission for a section and action.
|
|
section: 'melodies', 'devices', 'app_users', 'equipment', 'mqtt'
|
|
action: 'view', 'add', 'edit', 'delete' (or ignored for mqtt)
|
|
"""
|
|
async def permission_checker(
|
|
current_user: TokenPayload = Depends(get_current_user),
|
|
) -> TokenPayload:
|
|
# sysadmin and admin have full access
|
|
if current_user.role in (Role.sysadmin, Role.admin):
|
|
return current_user
|
|
|
|
permissions = await _get_user_permissions(current_user)
|
|
if not permissions:
|
|
raise AuthorizationError()
|
|
|
|
if section == "mqtt":
|
|
if not permissions.get("mqtt", False):
|
|
raise AuthorizationError()
|
|
return current_user
|
|
|
|
section_perms = permissions.get(section)
|
|
if not section_perms:
|
|
raise AuthorizationError()
|
|
|
|
if isinstance(section_perms, dict):
|
|
if not section_perms.get(action, False):
|
|
raise AuthorizationError()
|
|
else:
|
|
raise AuthorizationError()
|
|
|
|
return current_user
|
|
return permission_checker
|
|
|
|
|
|
# Pre-built convenience dependencies
|
|
require_sysadmin = require_roles(Role.sysadmin)
|
|
require_admin_or_above = require_roles(Role.sysadmin, Role.admin)
|
|
|
|
# Staff management: only sysadmin and admin
|
|
require_staff_management = require_roles(Role.sysadmin, Role.admin)
|
|
|
|
# Viewer-level: any authenticated user (actual permission check per-action)
|
|
require_any_authenticated = require_roles(
|
|
Role.sysadmin, Role.admin, Role.editor, Role.user,
|
|
)
|