"""FastAPI dependencies for bearer-token authentication and role checks."""

from fastapi import Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError

from auth.utils import decode_access_token
from auth.models import TokenPayload, Role
from shared.exceptions import AuthenticationError, AuthorizationError

# Security scheme injected into `get_current_user` to obtain the credentials.
security = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenPayload:
    """Decode the bearer token and return its claims as a ``TokenPayload``.

    Args:
        credentials: Credentials supplied by the ``security`` dependency;
            ``credentials.credentials`` is passed to ``decode_access_token``.

    Returns:
        A ``TokenPayload`` built from the ``sub``, ``email``, ``role`` and
        ``name`` claims of the decoded token.

    Raises:
        AuthenticationError: If decoding fails, a required claim is missing,
            or the decoded payload is malformed.
    """
    try:
        payload = decode_access_token(credentials.credentials)
        token_data = TokenPayload(
            sub=payload["sub"],
            email=payload["email"],
            role=payload["role"],
            name=payload["name"],
        )
    # JWTError: decoding failed. KeyError: a required claim is absent.
    # TypeError: the decoded payload is not subscriptable (e.g. None).
    # ValueError: presumably raised when TokenPayload rejects a claim value
    # (e.g. an unknown role) -- NOTE(review): confirm against auth.models.
    # All of these mean "bad token", so surface them uniformly instead of
    # letting them escape as unhandled errors.
    except (JWTError, KeyError, TypeError, ValueError) as exc:
        # Chain the cause so the original failure stays visible in tracebacks.
        raise AuthenticationError() from exc
    return token_data


def require_roles(*allowed_roles: Role):
    """Build a dependency that only admits users holding an allowed role.

    Args:
        *allowed_roles: Roles that may pass the check. A user whose role
            equals ``Role.superadmin`` always passes, whether or not it is
            listed here.

    Returns:
        An async dependency callable. It resolves the current user via
        ``get_current_user`` and returns that ``TokenPayload`` on success,
        or raises ``AuthorizationError`` when the user's role is not allowed.
    """
    # Computed once per dependency instead of on every request. A tuple keeps
    # the same equality-based membership test as the original list.
    allowed_values = tuple(role.value for role in allowed_roles)

    async def role_checker(
        current_user: TokenPayload = Depends(get_current_user),
    ) -> TokenPayload:
        # Superadmin bypasses the per-endpoint allow-list.
        if current_user.role == Role.superadmin:
            return current_user
        if current_user.role not in allowed_values:
            raise AuthorizationError()
        return current_user

    return role_checker


# Pre-built convenience dependencies
require_superadmin = require_roles(Role.superadmin)
require_melody_access = require_roles(Role.superadmin, Role.melody_editor)
require_device_access = require_roles(Role.superadmin, Role.device_manager)
require_user_access = require_roles(Role.superadmin, Role.user_manager)
require_viewer = require_roles(
    Role.superadmin,
    Role.melody_editor,
    Role.device_manager,
    Role.user_manager,
    Role.viewer,
)