import bcrypt from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from database import get_db from models.user import User from schemas.auth import LoginRequest, TokenResponse, UpdateMeRequest from pydantic import BaseModel as _PydanticBase class LoginByIdRequest(_PydanticBase): waiter_id: int pin: str from schemas.user import UserOut from routers.deps import get_current_user, make_token, decode_token, blacklist_token router = APIRouter() class NoAuthLoginRequest(_PydanticBase): username: str @router.post("/login-no-auth", response_model=TokenResponse) def login_no_auth(body: NoAuthLoginRequest, db: Session = Depends(get_db)): """Login with no credentials — only works when security.login_method = 'none'.""" from models.settings import PosSettings setting = db.query(PosSettings).filter(PosSettings.key == "security.login_method").first() if not setting or setting.value != "none": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No-auth login is not enabled.") user = db.query(User).filter(User.username == body.username, User.is_active == True).first() if not user or user.role not in ("manager", "sysadmin"): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") token = make_token(user) return TokenResponse(access_token=token, user=UserOut.model_validate(user)) @router.post("/login", response_model=TokenResponse) def login(body: LoginRequest, db: Session = Depends(get_db)): user = db.query(User).filter(User.username == body.username, User.is_active == True).first() if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") authenticated = False if body.password and user.password_hash: authenticated = bcrypt.checkpw(body.password.encode(), user.password_hash.encode()) elif body.pin and user.pin_hash: authenticated = bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()) if not authenticated: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") token = make_token(user) return TokenResponse(access_token=token, user=UserOut.model_validate(user)) @router.post("/login-by-id", response_model=TokenResponse) def login_by_id(body: LoginByIdRequest, db: Session = Depends(get_db)): """Login using waiter id + PIN (used by the waiter-picker login screen).""" user = db.query(User).filter(User.id == body.waiter_id, User.is_active == True).first() if not user or not bcrypt.checkpw(body.pin.encode(), user.pin_hash.encode()): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Λανθασμένο PIN") token = make_token(user) return TokenResponse(access_token=token, user=UserOut.model_validate(user)) @router.post("/refresh", response_model=TokenResponse) def refresh(token: str, db: Session = Depends(get_db)): payload = decode_token(token) user = db.query(User).filter(User.id == int(payload["sub"]), User.is_active == True).first() if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") blacklist_token(token) new_token = make_token(user) return TokenResponse(access_token=new_token, user=UserOut.model_validate(user)) @router.post("/logout") def logout(token: str): blacklist_token(token) return {"status": "logged out"} @router.get("/me", response_model=UserOut) def me(user: User = Depends(get_current_user)): return user # ─── Public manager list (login screen — no auth required) ─────────────────── class PublicManagerOut(_PydanticBase): id: int username: str full_name: str | None model_config = {"from_attributes": True} @router.get("/managers", response_model=list[PublicManagerOut]) def public_manager_list(db: Session = Depends(get_db)): """Public endpoint — returns active manager/sysadmin accounts for login screen.""" managers = db.query(User).filter( User.role.in_(["manager", "sysadmin"]), User.is_active == True, ).all() return [PublicManagerOut(id=m.id, username=m.username, full_name=m.full_name) for m in managers] # ─── Public waiter list (login screen — no auth required) ──────────────────── from pydantic import BaseModel as _BaseModel class PublicWaiterOut(_BaseModel): id: int full_name: str | None nickname: str | None avatar_url: str | None on_shift: bool model_config = {"from_attributes": True} @router.get("/waiters", response_model=list[PublicWaiterOut]) def public_waiter_list(db: Session = Depends(get_db)): """Public endpoint — returns active waiters with on-shift flag. No auth required.""" from models.shift import WaiterShift waiters = db.query(User).filter(User.role == "waiter", User.is_active == True).all() on_shift_ids = { row.waiter_id for row in db.query(WaiterShift).filter(WaiterShift.ended_at == None).all() } return [ PublicWaiterOut( id=w.id, full_name=w.full_name, nickname=w.nickname, avatar_url=w.avatar_url, on_shift=w.id in on_shift_ids, ) for w in waiters ] @router.patch("/me", response_model=UserOut) def update_me(body: UpdateMeRequest, db: Session = Depends(get_db), user: User = Depends(get_current_user)): # Password change — requires current_password verification if body.new_password is not None: if not body.current_password: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="current_password is required to set a new password") if not user.password_hash: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Account has no password set") if not bcrypt.checkpw(body.current_password.encode(), user.password_hash.encode()): raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Current password is incorrect") if len(body.new_password) < 6: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="New password must be at least 6 characters") user.password_hash = bcrypt.hashpw(body.new_password.encode(), bcrypt.gensalt()).decode() # PIN change — no current PIN required (already authenticated) if body.new_pin is not None: if len(body.new_pin) < 4: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="PIN must be at least 4 digits") user.pin_hash = bcrypt.hashpw(body.new_pin.encode(), bcrypt.gensalt()).decode() # Username change — check uniqueness if body.username is not None and body.username != user.username: conflict = db.query(User).filter(User.username == body.username, User.id != user.id).first() if conflict: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Username already taken") user.username = body.username # Display name if body.full_name is not None: user.full_name = body.full_name db.commit() db.refresh(user) return user