Files

302 lines
12 KiB
Python

import os
import uuid
import json
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from sqlalchemy.orm import Session
from typing import List
from database import get_db
from models.product import Product, Category, ProductOption, ProductQuickOption, ProductIngredient, ProductPreferenceSet, ProductPreferenceChoice
from models.order import OrderItem
from models.user import User
from schemas.product import (
ProductCreate, ProductUpdate, ProductOut, ProductReorderItem,
CategoryCreate, CategoryUpdate, CategoryOut, CategoryReorderItem,
SubcategoryReorderItem, ParentGeneralReorderItem,
PreferenceSetCreate, ProductQuickOptionCreate,
)
from routers.deps import get_current_user, require_manager
# Router that the category and product endpoints below are registered on.
router = APIRouter()
# Directory on disk where uploaded product images are written.
IMAGE_DIR = "/app/data/product_images"
def _replace_quick_options(db, product, quick_options):
    """Swap a product's quick options for a freshly supplied list.

    Every quick option currently attached to ``product`` is marked for
    deletion and flushed, then one ``ProductQuickOption`` is staged per entry
    in ``quick_options``. Nothing is committed here; the caller owns the
    transaction.
    """
    for stale in product.quick_options:
        db.delete(stale)
    # Send the deletes to the database before staging the replacements.
    db.flush()

    replacements = [
        ProductQuickOption(
            product_id=product.id,
            name=entry.name,
            price=entry.price,
            allow_multiple=entry.allow_multiple,
            # A falsy sort_order (0 or None) falls back to the list position.
            sort_order=entry.sort_order or position,
            is_favorite=entry.is_favorite,
            favorite_sort_order=entry.favorite_sort_order,
            is_compact=entry.is_compact,
        )
        for position, entry in enumerate(quick_options)
    ]
    db.add_all(replacements)
def _replace_options(db, product, options):
    """Swap a product's options for a freshly supplied list.

    Existing options on ``product`` are marked for deletion and flushed, then
    one ``ProductOption`` is staged per entry in ``options``. Each entry's
    ``sub_choices`` are stored as a JSON string, or ``None`` when the entry has
    none. The caller is responsible for committing.
    """
    for stale in product.options:
        db.delete(stale)
    db.flush()

    for entry in options:
        serialized_subs = None
        if entry.sub_choices:
            serialized_subs = json.dumps(
                [sub.model_dump() for sub in entry.sub_choices]
            )
        replacement = ProductOption(
            product_id=product.id,
            name=entry.name,
            extra_cost=entry.extra_cost,
            allow_multiple=entry.allow_multiple,
            sub_choices=serialized_subs,
            is_favorite=entry.is_favorite,
            favorite_sort_order=entry.favorite_sort_order,
        )
        db.add(replacement)
def _replace_ingredients(db, product, ingredients):
    """Swap a product's ingredients for a freshly supplied list.

    Existing ingredients on ``product`` are marked for deletion and flushed,
    then one ``ProductIngredient`` is staged per entry, built from the entry's
    dumped fields plus the product id. The caller is responsible for
    committing.
    """
    for stale in product.ingredients:
        db.delete(stale)
    db.flush()

    db.add_all(
        ProductIngredient(product_id=product.id, **entry.model_dump())
        for entry in ingredients
    )
