Backend: move JWT logic to deps.py, fix circular import in auth

Token creation/decoding/blacklisting was split across auth.py and deps.py
causing a circular import. Consolidate make_token, decode_token, and
blacklist_token in deps.py; auth.py now imports from there.
Also switches /login to accept JSON body (username+pin) instead of
form-encoded, and returns a proper user object in the response.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-24 09:26:49 +03:00
parent e283b8b50f
commit 5dbb775308
2 changed files with 39 additions and 34 deletions

View File

@@ -1,49 +1,22 @@
import jwt
import bcrypt import bcrypt
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from database import get_db from database import get_db
from config import settings
from models.user import User from models.user import User
from schemas.auth import LoginRequest, TokenResponse from schemas.auth import LoginRequest, TokenResponse
from schemas.user import UserOut from schemas.user import UserOut
from routers.deps import get_current_user, make_token, decode_token, blacklist_token
router = APIRouter() router = APIRouter()
TOKEN_EXPIRY_HOURS = 8
# In-memory token blacklist (cleared on restart — acceptable for local use)
_blacklisted_tokens: set[str] = set()
def _make_token(user: User) -> str:
payload = {
"sub": str(user.id),
"username": user.username,
"role": user.role,
"exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS),
}
return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
def decode_token(token: str) -> dict:
if token in _blacklisted_tokens:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token revoked")
try:
return jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
@router.post("/login", response_model=TokenResponse) @router.post("/login", response_model=TokenResponse)
def login(body: LoginRequest, db: Session = Depends(get_db)): def login(body: LoginRequest, db: Session = Depends(get_db)):
user = db.query(User).filter(User.username == body.username, User.is_active == True).first() user = db.query(User).filter(User.username == body.username, User.is_active == True).first()
if not user or not bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()): if not user or not bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
token = _make_token(user) token = make_token(user)
return TokenResponse(access_token=token, user=UserOut.model_validate(user)) return TokenResponse(access_token=token, user=UserOut.model_validate(user))
@@ -53,17 +26,17 @@ def refresh(token: str, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == int(payload["sub"]), User.is_active == True).first() user = db.query(User).filter(User.id == int(payload["sub"]), User.is_active == True).first()
if not user: if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
_blacklisted_tokens.add(token) blacklist_token(token)
new_token = _make_token(user) new_token = make_token(user)
return TokenResponse(access_token=new_token, user=UserOut.model_validate(user)) return TokenResponse(access_token=new_token, user=UserOut.model_validate(user))
@router.post("/logout") @router.post("/logout")
def logout(token: str): def logout(token: str):
_blacklisted_tokens.add(token) blacklist_token(token)
return {"status": "logged out"} return {"status": "logged out"}
@router.get("/me", response_model=UserOut) @router.get("/me", response_model=UserOut)
def me(db: Session = Depends(get_db), user: User = Depends(get_current_user)): def me(user: User = Depends(get_current_user)):
return user return user

View File

@@ -1,13 +1,45 @@
import jwt
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, status from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from database import get_db from database import get_db
from config import settings
from models.user import User from models.user import User
from routers.auth import decode_token
bearer = HTTPBearer() bearer = HTTPBearer()
# In-memory token blacklist (cleared on restart — acceptable for local use)
_blacklisted_tokens: set[str] = set()
TOKEN_EXPIRY_HOURS = 8
def make_token(user: User) -> str:
payload = {
"sub": str(user.id),
"username": user.username,
"role": user.role,
"exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS),
}
return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
def decode_token(token: str) -> dict:
if token in _blacklisted_tokens:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token revoked")
try:
return jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
def blacklist_token(token: str):
_blacklisted_tokens.add(token)
def get_current_user( def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(bearer), credentials: HTTPAuthorizationCredentials = Depends(bearer),