Phase 1: scaffold local backend — models, schemas, routers, printer service, Docker
This commit is contained in:
64
local_backend/routers/auth.py
Normal file
64
local_backend/routers/auth.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import jwt
|
||||
import bcrypt
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from database import get_db
|
||||
from config import settings
|
||||
from models.user import User
|
||||
from schemas.auth import LoginRequest, TokenResponse
|
||||
from schemas.user import UserOut
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
TOKEN_EXPIRY_HOURS = 8
|
||||
# In-memory token blacklist (cleared on restart — acceptable for local use)
|
||||
_blacklisted_tokens: set[str] = set()
|
||||
|
||||
|
||||
def _make_token(user: User) -> str:
|
||||
payload = {
|
||||
"sub": str(user.id),
|
||||
"username": user.username,
|
||||
"role": user.role,
|
||||
"exp": datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRY_HOURS),
|
||||
}
|
||||
return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
|
||||
|
||||
|
||||
def decode_token(token: str) -> dict:
|
||||
if token in _blacklisted_tokens:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token revoked")
|
||||
try:
|
||||
return jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
||||
except jwt.ExpiredSignatureError:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
|
||||
except jwt.InvalidTokenError:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
|
||||
|
||||
|
||||
@router.post("/login", response_model=TokenResponse)
|
||||
def login(body: LoginRequest, db: Session = Depends(get_db)):
|
||||
user = db.query(User).filter(User.username == body.username, User.is_active == True).first()
|
||||
if not user or not bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
|
||||
token = _make_token(user)
|
||||
return TokenResponse(access_token=token, user=UserOut.model_validate(user))
|
||||
|
||||
|
||||
@router.post("/refresh", response_model=TokenResponse)
|
||||
def refresh(token: str, db: Session = Depends(get_db)):
|
||||
payload = decode_token(token)
|
||||
user = db.query(User).filter(User.id == int(payload["sub"]), User.is_active == True).first()
|
||||
if not user:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
|
||||
_blacklisted_tokens.add(token)
|
||||
new_token = _make_token(user)
|
||||
return TokenResponse(access_token=new_token, user=UserOut.model_validate(user))
|
||||
|
||||
|
||||
@router.post("/logout")
|
||||
def logout(token: str):
|
||||
_blacklisted_tokens.add(token)
|
||||
return {"status": "logged out"}
|
||||
Reference in New Issue
Block a user