def _replace_preference_sets(db, product, sets: List[PreferenceSetCreate]):
    """Swap a product's preference sets (and their choices) for new ones.

    Existing sets on ``product`` are marked for deletion and flushed. For each
    entry in ``sets`` a ``ProductPreferenceSet`` is created, followed by one
    ``ProductPreferenceChoice`` per choice. When ``default_choice_index``
    points at a valid position in the created choices, that choice's id is
    stored as the set's ``default_choice_id``; an out-of-range or missing index
    leaves it unset. The caller is responsible for committing.
    """
    for stale in product.preference_sets:
        db.delete(stale)
    db.flush()

    for spec in sets:
        shared_payload = None
        if spec.shared_subset:
            shared_payload = json.dumps(spec.shared_subset.model_dump())

        pref_set = ProductPreferenceSet(
            product_id=product.id,
            name=spec.name,
            shared_subset=shared_payload,
            is_favorite=spec.is_favorite,
            favorite_sort_order=spec.favorite_sort_order,
        )
        db.add(pref_set)
        # Flushed before the choices are built because they read pref_set.id.
        db.flush()

        choice_rows = []
        for choice_spec in spec.choices:
            serialized_subs = None
            if choice_spec.sub_choices:
                serialized_subs = json.dumps(
                    [sub.model_dump() for sub in choice_spec.sub_choices]
                )
            row = ProductPreferenceChoice(
                set_id=pref_set.id,
                name=choice_spec.name,
                extra_cost=choice_spec.extra_cost,
                sub_choices=serialized_subs,
                disables_subset=choice_spec.disables_subset,
            )
            db.add(row)
            # Flushed per choice because the default lookup below reads row.id.
            db.flush()
            choice_rows.append(row)

        default_idx = spec.default_choice_index
        if default_idx is None:
            continue
        if 0 <= default_idx < len(choice_rows):
            pref_set.default_choice_id = choice_rows[default_idx].id
