Files

562 lines
21 KiB
Python

from fastapi import APIRouter, Depends, Query, HTTPException, UploadFile, File
from fastapi.responses import Response
from fastapi.responses import RedirectResponse
from typing import Optional
from pydantic import BaseModel
from auth.models import TokenPayload
from auth.dependencies import require_permission
from manufacturing.models import (
BatchCreate, BatchResponse,
DeviceInventoryItem, DeviceInventoryListResponse,
DeviceStatusUpdate, DeviceAssign,
ManufacturingStats,
)
from manufacturing import service
from manufacturing import audit
from shared.exceptions import NotFoundError
from shared.firebase import get_db as get_firestore
class LifecycleEntryPatch(BaseModel):
index: int
date: Optional[str] = None
note: Optional[str] = None
class LifecycleEntryCreate(BaseModel):
status_id: str
date: Optional[str] = None
note: Optional[str] = None
VALID_FLASH_ASSETS = {"bootloader.bin", "partitions.bin"}
VALID_HW_TYPES_MFG = {"vesper", "vesper_plus", "vesper_pro", "agnus", "agnus_mini", "chronos", "chronos_pro"}
# Bespoke UIDs are dynamic — we allow any non-empty slug that doesn't clash with
# a standard hw_type name. The flash-asset upload endpoint checks this below.
router = APIRouter(prefix="/api/manufacturing", tags=["manufacturing"])
@router.get("/stats", response_model=ManufacturingStats)
def get_stats(
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
return service.get_stats()
@router.get("/audit-log")
async def get_audit_log(
limit: int = Query(20, ge=1, le=100),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
entries = await audit.get_recent(limit=limit)
return {"entries": entries}
@router.post("/batch", response_model=BatchResponse, status_code=201)
async def create_batch(
body: BatchCreate,
user: TokenPayload = Depends(require_permission("manufacturing", "add")),
):
result = service.create_batch(body)
await audit.log_action(
admin_user=user.email,
action="batch_created",
detail={
"batch_id": result.batch_id,
"board_type": result.board_type,
"board_version": result.board_version,
"quantity": len(result.serial_numbers),
},
)
return result
@router.get("/devices", response_model=DeviceInventoryListResponse)
def list_devices(
status: Optional[str] = Query(None),
hw_type: Optional[str] = Query(None),
search: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
items = service.list_devices(
status=status,
hw_type=hw_type,
search=search,
limit=limit,
offset=offset,
)
return DeviceInventoryListResponse(devices=items, total=len(items))
@router.get("/devices/{sn}", response_model=DeviceInventoryItem)
def get_device(
sn: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
return service.get_device_by_sn(sn)
@router.get("/customers/search")
def search_customers(
q: str = Query(""),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Search CRM customers by name, email, phone, organization, or tags."""
results = service.search_customers(q)
return {"results": results}
@router.get("/customers/{customer_id}")
def get_customer(
customer_id: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Get a single CRM customer by ID."""
db = get_firestore()
doc = db.collection("crm_customers").document(customer_id).get()
if not doc.exists:
raise HTTPException(status_code=404, detail="Customer not found")
data = doc.to_dict() or {}
loc = data.get("location") or {}
city = loc.get("city") if isinstance(loc, dict) else None
return {
"id": doc.id,
"name": data.get("name") or "",
"surname": data.get("surname") or "",
"email": data.get("email") or "",
"organization": data.get("organization") or "",
"phone": data.get("phone") or "",
"city": city or "",
}
@router.patch("/devices/{sn}/status", response_model=DeviceInventoryItem)
async def update_status(
sn: str,
body: DeviceStatusUpdate,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
# Guard: claimed requires at least one user in user_list
# (allow if explicitly force_claimed=true, which the mfg UI sets after adding a user manually)
if body.status.value == "claimed":
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if docs:
data = docs[0].to_dict() or {}
user_list = data.get("user_list", []) or []
if not user_list and not getattr(body, "force_claimed", False):
raise HTTPException(
status_code=400,
detail="Cannot set status to 'claimed': device has no users in user_list. "
"Assign a user first, then set to Claimed.",
)
# Guard: sold requires a customer assigned
if body.status.value == "sold":
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if docs:
data = docs[0].to_dict() or {}
if not data.get("customer_id"):
raise HTTPException(
status_code=400,
detail="Cannot set status to 'sold' without an assigned customer. "
"Use the 'Assign to Customer' action first.",
)
result = service.update_device_status(sn, body, set_by=user.email)
await audit.log_action(
admin_user=user.email,
action="status_updated",
serial_number=sn,
detail={"status": body.status.value, "note": body.note},
)
return result
@router.patch("/devices/{sn}/lifecycle", response_model=DeviceInventoryItem)
async def patch_lifecycle_entry(
sn: str,
body: LifecycleEntryPatch,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
"""Edit the date and/or note of a lifecycle history entry by index."""
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
doc_ref = docs[0].reference
data = docs[0].to_dict() or {}
history = data.get("lifecycle_history") or []
if body.index < 0 or body.index >= len(history):
raise HTTPException(status_code=400, detail="Invalid lifecycle entry index")
if body.date is not None:
history[body.index]["date"] = body.date
if body.note is not None:
history[body.index]["note"] = body.note
doc_ref.update({"lifecycle_history": history})
from manufacturing.service import _doc_to_inventory_item
return _doc_to_inventory_item(doc_ref.get())
@router.post("/devices/{sn}/lifecycle", response_model=DeviceInventoryItem, status_code=200)
async def create_lifecycle_entry(
sn: str,
body: LifecycleEntryCreate,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
"""Upsert a lifecycle history entry for the given status_id.
If an entry for this status already exists it is overwritten in-place;
otherwise a new entry is appended. This prevents duplicate entries when
a status is visited more than once (max one entry per status).
"""
from datetime import datetime, timezone
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
doc_ref = docs[0].reference
data = docs[0].to_dict() or {}
history = list(data.get("lifecycle_history") or [])
new_entry = {
"status_id": body.status_id,
"date": body.date or datetime.now(timezone.utc).isoformat(),
"note": body.note,
"set_by": user.email,
}
# Overwrite existing entry for this status if present, else append
existing_idx = next(
(i for i, e in enumerate(history) if e.get("status_id") == body.status_id),
None,
)
if existing_idx is not None:
history[existing_idx] = new_entry
else:
history.append(new_entry)
doc_ref.update({"lifecycle_history": history})
from manufacturing.service import _doc_to_inventory_item
return _doc_to_inventory_item(doc_ref.get())
@router.delete("/devices/{sn}/lifecycle/{index}", response_model=DeviceInventoryItem)
async def delete_lifecycle_entry(
sn: str,
index: int,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
"""Delete a lifecycle history entry by index. Cannot delete the entry for the current status."""
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
doc_ref = docs[0].reference
data = docs[0].to_dict() or {}
history = data.get("lifecycle_history") or []
if index < 0 or index >= len(history):
raise HTTPException(status_code=400, detail="Invalid lifecycle entry index")
current_status = data.get("mfg_status", "")
if history[index].get("status_id") == current_status:
raise HTTPException(status_code=400, detail="Cannot delete the entry for the current status. Change the status first.")
history.pop(index)
doc_ref.update({"lifecycle_history": history})
from manufacturing.service import _doc_to_inventory_item
return _doc_to_inventory_item(doc_ref.get())
@router.get("/devices/{sn}/nvs.bin")
async def download_nvs(
sn: str,
hw_type_override: Optional[str] = Query(None, description="Override hw_type written to NVS (for bespoke firmware)"),
hw_revision_override: Optional[str] = Query(None, description="Override hw_revision written to NVS (for bespoke firmware)"),
nvs_schema: Optional[str] = Query(None, description="NVS schema to use: 'legacy' or 'new' (default)"),
user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
binary = service.get_nvs_binary(sn, hw_type_override=hw_type_override, hw_revision_override=hw_revision_override, legacy=(nvs_schema == "legacy"))
await audit.log_action(
admin_user=user.email,
action="device_flashed",
serial_number=sn,
)
return Response(
content=binary,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{sn}_nvs.bin"'},
)
@router.post("/devices/{sn}/assign", response_model=DeviceInventoryItem)
async def assign_device(
sn: str,
body: DeviceAssign,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
try:
result = service.assign_device(sn, body)
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
await audit.log_action(
admin_user=user.email,
action="device_assigned",
serial_number=sn,
detail={"customer_id": body.customer_id},
)
return result
@router.delete("/devices/{sn}", status_code=204)
async def delete_device(
sn: str,
force: bool = Query(False, description="Required to delete sold/claimed devices"),
user: TokenPayload = Depends(require_permission("manufacturing", "delete")),
):
"""Delete a device. Sold/claimed devices require force=true."""
try:
service.delete_device(sn, force=force)
except NotFoundError:
raise HTTPException(status_code=404, detail="Device not found")
except PermissionError as e:
raise HTTPException(status_code=403, detail=str(e))
await audit.log_action(
admin_user=user.email,
action="device_deleted",
serial_number=sn,
detail={"force": force},
)
@router.post("/devices/{sn}/email/manufactured", status_code=204)
async def send_manufactured_email(
sn: str,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
"""Send the 'device manufactured' notification to the assigned customer's email."""
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
data = docs[0].to_dict() or {}
customer_id = data.get("customer_id")
if not customer_id:
raise HTTPException(status_code=400, detail="No customer assigned to this device")
customer_doc = db.collection("crm_customers").document(customer_id).get()
if not customer_doc.exists:
raise HTTPException(status_code=404, detail="Assigned customer not found")
cdata = customer_doc.to_dict() or {}
email = cdata.get("email")
if not email:
raise HTTPException(status_code=400, detail="Customer has no email address")
name_parts = [cdata.get("name") or "", cdata.get("surname") or ""]
customer_name = " ".join(p for p in name_parts if p).strip() or None
hw_family = data.get("hw_family") or data.get("hw_type") or ""
from utils.emails.device_mfged_mail import send_device_manufactured_email
send_device_manufactured_email(
customer_email=email,
serial_number=sn,
device_name=hw_family.replace("_", " ").title(),
customer_name=customer_name,
)
await audit.log_action(
admin_user=user.email,
action="email_manufactured_sent",
serial_number=sn,
detail={"recipient": email},
)
@router.post("/devices/{sn}/email/assigned", status_code=204)
async def send_assigned_email(
sn: str,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
"""Send the 'device assigned / app instructions' email to the assigned user(s)."""
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
data = docs[0].to_dict() or {}
user_list = data.get("user_list") or []
if not user_list:
raise HTTPException(status_code=400, detail="No users assigned to this device")
hw_family = data.get("hw_family") or data.get("hw_type") or ""
device_name = hw_family.replace("_", " ").title()
from utils.emails.device_assigned_mail import send_device_assigned_email
errors = []
for uid in user_list:
try:
user_doc = db.collection("users").document(uid).get()
if not user_doc.exists:
continue
udata = user_doc.to_dict() or {}
email = udata.get("email")
if not email:
continue
display_name = udata.get("display_name") or udata.get("name") or None
send_device_assigned_email(
user_email=email,
serial_number=sn,
device_name=device_name,
user_name=display_name,
)
except Exception as exc:
errors.append(str(exc))
if errors:
raise HTTPException(status_code=500, detail=f"Some emails failed: {'; '.join(errors)}")
await audit.log_action(
admin_user=user.email,
action="email_assigned_sent",
serial_number=sn,
detail={"user_count": len(user_list)},
)
@router.delete("/devices", status_code=200)
async def delete_unprovisioned(
user: TokenPayload = Depends(require_permission("manufacturing", "delete")),
):
"""Delete all devices with status 'manufactured' (never provisioned)."""
deleted = service.delete_unprovisioned_devices()
await audit.log_action(
admin_user=user.email,
action="bulk_delete_unprovisioned",
detail={"count": len(deleted), "serial_numbers": deleted},
)
return {"deleted": deleted, "count": len(deleted)}
@router.get("/devices/{sn}/firmware.bin")
def redirect_firmware(
sn: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Redirect to the latest stable firmware binary for this device's hw_type.
Resolves to GET /api/firmware/{hw_type}/stable/{version}/firmware.bin.
"""
url = service.get_firmware_url(sn)
return RedirectResponse(url=url, status_code=302)
# ─────────────────────────────────────────────────────────────────────────────
# Flash assets — bootloader.bin and partitions.bin per hw_type
# These are the binaries that must be flashed at fixed addresses during full
# provisioning (0x1000 bootloader, 0x8000 partition table).
# They are NOT flashed during OTA updates — only during initial provisioning.
# Upload once per hw_type after each PlatformIO build that changes the layout.
# ─────────────────────────────────────────────────────────────────────────────
@router.get("/flash-assets")
def list_flash_assets(
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return asset status for all known board types (and any discovered bespoke UIDs).
Checks the filesystem directly — no database involved.
Each entry contains: hw_type, bootloader (exists, size, uploaded_at), partitions (same), note.
"""
return {"assets": service.list_flash_assets()}
@router.delete("/flash-assets/{hw_type}/{asset}", status_code=204)
async def delete_flash_asset(
hw_type: str,
asset: str,
user: TokenPayload = Depends(require_permission("manufacturing", "delete")),
):
"""Delete a single flash asset file (bootloader.bin or partitions.bin)."""
if asset not in VALID_FLASH_ASSETS:
raise HTTPException(status_code=400, detail=f"Invalid asset. Must be one of: {', '.join(sorted(VALID_FLASH_ASSETS))}")
try:
service.delete_flash_asset(hw_type, asset)
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
await audit.log_action(
admin_user=user.email,
action="flash_asset_deleted",
detail={"hw_type": hw_type, "asset": asset},
)
class FlashAssetNoteBody(BaseModel):
note: str
@router.put("/flash-assets/{hw_type}/note", status_code=204)
async def set_flash_asset_note(
hw_type: str,
body: FlashAssetNoteBody,
_user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
):
"""Save (or overwrite) the note for a hw_type's flash asset set.
The note is stored as note.txt next to the binary files.
Pass an empty string to clear the note.
"""
service.set_flash_asset_note(hw_type, body.note)
@router.post("/flash-assets/{hw_type}/{asset}", status_code=204)
async def upload_flash_asset(
hw_type: str,
asset: str,
file: UploadFile = File(...),
_user: TokenPayload = Depends(require_permission("manufacturing", "add")),
):
"""Upload a bootloader.bin or partitions.bin for a given hw_type.
These are build artifacts from PlatformIO (.pio/build/{env}/bootloader.bin
and .pio/build/{env}/partitions.bin). Upload them once per hw_type after
each PlatformIO build that changes the partition layout.
"""
# hw_type can be a standard board type OR a bespoke UID (any non-empty slug)
if not hw_type or len(hw_type) > 128:
raise HTTPException(status_code=400, detail="Invalid hw_type/bespoke UID.")
if asset not in VALID_FLASH_ASSETS:
raise HTTPException(status_code=400, detail=f"Invalid asset. Must be one of: {', '.join(sorted(VALID_FLASH_ASSETS))}")
data = await file.read()
service.save_flash_asset(hw_type, asset, data)
@router.get("/devices/{sn}/bootloader.bin")
def download_bootloader(
sn: str,
hw_type_override: Optional[str] = Query(None, description="Override hw_type for flash asset lookup (for bespoke firmware)"),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return the bootloader.bin for this device's hw_type (flashed at 0x1000)."""
item = service.get_device_by_sn(sn)
hw_type = hw_type_override or item.hw_type
try:
data = service.get_flash_asset(hw_type, "bootloader.bin")
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
return Response(
content=data,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="bootloader_{hw_type}.bin"'},
)
@router.get("/devices/{sn}/partitions.bin")
def download_partitions(
sn: str,
hw_type_override: Optional[str] = Query(None, description="Override hw_type for flash asset lookup (for bespoke firmware)"),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return the partitions.bin for this device's hw_type (flashed at 0x8000)."""
item = service.get_device_by_sn(sn)
hw_type = hw_type_override or item.hw_type
try:
data = service.get_flash_asset(hw_type, "partitions.bin")
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
return Response(
content=data,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="partitions_{hw_type}.bin"'},
)