Files
bellsystems-cp/backend/auth/router.py
2026-04-17 15:44:17 +03:00

75 lines
2.0 KiB
Python

from fastapi import APIRouter, Depends, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database.postgres import get_pg_session
from staff.orm import Staff
from auth.models import LoginRequest, TokenResponse
from auth.utils import verify_password, create_access_token
from shared.audit import log_action
from shared.exceptions import AuthenticationError
router = APIRouter(prefix="/api/auth", tags=["auth"])
_ROLE_MAP = {
"superadmin": "sysadmin",
"melody_editor": "editor",
"device_manager": "editor",
"user_manager": "editor",
"viewer": "user",
"staff": "user",
}
@router.post("/login", response_model=TokenResponse)
async def login(
body: LoginRequest,
request: Request,
db: AsyncSession = Depends(get_pg_session),
):
result = await db.execute(
select(Staff).where(Staff.email == body.email).limit(1)
)
staff = result.scalar_one_or_none()
if staff is None:
raise AuthenticationError("Invalid email or password")
if not staff.is_active:
raise AuthenticationError("Account is disabled")
if not verify_password(body.password, staff.hashed_password):
raise AuthenticationError("Invalid email or password")
role = _ROLE_MAP.get(staff.role, staff.role)
token = create_access_token({
"sub": staff.id,
"email": staff.email,
"role": role,
"name": staff.name,
})
permissions = None
if role in ("editor", "user"):
permissions = staff.permissions
await log_action(
db,
actor_id=staff.id,
actor_name=staff.name,
action="LOGIN",
entity_type="staff",
entity_id=staff.id,
entity_label=staff.email,
meta={"ip": request.client.host if request.client else None},
)
await db.commit()
return TokenResponse(
access_token=token,
role=role,
name=staff.name,
permissions=permissions,
)