from fastapi import APIRouter, Depends, Query, HTTPException, UploadFile, File from fastapi.responses import Response from fastapi.responses import RedirectResponse from typing import Optional from pydantic import BaseModel from auth.models import TokenPayload from auth.dependencies import require_permission from manufacturing.models import ( BatchCreate, BatchResponse, DeviceInventoryItem, DeviceInventoryListResponse, DeviceStatusUpdate, DeviceAssign, ManufacturingStats, ) from manufacturing import service from manufacturing import audit from shared.exceptions import NotFoundError from shared.firebase import get_db as get_firestore class LifecycleEntryPatch(BaseModel): index: int date: Optional[str] = None note: Optional[str] = None class LifecycleEntryCreate(BaseModel): status_id: str date: Optional[str] = None note: Optional[str] = None VALID_FLASH_ASSETS = {"bootloader.bin", "partitions.bin"} VALID_HW_TYPES_MFG = {"vesper", "vesper_plus", "vesper_pro", "agnus", "agnus_mini", "chronos", "chronos_pro"} # Bespoke UIDs are dynamic — we allow any non-empty slug that doesn't clash with # a standard hw_type name. The flash-asset upload endpoint checks this below. router = APIRouter(prefix="/api/manufacturing", tags=["manufacturing"]) @router.get("/stats", response_model=ManufacturingStats) def get_stats( _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): return service.get_stats() @router.get("/audit-log") async def get_audit_log( limit: int = Query(20, ge=1, le=100), _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): entries = await audit.get_recent(limit=limit) return {"entries": entries} @router.post("/batch", response_model=BatchResponse, status_code=201) async def create_batch( body: BatchCreate, user: TokenPayload = Depends(require_permission("manufacturing", "add")), ): result = service.create_batch(body) await audit.log_action( admin_user=user.email, action="batch_created", detail={ "batch_id": result.batch_id, "board_type": result.board_type, "board_version": result.board_version, "quantity": len(result.serial_numbers), }, ) return result @router.get("/devices", response_model=DeviceInventoryListResponse) def list_devices( status: Optional[str] = Query(None), hw_type: Optional[str] = Query(None), search: Optional[str] = Query(None), limit: int = Query(100, ge=1, le=500), offset: int = Query(0, ge=0), _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): items = service.list_devices( status=status, hw_type=hw_type, search=search, limit=limit, offset=offset, ) return DeviceInventoryListResponse(devices=items, total=len(items)) @router.get("/devices/{sn}", response_model=DeviceInventoryItem) def get_device( sn: str, _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): return service.get_device_by_sn(sn) @router.get("/customers/search") def search_customers( q: str = Query(""), _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): """Search CRM customers by name, email, phone, organization, or tags.""" results = service.search_customers(q) return {"results": results} @router.get("/customers/{customer_id}") def get_customer( customer_id: str, _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): """Get a single CRM customer by ID.""" db = get_firestore() doc = db.collection("crm_customers").document(customer_id).get() if not doc.exists: raise HTTPException(status_code=404, detail="Customer not found") data = doc.to_dict() or {} loc = data.get("location") or {} city = loc.get("city") if isinstance(loc, dict) else None return { "id": doc.id, "name": data.get("name") or "", "surname": data.get("surname") or "", "email": data.get("email") or "", "organization": data.get("organization") or "", "phone": data.get("phone") or "", "city": city or "", } @router.patch("/devices/{sn}/status", response_model=DeviceInventoryItem) async def update_status( sn: str, body: DeviceStatusUpdate, user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): # Guard: claimed requires at least one user in user_list # (allow if explicitly force_claimed=true, which the mfg UI sets after adding a user manually) if body.status.value == "claimed": db = get_firestore() docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream()) if docs: data = docs[0].to_dict() or {} user_list = data.get("user_list", []) or [] if not user_list and not getattr(body, "force_claimed", False): raise HTTPException( status_code=400, detail="Cannot set status to 'claimed': device has no users in user_list. " "Assign a user first, then set to Claimed.", ) # Guard: sold requires a customer assigned if body.status.value == "sold": db = get_firestore() docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream()) if docs: data = docs[0].to_dict() or {} if not data.get("customer_id"): raise HTTPException( status_code=400, detail="Cannot set status to 'sold' without an assigned customer. " "Use the 'Assign to Customer' action first.", ) result = service.update_device_status(sn, body, set_by=user.email) await audit.log_action( admin_user=user.email, action="status_updated", serial_number=sn, detail={"status": body.status.value, "note": body.note}, ) return result @router.patch("/devices/{sn}/lifecycle", response_model=DeviceInventoryItem) async def patch_lifecycle_entry( sn: str, body: LifecycleEntryPatch, user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): """Edit the date and/or note of a lifecycle history entry by index.""" db = get_firestore() docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream()) if not docs: raise HTTPException(status_code=404, detail="Device not found") doc_ref = docs[0].reference data = docs[0].to_dict() or {} history = data.get("lifecycle_history") or [] if body.index < 0 or body.index >= len(history): raise HTTPException(status_code=400, detail="Invalid lifecycle entry index") if body.date is not None: history[body.index]["date"] = body.date if body.note is not None: history[body.index]["note"] = body.note doc_ref.update({"lifecycle_history": history}) from manufacturing.service import _doc_to_inventory_item return _doc_to_inventory_item(doc_ref.get()) @router.post("/devices/{sn}/lifecycle", response_model=DeviceInventoryItem, status_code=200) async def create_lifecycle_entry( sn: str, body: LifecycleEntryCreate, user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): """Upsert a lifecycle history entry for the given status_id. If an entry for this status already exists it is overwritten in-place; otherwise a new entry is appended. This prevents duplicate entries when a status is visited more than once (max one entry per status). """ from datetime import datetime, timezone db = get_firestore() docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream()) if not docs: raise HTTPException(status_code=404, detail="Device not found") doc_ref = docs[0].reference data = docs[0].to_dict() or {} history = list(data.get("lifecycle_history") or []) new_entry = { "status_id": body.status_id, "date": body.date or datetime.now(timezone.utc).isoformat(), "note": body.note, "set_by": user.email, } # Overwrite existing entry for this status if present, else append existing_idx = next( (i for i, e in enumerate(history) if e.get("status_id") == body.status_id), None, ) if existing_idx is not None: history[existing_idx] = new_entry else: history.append(new_entry) doc_ref.update({"lifecycle_history": history}) from manufacturing.service import _doc_to_inventory_item return _doc_to_inventory_item(doc_ref.get()) @router.delete("/devices/{sn}/lifecycle/{index}", response_model=DeviceInventoryItem) async def delete_lifecycle_entry( sn: str, index: int, user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): """Delete a lifecycle history entry by index. Cannot delete the entry for the current status.""" db = get_firestore() docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream()) if not docs: raise HTTPException(status_code=404, detail="Device not found") doc_ref = docs[0].reference data = docs[0].to_dict() or {} history = data.get("lifecycle_history") or [] if index < 0 or index >= len(history): raise HTTPException(status_code=400, detail="Invalid lifecycle entry index") current_status = data.get("mfg_status", "") if history[index].get("status_id") == current_status: raise HTTPException(status_code=400, detail="Cannot delete the entry for the current status. Change the status first.") history.pop(index) doc_ref.update({"lifecycle_history": history}) from manufacturing.service import _doc_to_inventory_item return _doc_to_inventory_item(doc_ref.get()) @router.get("/devices/{sn}/nvs.bin") async def download_nvs( sn: str, hw_type_override: Optional[str] = Query(None, description="Override hw_type written to NVS (for bespoke firmware)"), hw_revision_override: Optional[str] = Query(None, description="Override hw_revision written to NVS (for bespoke firmware)"), user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): binary = service.get_nvs_binary(sn, hw_type_override=hw_type_override, hw_revision_override=hw_revision_override) await audit.log_action( admin_user=user.email, action="device_flashed", serial_number=sn, ) return Response( content=binary, media_type="application/octet-stream", headers={"Content-Disposition": f'attachment; filename="{sn}_nvs.bin"'}, ) @router.post("/devices/{sn}/assign", response_model=DeviceInventoryItem) async def assign_device( sn: str, body: DeviceAssign, user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): try: result = service.assign_device(sn, body) except NotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) await audit.log_action( admin_user=user.email, action="device_assigned", serial_number=sn, detail={"customer_id": body.customer_id}, ) return result @router.delete("/devices/{sn}", status_code=204) async def delete_device( sn: str, force: bool = Query(False, description="Required to delete sold/claimed devices"), user: TokenPayload = Depends(require_permission("manufacturing", "delete")), ): """Delete a device. Sold/claimed devices require force=true.""" try: service.delete_device(sn, force=force) except NotFoundError: raise HTTPException(status_code=404, detail="Device not found") except PermissionError as e: raise HTTPException(status_code=403, detail=str(e)) await audit.log_action( admin_user=user.email, action="device_deleted", serial_number=sn, detail={"force": force}, ) @router.post("/devices/{sn}/email/manufactured", status_code=204) async def send_manufactured_email( sn: str, user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): """Send the 'device manufactured' notification to the assigned customer's email.""" db = get_firestore() docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream()) if not docs: raise HTTPException(status_code=404, detail="Device not found") data = docs[0].to_dict() or {} customer_id = data.get("customer_id") if not customer_id: raise HTTPException(status_code=400, detail="No customer assigned to this device") customer_doc = db.collection("crm_customers").document(customer_id).get() if not customer_doc.exists: raise HTTPException(status_code=404, detail="Assigned customer not found") cdata = customer_doc.to_dict() or {} email = cdata.get("email") if not email: raise HTTPException(status_code=400, detail="Customer has no email address") name_parts = [cdata.get("name") or "", cdata.get("surname") or ""] customer_name = " ".join(p for p in name_parts if p).strip() or None hw_family = data.get("hw_family") or data.get("hw_type") or "" from utils.emails.device_mfged_mail import send_device_manufactured_email send_device_manufactured_email( customer_email=email, serial_number=sn, device_name=hw_family.replace("_", " ").title(), customer_name=customer_name, ) await audit.log_action( admin_user=user.email, action="email_manufactured_sent", serial_number=sn, detail={"recipient": email}, ) @router.post("/devices/{sn}/email/assigned", status_code=204) async def send_assigned_email( sn: str, user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): """Send the 'device assigned / app instructions' email to the assigned user(s).""" db = get_firestore() docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream()) if not docs: raise HTTPException(status_code=404, detail="Device not found") data = docs[0].to_dict() or {} user_list = data.get("user_list") or [] if not user_list: raise HTTPException(status_code=400, detail="No users assigned to this device") hw_family = data.get("hw_family") or data.get("hw_type") or "" device_name = hw_family.replace("_", " ").title() from utils.emails.device_assigned_mail import send_device_assigned_email errors = [] for uid in user_list: try: user_doc = db.collection("users").document(uid).get() if not user_doc.exists: continue udata = user_doc.to_dict() or {} email = udata.get("email") if not email: continue display_name = udata.get("display_name") or udata.get("name") or None send_device_assigned_email( user_email=email, serial_number=sn, device_name=device_name, user_name=display_name, ) except Exception as exc: errors.append(str(exc)) if errors: raise HTTPException(status_code=500, detail=f"Some emails failed: {'; '.join(errors)}") await audit.log_action( admin_user=user.email, action="email_assigned_sent", serial_number=sn, detail={"user_count": len(user_list)}, ) @router.delete("/devices", status_code=200) async def delete_unprovisioned( user: TokenPayload = Depends(require_permission("manufacturing", "delete")), ): """Delete all devices with status 'manufactured' (never provisioned).""" deleted = service.delete_unprovisioned_devices() await audit.log_action( admin_user=user.email, action="bulk_delete_unprovisioned", detail={"count": len(deleted), "serial_numbers": deleted}, ) return {"deleted": deleted, "count": len(deleted)} @router.get("/devices/{sn}/firmware.bin") def redirect_firmware( sn: str, _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): """Redirect to the latest stable firmware binary for this device's hw_type. Resolves to GET /api/firmware/{hw_type}/stable/{version}/firmware.bin. """ url = service.get_firmware_url(sn) return RedirectResponse(url=url, status_code=302) # ───────────────────────────────────────────────────────────────────────────── # Flash assets — bootloader.bin and partitions.bin per hw_type # These are the binaries that must be flashed at fixed addresses during full # provisioning (0x1000 bootloader, 0x8000 partition table). # They are NOT flashed during OTA updates — only during initial provisioning. # Upload once per hw_type after each PlatformIO build that changes the layout. # ───────────────────────────────────────────────────────────────────────────── @router.get("/flash-assets") def list_flash_assets( _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): """Return asset status for all known board types (and any discovered bespoke UIDs). Checks the filesystem directly — no database involved. Each entry contains: hw_type, bootloader (exists, size, uploaded_at), partitions (same), note. """ return {"assets": service.list_flash_assets()} @router.delete("/flash-assets/{hw_type}/{asset}", status_code=204) async def delete_flash_asset( hw_type: str, asset: str, user: TokenPayload = Depends(require_permission("manufacturing", "delete")), ): """Delete a single flash asset file (bootloader.bin or partitions.bin).""" if asset not in VALID_FLASH_ASSETS: raise HTTPException(status_code=400, detail=f"Invalid asset. Must be one of: {', '.join(sorted(VALID_FLASH_ASSETS))}") try: service.delete_flash_asset(hw_type, asset) except NotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) await audit.log_action( admin_user=user.email, action="flash_asset_deleted", detail={"hw_type": hw_type, "asset": asset}, ) class FlashAssetNoteBody(BaseModel): note: str @router.put("/flash-assets/{hw_type}/note", status_code=204) async def set_flash_asset_note( hw_type: str, body: FlashAssetNoteBody, _user: TokenPayload = Depends(require_permission("manufacturing", "edit")), ): """Save (or overwrite) the note for a hw_type's flash asset set. The note is stored as note.txt next to the binary files. Pass an empty string to clear the note. """ service.set_flash_asset_note(hw_type, body.note) @router.post("/flash-assets/{hw_type}/{asset}", status_code=204) async def upload_flash_asset( hw_type: str, asset: str, file: UploadFile = File(...), _user: TokenPayload = Depends(require_permission("manufacturing", "add")), ): """Upload a bootloader.bin or partitions.bin for a given hw_type. These are build artifacts from PlatformIO (.pio/build/{env}/bootloader.bin and .pio/build/{env}/partitions.bin). Upload them once per hw_type after each PlatformIO build that changes the partition layout. """ # hw_type can be a standard board type OR a bespoke UID (any non-empty slug) if not hw_type or len(hw_type) > 128: raise HTTPException(status_code=400, detail="Invalid hw_type/bespoke UID.") if asset not in VALID_FLASH_ASSETS: raise HTTPException(status_code=400, detail=f"Invalid asset. Must be one of: {', '.join(sorted(VALID_FLASH_ASSETS))}") data = await file.read() service.save_flash_asset(hw_type, asset, data) @router.get("/devices/{sn}/bootloader.bin") def download_bootloader( sn: str, hw_type_override: Optional[str] = Query(None, description="Override hw_type for flash asset lookup (for bespoke firmware)"), _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): """Return the bootloader.bin for this device's hw_type (flashed at 0x1000).""" item = service.get_device_by_sn(sn) hw_type = hw_type_override or item.hw_type try: data = service.get_flash_asset(hw_type, "bootloader.bin") except NotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) return Response( content=data, media_type="application/octet-stream", headers={"Content-Disposition": f'attachment; filename="bootloader_{hw_type}.bin"'}, ) @router.get("/devices/{sn}/partitions.bin") def download_partitions( sn: str, hw_type_override: Optional[str] = Query(None, description="Override hw_type for flash asset lookup (for bespoke firmware)"), _user: TokenPayload = Depends(require_permission("manufacturing", "view")), ): """Return the partitions.bin for this device's hw_type (flashed at 0x8000).""" item = service.get_device_by_sn(sn) hw_type = hw_type_override or item.hw_type try: data = service.get_flash_asset(hw_type, "partitions.bin") except NotFoundError as e: raise HTTPException(status_code=404, detail=str(e)) return Response( content=data, media_type="application/octet-stream", headers={"Content-Disposition": f'attachment; filename="partitions_{hw_type}.bin"'}, )