Files
bonamin 5dbb775308 Backend: move JWT logic to deps.py, fix circular import in auth
Token creation/decoding/blacklisting was split across auth.py and deps.py,
causing a circular import. Consolidate make_token, decode_token, and
blacklist_token in deps.py; auth.py now imports from there.
Also switch /login to accept a JSON body (username + pin) instead of
form-encoded data, and return a proper user object in the response.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 09:26:49 +03:00

65 lines
2.2 KiB
Python

import jwt
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from database import get_db
from config import settings
from models.user import User
# Lifetime of an issued token, in hours; make_token adds this to the current
# UTC time to build the "exp" claim.
TOKEN_EXPIRY_HOURS = 8

# In-memory token blacklist (cleared on restart — acceptable for local use).
# blacklist_token adds raw token strings; decode_token rejects any token
# found here before attempting to decode it.
_blacklisted_tokens: set[str] = set()

# Bearer-credentials security dependency consumed by get_current_user.
bearer = HTTPBearer()
def make_token(user: User) -> str:
    """Create a signed HS256 JWT identifying *user*.

    The payload carries the user's id (stringified, as the ``sub`` claim),
    the username and role, and an ``exp`` claim set ``TOKEN_EXPIRY_HOURS``
    hours after the current UTC time.

    Args:
        user: The user the token is issued for; its ``id``, ``username``
            and ``role`` attributes are read.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    expires_at = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS)
    claims = {
        "sub": str(user.id),
        "username": user.username,
        "role": user.role,
        "exp": expires_at,
    }
    return jwt.encode(claims, settings.SECRET_KEY, algorithm="HS256")
def decode_token(token: str) -> dict:
    """Validate *token* and return its decoded payload.

    The token is first checked against the in-memory blacklist, then
    verified and decoded with the application secret using HS256. The
    ``exp`` and ``sub`` claims are required, so a signed token that never
    expires or that names no subject is rejected as invalid.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims dictionary.

    Raises:
        HTTPException: 401 with detail ``"Token revoked"`` if the token is
            blacklisted, ``"Token expired"`` if its ``exp`` has passed, or
            ``"Invalid token"`` for any other validation failure.
    """
    if token in _blacklisted_tokens:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token revoked")
    try:
        return jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=["HS256"],
            # A missing required claim raises a subclass of
            # jwt.InvalidTokenError, so it maps to "Invalid token" below.
            options={"require": ["exp", "sub"]},
        )
    except jwt.ExpiredSignatureError as exc:
        # Must be caught before InvalidTokenError, which is its base class.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
        ) from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from exc
def blacklist_token(token: str) -> None:
    """Revoke *token* by recording it in the in-memory blacklist.

    Adding a token that is already present has no further effect, because
    the blacklist is a set.

    Args:
        token: The encoded JWT string to revoke.
    """
    _blacklisted_tokens.add(token)
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the active user identified by the request's bearer token.

    Args:
        credentials: Bearer credentials supplied by the ``bearer`` dependency;
            ``credentials.credentials`` is the raw token string.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` row whose id matches the token's ``sub`` claim and whose
        ``is_active`` flag is true.

    Raises:
        HTTPException: 401 if the token is rejected by ``decode_token``, if
            its ``sub`` claim is missing or not an integer (detail
            ``"Invalid token"``), or if no matching active user exists
            (detail ``"User not found"``).
    """
    payload = decode_token(credentials.credentials)
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError) as exc:
        # A correctly signed token whose subject is absent or non-numeric
        # cannot identify a user; answer 401 instead of failing with a 500.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from exc

    user = (
        db.query(User)
        # `== True` builds a SQL expression here; it is not a Python bool test.
        .filter(User.id == user_id, User.is_active == True)  # noqa: E712
        .first()
    )
    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    return user
def require_manager(user: User = Depends(get_current_user)) -> User:
    """Allow the request only for users with the manager or sysadmin role.

    Args:
        user: The authenticated user supplied by ``get_current_user``.

    Returns:
        The same user, when its role is ``"manager"`` or ``"sysadmin"``.

    Raises:
        HTTPException: 403 with detail ``"Manager access required"`` for any
            other role.
    """
    allowed_roles = ("manager", "sysadmin")
    if user.role in allowed_roles:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Manager access required")
def require_sysadmin(user: User = Depends(get_current_user)) -> User:
    """Allow the request only for users with the sysadmin role.

    Args:
        user: The authenticated user supplied by ``get_current_user``.

    Returns:
        The same user, when its role is ``"sysadmin"``.

    Raises:
        HTTPException: 403 with detail ``"Sysadmin access required"`` for any
            other role.
    """
    if user.role == "sysadmin":
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Sysadmin access required")