Files
bellsystems-cp/backend/manufacturing/router.py
bonamin 024ba88470 fix: route manufacturing audit logs to shared Postgres audit log
- Manufacturing router now uses shared/audit.log_action (Postgres) instead
  of the separate manufacturing/audit.py (SQLite mfg_audit_log), so all
  manufacturing events appear in the Log Viewer
- Added log_action calls to 5 previously unlogged endpoints: lifecycle
  patch, lifecycle create, lifecycle delete, flash asset upload, flash
  asset note
- Removed the now-redundant /manufacturing/audit-log endpoint
- Log Viewer restricted to sysadmin only: backend uses require_sysadmin
  (was require_admin_or_above), frontend adds role guard on the page
- Fixed Action badge column clipping: table-layout auto + whiteSpace nowrap
  so the column sizes to fit the widest badge (Status Change)
- Added device_batch entity type to Log Viewer entity labels and filters

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 19:21:39 +03:00

634 lines
24 KiB
Python

from fastapi import APIRouter, Depends, Query, HTTPException, UploadFile, File
from fastapi.responses import Response
from fastapi.responses import RedirectResponse
from typing import Optional
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from auth.models import TokenPayload
from auth.dependencies import require_permission
from manufacturing.models import (
BatchCreate, BatchResponse,
DeviceInventoryItem, DeviceInventoryListResponse,
DeviceStatusUpdate, DeviceAssign,
ManufacturingStats,
)
from manufacturing import service
from shared.audit import log_action
from shared.exceptions import NotFoundError
from shared.firebase import get_db as get_firestore
from database.postgres import get_pg_session
class LifecycleEntryPatch(BaseModel):
index: int
date: Optional[str] = None
note: Optional[str] = None
class LifecycleEntryCreate(BaseModel):
status_id: str
date: Optional[str] = None
note: Optional[str] = None
VALID_FLASH_ASSETS = {"bootloader.bin", "partitions.bin"}
VALID_HW_TYPES_MFG = {"vesper", "vesper_plus", "vesper_pro", "agnus", "agnus_mini", "chronos", "chronos_pro"}
# Bespoke UIDs are dynamic — we allow any non-empty slug that doesn't clash with
# a standard hw_type name. The flash-asset upload endpoint checks this below.
router = APIRouter(prefix="/api/manufacturing", tags=["manufacturing"])
@router.get("/stats", response_model=ManufacturingStats)
def get_stats(
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
return service.get_stats()
@router.post("/batch", response_model=BatchResponse, status_code=201)
async def create_batch(
body: BatchCreate,
user: TokenPayload = Depends(require_permission("manufacturing", "add")),
db: AsyncSession = Depends(get_pg_session),
):
result = service.create_batch(body)
await log_action(
db, user.sub, user.email,
action="CREATE",
entity_type="device_batch",
entity_id=result.batch_id,
entity_label=f"Batch {result.batch_id} ({result.board_type}, qty {len(result.serial_numbers)})",
meta={
"board_type": result.board_type,
"board_version": result.board_version,
"quantity": len(result.serial_numbers),
},
)
return result
@router.get("/devices", response_model=DeviceInventoryListResponse)
def list_devices(
status: Optional[str] = Query(None),
hw_type: Optional[str] = Query(None),
search: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
items = service.list_devices(
status=status,
hw_type=hw_type,
search=search,
limit=limit,
offset=offset,
)
return DeviceInventoryListResponse(devices=items, total=len(items))
@router.get("/devices/{sn}", response_model=DeviceInventoryItem)
def get_device(
sn: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
return service.get_device_by_sn(sn)
@router.get("/customers/search")
def search_customers(
q: str = Query(""),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Search CRM customers by name, email, phone, organization, or tags."""
results = service.search_customers(q)
return {"results": results}
@router.get("/customers/{customer_id}")
def get_customer(
customer_id: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Get a single CRM customer by ID."""
db = get_firestore()
doc = db.collection("crm_customers").document(customer_id).get()
if not doc.exists:
raise HTTPException(status_code=404, detail="Customer not found")
data = doc.to_dict() or {}
loc = data.get("location") or {}
city = loc.get("city") if isinstance(loc, dict) else None
return {
"id": doc.id,
"name": data.get("name") or "",
"surname": data.get("surname") or "",
"email": data.get("email") or "",
"organization": data.get("organization") or "",
"phone": data.get("phone") or "",
"city": city or "",
}
@router.patch("/devices/{sn}/status", response_model=DeviceInventoryItem)
async def update_status(
sn: str,
body: DeviceStatusUpdate,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
# Guard: claimed requires at least one user in user_list
# (allow if explicitly force_claimed=true, which the mfg UI sets after adding a user manually)
if body.status.value == "claimed":
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if docs:
data = docs[0].to_dict() or {}
user_list = data.get("user_list", []) or []
if not user_list and not getattr(body, "force_claimed", False):
raise HTTPException(
status_code=400,
detail="Cannot set status to 'claimed': device has no users in user_list. "
"Assign a user first, then set to Claimed.",
)
# Guard: sold requires a customer assigned
if body.status.value == "sold":
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if docs:
data = docs[0].to_dict() or {}
if not data.get("customer_id"):
raise HTTPException(
status_code=400,
detail="Cannot set status to 'sold' without an assigned customer. "
"Use the 'Assign to Customer' action first.",
)
result = service.update_device_status(sn, body, set_by=user.email)
await log_action(
db, user.sub, user.email,
action="STATUS_CHANGE",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"status": body.status.value, "note": body.note},
)
return result
@router.patch("/devices/{sn}/lifecycle", response_model=DeviceInventoryItem)
async def patch_lifecycle_entry(
sn: str,
body: LifecycleEntryPatch,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
"""Edit the date and/or note of a lifecycle history entry by index."""
fs = get_firestore()
docs = list(fs.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
doc_ref = docs[0].reference
data = docs[0].to_dict() or {}
history = data.get("lifecycle_history") or []
if body.index < 0 or body.index >= len(history):
raise HTTPException(status_code=400, detail="Invalid lifecycle entry index")
if body.date is not None:
history[body.index]["date"] = body.date
if body.note is not None:
history[body.index]["note"] = body.note
doc_ref.update({"lifecycle_history": history})
from manufacturing.service import _doc_to_inventory_item
result = _doc_to_inventory_item(doc_ref.get())
await log_action(
db, user.sub, user.email,
action="UPDATE",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"lifecycle_index": body.index, "date": body.date, "note": body.note},
)
return result
@router.post("/devices/{sn}/lifecycle", response_model=DeviceInventoryItem, status_code=200)
async def create_lifecycle_entry(
sn: str,
body: LifecycleEntryCreate,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
"""Upsert a lifecycle history entry for the given status_id.
If an entry for this status already exists it is overwritten in-place;
otherwise a new entry is appended. This prevents duplicate entries when
a status is visited more than once (max one entry per status).
"""
from datetime import datetime, timezone
fs = get_firestore()
docs = list(fs.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
doc_ref = docs[0].reference
data = docs[0].to_dict() or {}
history = list(data.get("lifecycle_history") or [])
new_entry = {
"status_id": body.status_id,
"date": body.date or datetime.now(timezone.utc).isoformat(),
"note": body.note,
"set_by": user.email,
}
existing_idx = next(
(i for i, e in enumerate(history) if e.get("status_id") == body.status_id),
None,
)
is_update = existing_idx is not None
if is_update:
history[existing_idx] = new_entry
else:
history.append(new_entry)
doc_ref.update({"lifecycle_history": history})
from manufacturing.service import _doc_to_inventory_item
result = _doc_to_inventory_item(doc_ref.get())
await log_action(
db, user.sub, user.email,
action="UPDATE" if is_update else "CREATE",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"lifecycle_status": body.status_id, "date": new_entry["date"], "note": body.note},
)
return result
@router.delete("/devices/{sn}/lifecycle/{index}", response_model=DeviceInventoryItem)
async def delete_lifecycle_entry(
sn: str,
index: int,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
"""Delete a lifecycle history entry by index. Cannot delete the entry for the current status."""
fs = get_firestore()
docs = list(fs.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
doc_ref = docs[0].reference
data = docs[0].to_dict() or {}
history = data.get("lifecycle_history") or []
if index < 0 or index >= len(history):
raise HTTPException(status_code=400, detail="Invalid lifecycle entry index")
current_status = data.get("mfg_status", "")
deleted_entry = history[index]
if deleted_entry.get("status_id") == current_status:
raise HTTPException(status_code=400, detail="Cannot delete the entry for the current status. Change the status first.")
history.pop(index)
doc_ref.update({"lifecycle_history": history})
from manufacturing.service import _doc_to_inventory_item
result = _doc_to_inventory_item(doc_ref.get())
await log_action(
db, user.sub, user.email,
action="DELETE",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"lifecycle_status": deleted_entry.get("status_id"), "index": index},
)
return result
@router.get("/devices/{sn}/nvs.bin")
async def download_nvs(
sn: str,
hw_type_override: Optional[str] = Query(None, description="Override hw_type written to NVS (for bespoke firmware)"),
hw_revision_override: Optional[str] = Query(None, description="Override hw_revision written to NVS (for bespoke firmware)"),
nvs_schema: Optional[str] = Query(None, description="NVS schema to use: 'legacy' or 'new' (default)"),
user: TokenPayload = Depends(require_permission("manufacturing", "view")),
db: AsyncSession = Depends(get_pg_session),
):
binary = service.get_nvs_binary(sn, hw_type_override=hw_type_override, hw_revision_override=hw_revision_override, legacy=(nvs_schema == "legacy"))
await log_action(
db, user.sub, user.email,
action="COMMAND",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"command": "nvs_flash", "hw_type_override": hw_type_override, "nvs_schema": nvs_schema or "new"},
)
return Response(
content=binary,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{sn}_nvs.bin"'},
)
@router.post("/devices/{sn}/assign", response_model=DeviceInventoryItem)
async def assign_device(
sn: str,
body: DeviceAssign,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
try:
result = service.assign_device(sn, body)
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
await log_action(
db, user.sub, user.email,
action="UPDATE",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"customer_id": body.customer_id},
)
return result
@router.delete("/devices/{sn}", status_code=204)
async def delete_device(
sn: str,
force: bool = Query(False, description="Required to delete sold/claimed devices"),
user: TokenPayload = Depends(require_permission("manufacturing", "delete")),
db: AsyncSession = Depends(get_pg_session),
):
"""Delete a device. Sold/claimed devices require force=true."""
try:
service.delete_device(sn, force=force)
except NotFoundError:
raise HTTPException(status_code=404, detail="Device not found")
except PermissionError as e:
raise HTTPException(status_code=403, detail=str(e))
await log_action(
db, user.sub, user.email,
action="DELETE",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"force": force},
)
@router.post("/devices/{sn}/email/manufactured", status_code=204)
async def send_manufactured_email(
sn: str,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
"""Send the 'device manufactured' notification to the assigned customer's email."""
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
data = docs[0].to_dict() or {}
customer_id = data.get("customer_id")
if not customer_id:
raise HTTPException(status_code=400, detail="No customer assigned to this device")
customer_doc = db.collection("crm_customers").document(customer_id).get()
if not customer_doc.exists:
raise HTTPException(status_code=404, detail="Assigned customer not found")
cdata = customer_doc.to_dict() or {}
email = cdata.get("email")
if not email:
raise HTTPException(status_code=400, detail="Customer has no email address")
name_parts = [cdata.get("name") or "", cdata.get("surname") or ""]
customer_name = " ".join(p for p in name_parts if p).strip() or None
hw_family = data.get("hw_family") or data.get("hw_type") or ""
from utils.emails.device_mfged_mail import send_device_manufactured_email
send_device_manufactured_email(
customer_email=email,
serial_number=sn,
device_name=hw_family.replace("_", " ").title(),
customer_name=customer_name,
)
await log_action(
db, user.sub, user.email,
action="COMMAND",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"command": "email_manufactured", "recipient": email},
)
@router.post("/devices/{sn}/email/assigned", status_code=204)
async def send_assigned_email(
sn: str,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
"""Send the 'device assigned / app instructions' email to the assigned user(s)."""
db = get_firestore()
docs = list(db.collection("devices").where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise HTTPException(status_code=404, detail="Device not found")
data = docs[0].to_dict() or {}
user_list = data.get("user_list") or []
if not user_list:
raise HTTPException(status_code=400, detail="No users assigned to this device")
hw_family = data.get("hw_family") or data.get("hw_type") or ""
device_name = hw_family.replace("_", " ").title()
from utils.emails.device_assigned_mail import send_device_assigned_email
errors = []
for uid in user_list:
try:
user_doc = db.collection("users").document(uid).get()
if not user_doc.exists:
continue
udata = user_doc.to_dict() or {}
email = udata.get("email")
if not email:
continue
display_name = udata.get("display_name") or udata.get("name") or None
send_device_assigned_email(
user_email=email,
serial_number=sn,
device_name=device_name,
user_name=display_name,
)
except Exception as exc:
errors.append(str(exc))
if errors:
raise HTTPException(status_code=500, detail=f"Some emails failed: {'; '.join(errors)}")
await log_action(
db, user.sub, user.email,
action="COMMAND",
entity_type="device",
entity_id=sn,
entity_label=sn,
meta={"command": "email_assigned", "user_count": len(user_list)},
)
@router.delete("/devices", status_code=200)
async def delete_unprovisioned(
user: TokenPayload = Depends(require_permission("manufacturing", "delete")),
db: AsyncSession = Depends(get_pg_session),
):
"""Delete all devices with status 'manufactured' (never provisioned)."""
deleted = service.delete_unprovisioned_devices()
await log_action(
db, user.sub, user.email,
action="DELETE",
entity_type="device_batch",
entity_id="bulk_unprovisioned",
entity_label=f"Bulk delete unprovisioned ({len(deleted)} devices)",
meta={"count": len(deleted), "serial_numbers": deleted},
)
return {"deleted": deleted, "count": len(deleted)}
@router.get("/devices/{sn}/firmware.bin")
def redirect_firmware(
sn: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Redirect to the latest stable firmware binary for this device's hw_type.
Resolves to GET /api/firmware/{hw_type}/stable/{version}/firmware.bin.
"""
url = service.get_firmware_url(sn)
return RedirectResponse(url=url, status_code=302)
# ─────────────────────────────────────────────────────────────────────────────
# Flash assets — bootloader.bin and partitions.bin per hw_type
# These are the binaries that must be flashed at fixed addresses during full
# provisioning (0x1000 bootloader, 0x8000 partition table).
# They are NOT flashed during OTA updates — only during initial provisioning.
# Upload once per hw_type after each PlatformIO build that changes the layout.
# ─────────────────────────────────────────────────────────────────────────────
@router.get("/flash-assets")
def list_flash_assets(
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return asset status for all known board types (and any discovered bespoke UIDs).
Checks the filesystem directly — no database involved.
Each entry contains: hw_type, bootloader (exists, size, uploaded_at), partitions (same), note.
"""
return {"assets": service.list_flash_assets()}
@router.delete("/flash-assets/{hw_type}/{asset}", status_code=204)
async def delete_flash_asset(
hw_type: str,
asset: str,
user: TokenPayload = Depends(require_permission("manufacturing", "delete")),
db: AsyncSession = Depends(get_pg_session),
):
"""Delete a single flash asset file (bootloader.bin or partitions.bin)."""
if asset not in VALID_FLASH_ASSETS:
raise HTTPException(status_code=400, detail=f"Invalid asset. Must be one of: {', '.join(sorted(VALID_FLASH_ASSETS))}")
try:
service.delete_flash_asset(hw_type, asset)
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
await log_action(
db, user.sub, user.email,
action="DELETE",
entity_type="firmware",
entity_id=f"{hw_type}/{asset}",
entity_label=f"{hw_type} / {asset}",
meta={"hw_type": hw_type, "asset": asset},
)
class FlashAssetNoteBody(BaseModel):
note: str
@router.put("/flash-assets/{hw_type}/note", status_code=204)
async def set_flash_asset_note(
hw_type: str,
body: FlashAssetNoteBody,
user: TokenPayload = Depends(require_permission("manufacturing", "edit")),
db: AsyncSession = Depends(get_pg_session),
):
"""Save (or overwrite) the note for a hw_type's flash asset set.
The note is stored as note.txt next to the binary files.
Pass an empty string to clear the note.
"""
service.set_flash_asset_note(hw_type, body.note)
await log_action(
db, user.sub, user.email,
action="UPDATE",
entity_type="firmware",
entity_id=hw_type,
entity_label=hw_type,
meta={"note": body.note},
)
@router.post("/flash-assets/{hw_type}/{asset}", status_code=204)
async def upload_flash_asset(
hw_type: str,
asset: str,
file: UploadFile = File(...),
user: TokenPayload = Depends(require_permission("manufacturing", "add")),
db: AsyncSession = Depends(get_pg_session),
):
"""Upload a bootloader.bin or partitions.bin for a given hw_type.
These are build artifacts from PlatformIO (.pio/build/{env}/bootloader.bin
and .pio/build/{env}/partitions.bin). Upload them once per hw_type after
each PlatformIO build that changes the partition layout.
"""
if not hw_type or len(hw_type) > 128:
raise HTTPException(status_code=400, detail="Invalid hw_type/bespoke UID.")
if asset not in VALID_FLASH_ASSETS:
raise HTTPException(status_code=400, detail=f"Invalid asset. Must be one of: {', '.join(sorted(VALID_FLASH_ASSETS))}")
data = await file.read()
service.save_flash_asset(hw_type, asset, data)
await log_action(
db, user.sub, user.email,
action="CREATE",
entity_type="firmware",
entity_id=f"{hw_type}/{asset}",
entity_label=f"{hw_type} / {asset}",
meta={"hw_type": hw_type, "asset": asset, "size_bytes": len(data)},
)
@router.get("/devices/{sn}/bootloader.bin")
def download_bootloader(
sn: str,
hw_type_override: Optional[str] = Query(None, description="Override hw_type for flash asset lookup (for bespoke firmware)"),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return the bootloader.bin for this device's hw_type (flashed at 0x1000)."""
item = service.get_device_by_sn(sn)
hw_type = hw_type_override or item.hw_type
try:
data = service.get_flash_asset(hw_type, "bootloader.bin")
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
return Response(
content=data,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="bootloader_{hw_type}.bin"'},
)
@router.get("/devices/{sn}/partitions.bin")
def download_partitions(
sn: str,
hw_type_override: Optional[str] = Query(None, description="Override hw_type for flash asset lookup (for bespoke firmware)"),
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return the partitions.bin for this device's hw_type (flashed at 0x8000)."""
item = service.get_device_by_sn(sn)
hw_type = hw_type_override or item.hw_type
try:
data = service.get_flash_asset(hw_type, "partitions.bin")
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
return Response(
content=data,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="partitions_{hw_type}.bin"'},
)