import os import uuid import json from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File from sqlalchemy.orm import Session from typing import List from database import get_db from models.product import Product, Category, ProductOption, ProductIngredient, ProductPreferenceSet, ProductPreferenceChoice from models.order import OrderItem from models.user import User from schemas.product import ( ProductCreate, ProductUpdate, ProductOut, ProductReorderItem, CategoryCreate, CategoryUpdate, CategoryOut, CategoryReorderItem, PreferenceSetCreate, ) from routers.deps import get_current_user, require_manager router = APIRouter() IMAGE_DIR = "/app/data/product_images" def _replace_options(db, product, options): for opt in product.options: db.delete(opt) db.flush() for opt in options: sub_json = json.dumps([s.model_dump() for s in opt.sub_choices]) if opt.sub_choices else None db.add(ProductOption( product_id=product.id, name=opt.name, extra_cost=opt.extra_cost, sub_choices=sub_json, )) def _replace_ingredients(db, product, ingredients): for ing in product.ingredients: db.delete(ing) db.flush() for ing in ingredients: db.add(ProductIngredient(product_id=product.id, **ing.model_dump())) def _replace_preference_sets(db, product, sets: List[PreferenceSetCreate]): for ps in product.preference_sets: db.delete(ps) db.flush() for ps in sets: shared_json = json.dumps(ps.shared_subset.model_dump()) if ps.shared_subset else None new_set = ProductPreferenceSet( product_id=product.id, name=ps.name, shared_subset=shared_json, ) db.add(new_set) db.flush() created_choices = [] for ch in ps.choices: sub_json = json.dumps([s.model_dump() for s in ch.sub_choices]) if ch.sub_choices else None choice = ProductPreferenceChoice( set_id=new_set.id, name=ch.name, extra_cost=ch.extra_cost, sub_choices=sub_json, disables_subset=ch.disables_subset, ) db.add(choice) db.flush() created_choices.append(choice) if ps.default_choice_index is not None and 0 <= ps.default_choice_index < len(created_choices): new_set.default_choice_id = created_choices[ps.default_choice_index].id # ── Categories ──────────────────────────────────────────────────────────────── @router.get("/categories", response_model=List[CategoryOut]) def list_categories(db: Session = Depends(get_db), user: User = Depends(get_current_user)): return db.query(Category).order_by(Category.sort_order).all() @router.post("/categories", response_model=CategoryOut, status_code=status.HTTP_201_CREATED) def create_category(body: CategoryCreate, db: Session = Depends(get_db), user: User = Depends(require_manager)): max_order = db.query(Category).count() cat = Category(name=body.name, color=body.color, sort_order=max_order) db.add(cat) db.commit() db.refresh(cat) return cat @router.put("/categories/reorder", status_code=status.HTTP_204_NO_CONTENT) def reorder_categories(items: List[CategoryReorderItem], db: Session = Depends(get_db), user: User = Depends(require_manager)): for item in items: cat = db.query(Category).filter(Category.id == item.id).first() if cat: cat.sort_order = item.sort_order db.commit() @router.put("/categories/{category_id}", response_model=CategoryOut) def update_category(category_id: int, body: CategoryUpdate, db: Session = Depends(get_db), user: User = Depends(require_manager)): cat = db.query(Category).filter(Category.id == category_id).first() if not cat: raise HTTPException(status_code=404, detail="Category not found") for field, value in body.model_dump(exclude_none=True).items(): setattr(cat, field, value) db.commit() db.refresh(cat) return cat @router.delete("/categories/{category_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_category(category_id: int, db: Session = Depends(get_db), user: User = Depends(require_manager)): cat = db.query(Category).filter(Category.id == category_id).first() if not cat: raise HTTPException(status_code=404, detail="Category not found") db.delete(cat) db.commit() # ── Products ────────────────────────────────────────────────────────────────── @router.get("/", response_model=List[ProductOut]) def list_products(all: bool = False, db: Session = Depends(get_db), user: User = Depends(get_current_user)): q = db.query(Product) if not all or user.role not in ("manager", "sysadmin"): q = q.filter(Product.is_available == True) return q.order_by(Product.sort_order, Product.id).all() @router.put("/reorder", status_code=status.HTTP_204_NO_CONTENT) def reorder_products(items: List[ProductReorderItem], db: Session = Depends(get_db), user: User = Depends(require_manager)): for item in items: product = db.query(Product).filter(Product.id == item.id).first() if product: product.sort_order = item.sort_order db.commit() @router.post("/", response_model=ProductOut, status_code=status.HTTP_201_CREATED) def create_product(body: ProductCreate, db: Session = Depends(get_db), user: User = Depends(require_manager)): data = body.model_dump(exclude={"options", "ingredients", "preference_sets"}) if data.get("sort_order") == 0: data["sort_order"] = db.query(Product).count() product = Product(**data) db.add(product) db.flush() for opt in body.options: sub_json = json.dumps([s.model_dump() for s in opt.sub_choices]) if opt.sub_choices else None db.add(ProductOption(product_id=product.id, name=opt.name, extra_cost=opt.extra_cost, sub_choices=sub_json)) for ing in body.ingredients: db.add(ProductIngredient(product_id=product.id, **ing.model_dump())) _replace_preference_sets(db, product, body.preference_sets) db.commit() db.refresh(product) return product @router.put("/{product_id}", response_model=ProductOut) def update_product(product_id: int, body: ProductUpdate, db: Session = Depends(get_db), user: User = Depends(require_manager)): product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException(status_code=404, detail="Product not found") for field, value in body.model_dump(exclude_none=True, exclude={"options", "ingredients", "preference_sets"}).items(): setattr(product, field, value) if body.options is not None: _replace_options(db, product, body.options) if body.ingredients is not None: _replace_ingredients(db, product, body.ingredients) if body.preference_sets is not None: _replace_preference_sets(db, product, body.preference_sets) db.commit() db.refresh(product) return product @router.post("/{product_id}/image", response_model=ProductOut) async def upload_product_image(product_id: int, file: UploadFile = File(...), db: Session = Depends(get_db), user: User = Depends(require_manager)): product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException(status_code=404, detail="Product not found") if not file.content_type.startswith("image/"): raise HTTPException(status_code=400, detail="File must be an image") os.makedirs(IMAGE_DIR, exist_ok=True) if product.image_url: old_path = os.path.join(IMAGE_DIR, os.path.basename(product.image_url)) if os.path.exists(old_path): os.remove(old_path) ext = file.filename.rsplit(".", 1)[-1].lower() if "." in file.filename else "jpg" filename = f"{product_id}_{uuid.uuid4().hex[:8]}.{ext}" filepath = os.path.join(IMAGE_DIR, filename) contents = await file.read() with open(filepath, "wb") as f: f.write(contents) product.image_url = f"/static/product_images/{filename}" db.commit() db.refresh(product) return product @router.delete("/{product_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_product(product_id: int, hard: bool = False, db: Session = Depends(get_db), user: User = Depends(require_manager)): product = db.query(Product).filter(Product.id == product_id).first() if not product: raise HTTPException(status_code=404, detail="Product not found") if hard: has_orders = db.query(OrderItem).filter(OrderItem.product_id == product_id).first() if has_orders: raise HTTPException( status_code=400, detail="Cannot permanently delete a product that appears in past orders. Deactivate it instead." ) db.delete(product) else: product.is_available = False db.commit()