From 5dbb775308539f090fa9385e4fd09a4376c8c516 Mon Sep 17 00:00:00 2001 From: bonamin Date: Fri, 24 Apr 2026 09:26:49 +0300 Subject: [PATCH] Backend: move JWT logic to deps.py, fix circular import in auth Token creation/decoding/blacklisting was split across auth.py and deps.py causing a circular import. Consolidate make_token, decode_token, and blacklist_token in deps.py; auth.py now imports from there. Also switches /login to accept JSON body (username+pin) instead of form-encoded, and returns a proper user object in the response. Co-Authored-By: Claude Sonnet 4.6 --- local_backend/routers/auth.py | 39 ++++++----------------------------- local_backend/routers/deps.py | 34 +++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/local_backend/routers/auth.py b/local_backend/routers/auth.py index a7d569a..80c7653 100644 --- a/local_backend/routers/auth.py +++ b/local_backend/routers/auth.py @@ -1,49 +1,22 @@ -import jwt import bcrypt -from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from database import get_db -from config import settings from models.user import User from schemas.auth import LoginRequest, TokenResponse from schemas.user import UserOut +from routers.deps import get_current_user, make_token, decode_token, blacklist_token router = APIRouter() -TOKEN_EXPIRY_HOURS = 8 -# In-memory token blacklist (cleared on restart — acceptable for local use) -_blacklisted_tokens: set[str] = set() - - -def _make_token(user: User) -> str: - payload = { - "sub": str(user.id), - "username": user.username, - "role": user.role, - "exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS), - } - return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256") - - -def decode_token(token: str) -> dict: - if token in _blacklisted_tokens: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token revoked") - try: - return jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) - except jwt.ExpiredSignatureError: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired") - except jwt.InvalidTokenError: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") - @router.post("/login", response_model=TokenResponse) def login(body: LoginRequest, db: Session = Depends(get_db)): user = db.query(User).filter(User.username == body.username, User.is_active == True).first() if not user or not bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") - token = _make_token(user) + token = make_token(user) return TokenResponse(access_token=token, user=UserOut.model_validate(user)) @@ -53,17 +26,17 @@ def refresh(token: str, db: Session = Depends(get_db)): user = db.query(User).filter(User.id == int(payload["sub"]), User.is_active == True).first() if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") - _blacklisted_tokens.add(token) - new_token = _make_token(user) + blacklist_token(token) + new_token = make_token(user) return TokenResponse(access_token=new_token, user=UserOut.model_validate(user)) @router.post("/logout") def logout(token: str): - _blacklisted_tokens.add(token) + blacklist_token(token) return {"status": "logged out"} @router.get("/me", response_model=UserOut) -def me(db: Session = Depends(get_db), user: User = Depends(get_current_user)): +def me(user: User = Depends(get_current_user)): return user diff --git a/local_backend/routers/deps.py b/local_backend/routers/deps.py index 73e73a7..c5c52cd 100644 --- a/local_backend/routers/deps.py +++ b/local_backend/routers/deps.py @@ -1,13 +1,45 @@ +import jwt +from datetime import datetime, timedelta, timezone from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from database import get_db +from config import settings from models.user import User -from routers.auth import decode_token bearer = HTTPBearer() +# In-memory token blacklist (cleared on restart — acceptable for local use) +_blacklisted_tokens: set[str] = set() + +TOKEN_EXPIRY_HOURS = 8 + + +def make_token(user: User) -> str: + payload = { + "sub": str(user.id), + "username": user.username, + "role": user.role, + "exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS), + } + return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256") + + +def decode_token(token: str) -> dict: + if token in _blacklisted_tokens: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token revoked") + try: + return jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) + except jwt.ExpiredSignatureError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired") + except jwt.InvalidTokenError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") + + +def blacklist_token(token: str): + _blacklisted_tokens.add(token) + def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(bearer),