Phase 1 Complete by Claude Code

This commit is contained in:
2026-02-16 22:32:28 +02:00
parent 19c069949d
commit 5e2d4b6b1b
20 changed files with 692 additions and 32 deletions

View File

@@ -1 +1,47 @@
# TODO: JWT verification, role checks
from fastapi import Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError
from auth.utils import decode_access_token
from auth.models import TokenPayload, Role
from shared.exceptions import AuthenticationError, AuthorizationError
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenPayload:
try:
payload = decode_access_token(credentials.credentials)
token_data = TokenPayload(
sub=payload["sub"],
email=payload["email"],
role=payload["role"],
name=payload["name"],
)
except (JWTError, KeyError):
raise AuthenticationError()
return token_data
def require_roles(*allowed_roles: Role):
async def role_checker(
current_user: TokenPayload = Depends(get_current_user),
) -> TokenPayload:
if current_user.role == Role.superadmin:
return current_user
if current_user.role not in [r.value for r in allowed_roles]:
raise AuthorizationError()
return current_user
return role_checker
# Pre-built convenience dependencies
require_superadmin = require_roles(Role.superadmin)
require_melody_access = require_roles(Role.superadmin, Role.melody_editor)
require_device_access = require_roles(Role.superadmin, Role.device_manager)
require_user_access = require_roles(Role.superadmin, Role.user_manager)
require_viewer = require_roles(
Role.superadmin, Role.melody_editor, Role.device_manager,
Role.user_manager, Role.viewer,
)

View File

@@ -1 +1,40 @@
# TODO: User/token Pydantic schemas
from pydantic import BaseModel
from typing import Optional
from enum import Enum
class Role(str, Enum):
superadmin = "superadmin"
melody_editor = "melody_editor"
device_manager = "device_manager"
user_manager = "user_manager"
viewer = "viewer"
class AdminUserInDB(BaseModel):
uid: str
email: str
hashed_password: str
name: str
role: Role
is_active: bool = True
class LoginRequest(BaseModel):
email: str
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
role: str
name: str
class TokenPayload(BaseModel):
sub: str
email: str
role: str
name: str
exp: Optional[int] = None

View File

@@ -1 +1,42 @@
# TODO: Login / token endpoints
from fastapi import APIRouter
from shared.firebase import get_db
from auth.models import LoginRequest, TokenResponse
from auth.utils import verify_password, create_access_token
from shared.exceptions import AuthenticationError
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest):
db = get_db()
if not db:
raise AuthenticationError("Service unavailable")
users_ref = db.collection("admin_users")
query = users_ref.where("email", "==", body.email).limit(1).get()
if not query:
raise AuthenticationError("Invalid email or password")
doc = query[0]
user_data = doc.to_dict()
if not user_data.get("is_active", True):
raise AuthenticationError("Account is disabled")
if not verify_password(body.password, user_data["hashed_password"]):
raise AuthenticationError("Invalid email or password")
token = create_access_token({
"sub": doc.id,
"email": user_data["email"],
"role": user_data["role"],
"name": user_data["name"],
})
return TokenResponse(
access_token=token,
role=user_data["role"],
name=user_data["name"],
)

View File

@@ -1 +1,35 @@
# TODO: Password hashing, token creation
from datetime import datetime, timedelta, timezone
from jose import jwt
from passlib.context import CryptContext
from config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(
minutes=settings.jwt_expiration_minutes
)
to_encode.update({"exp": expire})
return jwt.encode(
to_encode,
settings.jwt_secret_key,
algorithm=settings.jwt_algorithm,
)
def decode_access_token(token: str) -> dict:
return jwt.decode(
token,
settings.jwt_secret_key,
algorithms=[settings.jwt_algorithm],
)