Files
xenia-pos-local/local_backend/routers/auth.py
bonamin 8ba8c95ecd feat: initial commit — local services (backend + manager dashboard + waiter PWA)
Includes all work to date:
- local_backend: FastAPI backend with products, orders, tables, shifts, cloud sync
- manager_dashboard: React manager UI with product/category management, reports, settings
- waiter_pwa: React PWA for waiter devices
- Category reparent endpoint and UI
- Waiter domain: local_ip sent on heartbeat, waiter_domain persisted from cloud response
- QR code modal in AppInfoTab for waiter domain
- Product form: number input spinners removed, category pre-selected on new product
- Category row: count badge moved to far right

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 14:04:38 +03:00

175 lines
7.2 KiB
Python

import bcrypt
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from database import get_db
from models.user import User
from schemas.auth import LoginRequest, TokenResponse, UpdateMeRequest
from pydantic import BaseModel as _PydanticBase
class LoginByIdRequest(_PydanticBase):
waiter_id: int
pin: str
from schemas.user import UserOut
from routers.deps import get_current_user, make_token, decode_token, blacklist_token
router = APIRouter()
class NoAuthLoginRequest(_PydanticBase):
username: str
@router.post("/login-no-auth", response_model=TokenResponse)
def login_no_auth(body: NoAuthLoginRequest, db: Session = Depends(get_db)):
"""Login with no credentials — only works when security.login_method = 'none'."""
from models.settings import PosSettings
setting = db.query(PosSettings).filter(PosSettings.key == "security.login_method").first()
if not setting or setting.value != "none":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No-auth login is not enabled.")
user = db.query(User).filter(User.username == body.username, User.is_active == True).first()
if not user or user.role not in ("manager", "sysadmin"):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
token = make_token(user)
return TokenResponse(access_token=token, user=UserOut.model_validate(user))
@router.post("/login", response_model=TokenResponse)
def login(body: LoginRequest, db: Session = Depends(get_db)):
user = db.query(User).filter(User.username == body.username, User.is_active == True).first()
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
authenticated = False
if body.password and user.password_hash:
authenticated = bcrypt.checkpw(body.password.encode(), user.password_hash.encode())
elif body.pin and user.pin_hash:
authenticated = bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode())
if not authenticated:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
token = make_token(user)
return TokenResponse(access_token=token, user=UserOut.model_validate(user))
@router.post("/login-by-id", response_model=TokenResponse)
def login_by_id(body: LoginByIdRequest, db: Session = Depends(get_db)):
"""Login using waiter id + PIN (used by the waiter-picker login screen)."""
user = db.query(User).filter(User.id == body.waiter_id, User.is_active == True).first()
if not user or not bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Λανθασμένο PIN")
token = make_token(user)
return TokenResponse(access_token=token, user=UserOut.model_validate(user))
@router.post("/refresh", response_model=TokenResponse)
def refresh(token: str, db: Session = Depends(get_db)):
payload = decode_token(token)
user = db.query(User).filter(User.id == int(payload["sub"]), User.is_active == True).first()
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
blacklist_token(token)
new_token = make_token(user)
return TokenResponse(access_token=new_token, user=UserOut.model_validate(user))
@router.post("/logout")
def logout(token: str):
blacklist_token(token)
return {"status": "logged out"}
@router.get("/me", response_model=UserOut)
def me(user: User = Depends(get_current_user)):
return user
# ─── Public manager list (login screen — no auth required) ───────────────────
class PublicManagerOut(_PydanticBase):
id: int
username: str
full_name: str | None
model_config = {"from_attributes": True}
@router.get("/managers", response_model=list[PublicManagerOut])
def public_manager_list(db: Session = Depends(get_db)):
"""Public endpoint — returns active manager/sysadmin accounts for login screen."""
managers = db.query(User).filter(
User.role.in_(["manager", "sysadmin"]),
User.is_active == True,
).all()
return [PublicManagerOut(id=m.id, username=m.username, full_name=m.full_name) for m in managers]
# ─── Public waiter list (login screen — no auth required) ────────────────────
from pydantic import BaseModel as _BaseModel
class PublicWaiterOut(_BaseModel):
id: int
full_name: str | None
nickname: str | None
avatar_url: str | None
on_shift: bool
model_config = {"from_attributes": True}
@router.get("/waiters", response_model=list[PublicWaiterOut])
def public_waiter_list(db: Session = Depends(get_db)):
"""Public endpoint — returns active waiters with on-shift flag. No auth required."""
from models.shift import WaiterShift
waiters = db.query(User).filter(User.role == "waiter", User.is_active == True).all()
on_shift_ids = {
row.waiter_id
for row in db.query(WaiterShift).filter(WaiterShift.ended_at == None).all()
}
return [
PublicWaiterOut(
id=w.id,
full_name=w.full_name,
nickname=w.nickname,
avatar_url=w.avatar_url,
on_shift=w.id in on_shift_ids,
)
for w in waiters
]
@router.patch("/me", response_model=UserOut)
def update_me(body: UpdateMeRequest, db: Session = Depends(get_db), user: User = Depends(get_current_user)):
# Password change — requires current_password verification
if body.new_password is not None:
if not body.current_password:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="current_password is required to set a new password")
if not user.password_hash:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Account has no password set")
if not bcrypt.checkpw(body.current_password.encode(), user.password_hash.encode()):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Current password is incorrect")
if len(body.new_password) < 6:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="New password must be at least 6 characters")
user.password_hash = bcrypt.hashpw(body.new_password.encode(), bcrypt.gensalt()).decode()
# PIN change — no current PIN required (already authenticated)
if body.new_pin is not None:
if len(body.new_pin) < 4:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="PIN must be at least 4 digits")
user.pin_hash = bcrypt.hashpw(body.new_pin.encode(), bcrypt.gensalt()).decode()
# Username change — check uniqueness
if body.username is not None and body.username != user.username:
conflict = db.query(User).filter(User.username == body.username, User.id != user.id).first()
if conflict:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Username already taken")
user.username = body.username
# Display name
if body.full_name is not None:
user.full_name = body.full_name
db.commit()
db.refresh(user)
return user