# ── Categories ────────────────────────────────────────────────────────────────
@router.get("/categories", response_model=List[CategoryOut])
def list_categories(db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    """Return every category ordered by ``sort_order``.

    ``user`` is not read in the body; it is declared so the
    ``get_current_user`` dependency runs for the request.
    """
    ordered = db.query(Category).order_by(Category.sort_order)
    return ordered.all()
@router.post("/categories", response_model=CategoryOut, status_code=status.HTTP_201_CREATED)
def create_category(body: CategoryCreate, db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Create a category and place it after its existing siblings.

    The new row's ``sort_order`` is the number of categories that already
    share the same ``parent_id``. Returns the refreshed category.
    """
    # Position is relative to siblings, i.e. rows with the same parent_id.
    siblings = db.query(Category).filter(Category.parent_id == body.parent_id)
    next_position = siblings.count()

    new_category = Category(
        name=body.name,
        color=body.color,
        sort_order=next_position,
        parent_id=body.parent_id,
        general_sort_order=body.general_sort_order,
    )
    db.add(new_category)
    db.commit()
    db.refresh(new_category)
    return new_category
@router.put("/categories/reorder", status_code=status.HTTP_204_NO_CONTENT)
def reorder_categories(items: List[CategoryReorderItem], db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Apply new ``sort_order`` values to the listed categories.

    All referenced categories are loaded with a single query rather than one
    query per item. Items whose id matches no category are skipped. When an id
    appears more than once, the last occurrence wins. An empty payload is a
    no-op.
    """
    if not items:
        return
    wanted_ids = [item.id for item in items]
    categories_by_id = {
        cat.id: cat
        for cat in db.query(Category).filter(Category.id.in_(wanted_ids)).all()
    }
    for item in items:
        cat = categories_by_id.get(item.id)
        if cat:
            cat.sort_order = item.sort_order
    db.commit()
@router.put("/categories/reorder-subcategories", status_code=status.HTTP_204_NO_CONTENT)
def reorder_subcategories(items: List[SubcategoryReorderItem], db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Reorder sub-categories within their parent (sort_order among siblings).

    All referenced categories are loaded with a single query rather than one
    query per item. Items whose id matches no category are skipped. When an id
    appears more than once, the last occurrence wins. An empty payload is a
    no-op.
    """
    if not items:
        return
    wanted_ids = [item.id for item in items]
    categories_by_id = {
        cat.id: cat
        for cat in db.query(Category).filter(Category.id.in_(wanted_ids)).all()
    }
    for item in items:
        cat = categories_by_id.get(item.id)
        if cat:
            cat.sort_order = item.sort_order
    db.commit()
@router.put("/categories/reorder-general", status_code=status.HTTP_204_NO_CONTENT)
def reorder_general(items: List[ParentGeneralReorderItem], db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Update general_sort_order on parent categories (position of the General group).

    All referenced categories are loaded with a single query rather than one
    query per item. Items whose id matches no category are skipped. When an id
    appears more than once, the last occurrence wins. An empty payload is a
    no-op.
    """
    if not items:
        return
    wanted_ids = [item.id for item in items]
    categories_by_id = {
        cat.id: cat
        for cat in db.query(Category).filter(Category.id.in_(wanted_ids)).all()
    }
    for item in items:
        cat = categories_by_id.get(item.id)
        if cat:
            cat.general_sort_order = item.general_sort_order
    db.commit()
@router.put("/categories/{category_id}", response_model=CategoryOut)
def update_category(category_id: int, body: CategoryUpdate, db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Apply the non-null fields of ``body`` to an existing category.

    Raises:
        HTTPException: 404 when no category has ``category_id``.
    """
    target = db.query(Category).filter(Category.id == category_id).first()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")

    # NOTE(review): exclude_none drops fields sent as null, so a field cannot
    # be reset to None through this endpoint - confirm that is intended.
    changes = body.model_dump(exclude_none=True)
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    db.commit()
    db.refresh(target)
    return target
@router.delete("/categories/{category_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_category(category_id: int, db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Delete the category with the given id.

    Raises:
        HTTPException: 404 when no category has ``category_id``.
    """
    lookup = db.query(Category).filter(Category.id == category_id)
    target = lookup.first()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")

    db.delete(target)
    db.commit()
# ── Products ──────────────────────────────────────────────────────────────────
@router.get("/", response_model=List[ProductOut])
def list_products(all: bool = False, db: Session = Depends(get_db), user: User = Depends(get_current_user)):
    """List products ordered by ``sort_order`` and then ``id``.

    The unfiltered list is returned only when ``all`` is true and the caller's
    role is ``manager`` or ``sysadmin``. Every other request sees only products
    that are available and whose lifecycle status is ``active``.
    """
    query = db.query(Product)
    unrestricted = all and user.role in ("manager", "sysadmin")
    if not unrestricted:
        # Everyone else only sees active, available products.
        query = query.filter(
            Product.is_available == True,  # noqa: E712 - SQL expression, not a Python bool test
            Product.lifecycle_status == "active",
        )
    ordered = query.order_by(Product.sort_order, Product.id)
    return ordered.all()
@router.put("/reorder", status_code=status.HTTP_204_NO_CONTENT)
def reorder_products(items: List[ProductReorderItem], db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Apply new ``sort_order`` values to the listed products.

    All referenced products are loaded with a single query rather than one
    query per item. Items whose id matches no product are skipped. When an id
    appears more than once, the last occurrence wins. An empty payload is a
    no-op.
    """
    if not items:
        return
    wanted_ids = [item.id for item in items]
    products_by_id = {
        product.id: product
        for product in db.query(Product).filter(Product.id.in_(wanted_ids)).all()
    }
    for item in items:
        product = products_by_id.get(item.id)
        if product:
            product.sort_order = item.sort_order
    db.commit()
@router.post("/", response_model=ProductOut, status_code=status.HTTP_201_CREATED)
def create_product(body: ProductCreate, db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Create a product together with its child collections.

    The scalar fields of ``body`` populate the ``Product`` row. A
    ``sort_order`` of 0 is replaced by the current number of products. Quick
    options, options, ingredients and preference sets are then created through
    the same ``_replace_*`` helpers that the update endpoint uses, so both
    paths build child rows identically. On a brand-new product the helpers
    find nothing to delete and only insert.

    Returns:
        The refreshed product.
    """
    data = body.model_dump(exclude={"quick_options", "options", "ingredients", "preference_sets"})
    if data.get("sort_order") == 0:
        data["sort_order"] = db.query(Product).count()

    product = Product(**data)
    db.add(product)
    # Flushed before the helpers run because they read product.id.
    db.flush()

    _replace_quick_options(db, product, body.quick_options)
    _replace_options(db, product, body.options)
    _replace_ingredients(db, product, body.ingredients)
    _replace_preference_sets(db, product, body.preference_sets)

    db.commit()
    db.refresh(product)
    return product
@router.put("/{product_id}", response_model=ProductOut)
def update_product(product_id: int, body: ProductUpdate, db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Update a product's scalar fields and, where supplied, its child collections.

    Non-null scalar fields of ``body`` are copied onto the product. Each child
    collection (quick options, options, ingredients, preference sets) is
    replaced wholesale only when the corresponding field is not ``None``.

    Raises:
        HTTPException: 404 when no product has ``product_id``.
    """
    product = db.query(Product).filter(Product.id == product_id).first()
    if not product:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Product not found")

    nested_fields = {"quick_options", "options", "ingredients", "preference_sets"}
    scalar_changes = body.model_dump(exclude_none=True, exclude=nested_fields)
    for attr_name, new_value in scalar_changes.items():
        setattr(product, attr_name, new_value)

    # (payload, helper) pairs, processed in this order; None means "leave as is".
    replacements = (
        (body.quick_options, _replace_quick_options),
        (body.options, _replace_options),
        (body.ingredients, _replace_ingredients),
        (body.preference_sets, _replace_preference_sets),
    )
    for payload, replace in replacements:
        if payload is not None:
            replace(db, product, payload)

    db.commit()
    db.refresh(product)
    return product
@router.post("/{product_id}/image", response_model=ProductOut)
async def upload_product_image(product_id: int, file: UploadFile = File(...), db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Store an uploaded image for a product and point ``image_url`` at it.

    The new file is written under ``IMAGE_DIR`` and the new URL is committed
    before the previously referenced file is removed, so a failed write or
    commit never leaves the product pointing at a deleted image.

    Returns:
        The refreshed product.

    Raises:
        HTTPException: 404 when no product has ``product_id``; 400 when the
            upload's content type is missing or does not start with
            ``image/``.
    """
    product = db.query(Product).filter(Product.id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    # content_type is None when the client sends no Content-Type for the part.
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="File must be an image")

    # The extension comes from a client-supplied filename. Accept only plain
    # ASCII alphanumerics so it cannot carry path separators into the stored
    # path; anything else (or a missing filename) falls back to "jpg".
    _, dot, suffix = (file.filename or "").rpartition(".")
    if dot and suffix.isascii() and suffix.isalnum():
        ext = suffix.lower()
    else:
        ext = "jpg"

    os.makedirs(IMAGE_DIR, exist_ok=True)
    filename = f"{product_id}_{uuid.uuid4().hex[:8]}.{ext}"
    filepath = os.path.join(IMAGE_DIR, filename)
    contents = await file.read()
    with open(filepath, "wb") as f:
        f.write(contents)

    previous_url = product.image_url
    product.image_url = f"/static/product_images/{filename}"
    db.commit()

    # Only now that the new image is stored and committed, drop the old file.
    previous_name = os.path.basename(previous_url) if previous_url else ""
    if previous_name:
        try:
            os.remove(os.path.join(IMAGE_DIR, previous_name))
        except FileNotFoundError:
            # Already gone; nothing left to clean up.
            pass

    db.refresh(product)
    return product
@router.delete("/{product_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_product(product_id: int, hard: bool = False, db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """Delete or archive a product depending on its order history.

    A product that appears in no order item is deleted outright. A product
    with order history is archived (``lifecycle_status = "archived"``), unless
    ``hard`` is true, in which case the request is rejected.

    Raises:
        HTTPException: 404 when no product has ``product_id``; 400 when
            ``hard`` is true and the product appears in an order item.
    """
    product = db.query(Product).filter(Product.id == product_id).first()
    if not product:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Product not found")

    referenced = db.query(OrderItem).filter(OrderItem.product_id == product_id).first()
    if not referenced:
        # No order history: safe to remove in both modes.
        db.delete(product)
    elif hard:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot permanently delete a product that appears in past orders. Archive it instead."
        )
    else:
        # Keep the row for past orders, but retire it.
        product.lifecycle_status = "archived"
    db.commit()