"""Authentication routes: PIN login, token refresh, and logout.

Tokens are HS256-signed JWTs that expire after ``TOKEN_EXPIRY_HOURS``.
Revocation is tracked in a process-local, in-memory set.
"""

from datetime import datetime, timedelta, timezone

import bcrypt
import jwt
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from database import get_db
from config import settings
from models.user import User
from schemas.auth import LoginRequest, TokenResponse
from schemas.user import UserOut

router = APIRouter()

TOKEN_EXPIRY_HOURS = 8

# In-memory token blacklist (cleared on restart — acceptable for local use).
# Entries are never pruned, so callers should only add tokens that still
# verify; anything else is already rejected by decode_token() on its own.
_blacklisted_tokens: set[str] = set()


def _make_token(user: User) -> str:
    """Return a signed JWT for *user*.

    The payload carries the user's id (as the string ``sub`` claim), username
    and role, plus an ``exp`` claim ``TOKEN_EXPIRY_HOURS`` from now (UTC).
    """
    payload = {
        "sub": str(user.id),
        "username": user.username,
        "role": user.role,
        "exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS),
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")


def decode_token(token: str) -> dict:
    """Verify *token* and return its decoded payload.

    Raises:
        HTTPException: 401 with detail ``"Token revoked"`` if the token is
            blacklisted, ``"Token expired"`` if its ``exp`` has passed, or
            ``"Invalid token"`` for any other verification failure.
    """
    if token in _blacklisted_tokens:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token revoked",
        )
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as exc:
        # Handled before InvalidTokenError so expiry gets its own message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired",
        ) from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        ) from exc


@router.post("/login", response_model=TokenResponse)
def login(body: LoginRequest, db: Session = Depends(get_db)):
    """Authenticate an active user by username and PIN and issue a token.

    Raises:
        HTTPException: 401 ``"Invalid credentials"`` when no active user has
            that username or the PIN does not match the stored bcrypt hash.
    """
    # `== True` is intentional: SQLAlchemy overloads it into a SQL comparison.
    user = (
        db.query(User)
        .filter(User.username == body.username, User.is_active == True)  # noqa: E712
        .first()
    )
    # NOTE(review): an unknown username short-circuits before bcrypt runs, so
    # response time differs between "no such user" and "wrong PIN" — consider
    # checking against a dummy hash if username enumeration matters here.
    if not user or not bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )
    token = _make_token(user)
    return TokenResponse(access_token=token, user=UserOut.model_validate(user))


@router.post("/refresh", response_model=TokenResponse)
def refresh(token: str, db: Session = Depends(get_db)):
    """Exchange a valid token for a fresh one and revoke the old token.

    Raises:
        HTTPException: 401 if the token is revoked, expired or invalid
            (including a missing or non-numeric ``sub`` claim), or
            ``"User not found"`` if the user no longer exists or is inactive.
    """
    payload = decode_token(token)
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError) as exc:
        # A correctly signed token without a usable subject is still not one
        # of ours; answer 401 instead of letting the error surface as a 500.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        ) from exc

    user = (
        db.query(User)
        .filter(User.id == user_id, User.is_active == True)  # noqa: E712
        .first()
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
        )
    # Revoke the old token only once a replacement is certain to be issued.
    _blacklisted_tokens.add(token)
    new_token = _make_token(user)
    return TokenResponse(access_token=new_token, user=UserOut.model_validate(user))


@router.post("/logout")
def logout(token: str):
    """Revoke *token* and report success.

    Always returns ``{"status": "logged out"}``. Only tokens that still
    verify are stored in the blacklist: revoked, expired or malformed tokens
    are rejected by ``decode_token`` anyway, and storing arbitrary
    caller-supplied strings would let anyone grow the set without bound.
    """
    try:
        decode_token(token)
    except HTTPException:
        # Nothing to revoke; the token is already unusable.
        return {"status": "logged out"}
    _blacklisted_tokens.add(token)
    return {"status": "logged out"}