Files
bellsystems-cp/backend/manufacturing/service.py

413 lines
14 KiB
Python

import logging
import random
import string
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
from config import settings
from shared.firebase import get_db
from shared.exceptions import NotFoundError
from utils.serial_number import generate_serial
from utils.nvs_generator import generate as generate_nvs_binary
from manufacturing.models import BatchCreate, BatchResponse, DeviceInventoryItem, DeviceStatusUpdate, DeviceAssign, ManufacturingStats, RecentActivityItem, BOARD_TYPE_LABELS
COLLECTION = "devices"
_BATCH_ID_CHARS = string.ascii_uppercase + string.digits
def _make_batch_id() -> str:
today = datetime.utcnow().strftime("%y%m%d")
suffix = "".join(random.choices(_BATCH_ID_CHARS, k=4))
return f"BATCH-{today}-{suffix}"
def _get_existing_sns(db) -> set:
existing = set()
for doc in db.collection(COLLECTION).select(["serial_number"]).stream():
data = doc.to_dict()
sn = data.get("serial_number")
if sn:
existing.add(sn)
return existing
def _resolve_user_list(raw_list: list) -> list[str]:
"""Convert user_list entries (DocumentReferences or path strings) to plain user ID strings."""
from google.cloud.firestore_v1 import DocumentReference
result = []
for entry in raw_list:
if isinstance(entry, DocumentReference):
result.append(entry.id)
elif isinstance(entry, str):
result.append(entry.split("/")[-1])
return result
def _doc_to_inventory_item(doc) -> DeviceInventoryItem:
data = doc.to_dict() or {}
created_raw = data.get("created_at")
if isinstance(created_raw, datetime):
created_str = created_raw.strftime("%Y-%m-%dT%H:%M:%SZ")
else:
created_str = str(created_raw) if created_raw else None
return DeviceInventoryItem(
id=doc.id,
serial_number=data.get("serial_number", ""),
hw_type=data.get("hw_type", ""),
hw_version=data.get("hw_version", ""),
mfg_status=data.get("mfg_status", "manufactured"),
mfg_batch_id=data.get("mfg_batch_id"),
created_at=created_str,
owner=data.get("owner"),
assigned_to=data.get("assigned_to"),
device_name=data.get("device_name") or None,
lifecycle_history=data.get("lifecycle_history") or [],
customer_id=data.get("customer_id"),
user_list=_resolve_user_list(data.get("user_list") or []),
)
def create_batch(data: BatchCreate) -> BatchResponse:
db = get_db()
existing_sns = _get_existing_sns(db)
batch_id = _make_batch_id()
now = datetime.now(timezone.utc)
serial_numbers = []
for _ in range(data.quantity):
for attempt in range(200):
sn = generate_serial(data.board_type.value, data.board_version)
if sn not in existing_sns:
existing_sns.add(sn)
break
else:
raise RuntimeError("Could not generate unique serial numbers — collision limit hit")
db.collection(COLLECTION).add({
"serial_number": sn,
"hw_type": data.board_type.value,
"hw_version": data.board_version,
"mfg_status": "manufactured",
"mfg_batch_id": batch_id,
"created_at": now,
"owner": None,
"assigned_to": None,
"user_list": [],
# Legacy fields left empty so existing device views don't break
"device_name": "",
"device_location": "",
"is_Online": False,
"lifecycle_history": [
{
"status_id": "manufactured",
"date": now.isoformat(),
"note": None,
"set_by": None,
}
],
})
serial_numbers.append(sn)
return BatchResponse(
batch_id=batch_id,
serial_numbers=serial_numbers,
board_type=data.board_type.value,
board_version=data.board_version,
created_at=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
)
def list_devices(
status: str | None = None,
hw_type: str | None = None,
search: str | None = None,
limit: int = 100,
offset: int = 0,
) -> list[DeviceInventoryItem]:
db = get_db()
query = db.collection(COLLECTION)
if status:
query = query.where("mfg_status", "==", status)
if hw_type:
query = query.where("hw_type", "==", hw_type)
docs = list(query.stream())
items = [_doc_to_inventory_item(doc) for doc in docs]
if search:
search_lower = search.lower()
items = [
item for item in items
if search_lower in (item.serial_number or "").lower()
or search_lower in (item.owner or "").lower()
or search_lower in (item.mfg_batch_id or "").lower()
]
return items[offset: offset + limit]
def get_device_by_sn(sn: str) -> DeviceInventoryItem:
db = get_db()
docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise NotFoundError("Device")
return _doc_to_inventory_item(docs[0])
def update_device_status(sn: str, data: DeviceStatusUpdate, set_by: str | None = None) -> DeviceInventoryItem:
db = get_db()
docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise NotFoundError("Device")
doc_ref = docs[0].reference
doc_data = docs[0].to_dict() or {}
now = datetime.now(timezone.utc).isoformat()
history = doc_data.get("lifecycle_history") or []
# Append new lifecycle entry
new_entry = {
"status_id": data.status.value,
"date": now,
"note": data.note if data.note else None,
"set_by": set_by,
}
history.append(new_entry)
update = {
"mfg_status": data.status.value,
"lifecycle_history": history,
}
if data.note:
update["mfg_status_note"] = data.note
doc_ref.update(update)
return _doc_to_inventory_item(doc_ref.get())
def get_nvs_binary(sn: str, hw_type_override: str | None = None, hw_revision_override: str | None = None) -> bytes:
item = get_device_by_sn(sn)
return generate_nvs_binary(
serial_number=item.serial_number,
hw_family=hw_type_override if hw_type_override else item.hw_type,
hw_revision=hw_revision_override if hw_revision_override else item.hw_version,
)
def assign_device(sn: str, data: DeviceAssign) -> DeviceInventoryItem:
"""Assign a device to a customer by customer_id.
- Stores customer_id on the device doc.
- Adds the device to the customer's owned_items list.
- Sets mfg_status to 'sold' unless device is already 'claimed'.
"""
db = get_db()
CRM_COLLECTION = "crm_customers"
# Get device doc
docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise NotFoundError("Device")
doc_data = docs[0].to_dict() or {}
doc_ref = docs[0].reference
current_status = doc_data.get("mfg_status", "manufactured")
# Get customer doc
customer_ref = db.collection(CRM_COLLECTION).document(data.customer_id)
customer_doc = customer_ref.get()
if not customer_doc.exists:
raise NotFoundError("Customer")
customer_data = customer_doc.to_dict() or {}
# Determine new status: don't downgrade claimed → sold
new_status = current_status if current_status == "claimed" else "sold"
now = datetime.now(timezone.utc).isoformat()
history = doc_data.get("lifecycle_history") or []
history.append({
"status_id": new_status,
"date": now,
"note": "Assigned to customer",
"set_by": None,
})
doc_ref.update({
"customer_id": data.customer_id,
"mfg_status": new_status,
"lifecycle_history": history,
})
# Add to customer's owned_items (avoid duplicates)
owned_items = customer_data.get("owned_items", []) or []
device_doc_id = docs[0].id
already_assigned = any(
item.get("type") == "console_device"
and item.get("console_device", {}).get("device_id") == device_doc_id
for item in owned_items
)
if not already_assigned:
device_name = doc_data.get("device_name") or BOARD_TYPE_LABELS.get(doc_data.get("hw_type", ""), sn)
owned_items.append({
"type": "console_device",
"console_device": {
"device_id": device_doc_id,
"serial_number": sn,
"label": device_name,
},
})
customer_ref.update({"owned_items": owned_items})
return _doc_to_inventory_item(doc_ref.get())
def search_customers(q: str) -> list:
"""Search crm_customers by name, email, phone, organization, or tags."""
db = get_db()
CRM_COLLECTION = "crm_customers"
docs = db.collection(CRM_COLLECTION).stream()
results = []
q_lower = q.lower().strip()
for doc in docs:
data = doc.to_dict() or {}
loc = data.get("location") or {}
loc = loc if isinstance(loc, dict) else {}
city = loc.get("city") or ""
searchable = " ".join(filter(None, [
data.get("name"), data.get("surname"),
data.get("email"), data.get("phone"), data.get("organization"),
loc.get("address"), loc.get("city"), loc.get("postal_code"),
loc.get("region"), loc.get("country"),
" ".join(data.get("tags") or []),
])).lower()
if not q_lower or q_lower in searchable:
results.append({
"id": doc.id,
"name": data.get("name") or "",
"surname": data.get("surname") or "",
"email": data.get("email") or "",
"organization": data.get("organization") or "",
"phone": data.get("phone") or "",
"city": city or "",
})
return results
def get_stats() -> ManufacturingStats:
db = get_db()
docs = list(db.collection(COLLECTION).stream())
all_statuses = ["manufactured", "flashed", "provisioned", "sold", "claimed", "decommissioned"]
counts = {s: 0 for s in all_statuses}
activity_candidates = []
for doc in docs:
data = doc.to_dict() or {}
status = data.get("mfg_status", "manufactured")
if status in counts:
counts[status] += 1
if status in ("provisioned", "sold", "claimed"):
# Use created_at as a proxy timestamp; Firestore DatetimeWithNanoseconds or plain datetime
ts = data.get("created_at")
if isinstance(ts, datetime):
ts_str = ts.strftime("%Y-%m-%dT%H:%M:%SZ")
else:
ts_str = str(ts) if ts else None
activity_candidates.append(RecentActivityItem(
serial_number=data.get("serial_number", ""),
hw_type=data.get("hw_type", ""),
mfg_status=status,
owner=data.get("owner"),
updated_at=ts_str,
))
# Sort by updated_at descending, take latest 10
activity_candidates.sort(
key=lambda x: x.updated_at or "",
reverse=True,
)
recent = activity_candidates[:10]
return ManufacturingStats(counts=counts, recent_activity=recent)
PROTECTED_STATUSES = {"sold", "claimed"}
def delete_device(sn: str, force: bool = False) -> None:
"""Delete a device by serial number.
Raises PermissionError if the device is sold/claimed and force is not set.
The frontend uses force=True only after the user confirms by typing the SN.
"""
db = get_db()
docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream())
if not docs:
raise NotFoundError("Device")
data = docs[0].to_dict() or {}
status = data.get("mfg_status", "manufactured")
if status in PROTECTED_STATUSES and not force:
raise PermissionError(
f"Device {sn} has status '{status}' and cannot be deleted without explicit confirmation."
)
docs[0].reference.delete()
def delete_unprovisioned_devices() -> list[str]:
"""Delete all devices with status 'manufactured' (never flashed/provisioned).
Returns the list of deleted serial numbers.
"""
db = get_db()
docs = list(db.collection(COLLECTION).where("mfg_status", "==", "manufactured").stream())
deleted = []
for doc in docs:
data = doc.to_dict() or {}
sn = data.get("serial_number", "")
doc.reference.delete()
deleted.append(sn)
return deleted
def _flash_asset_path(hw_type: str, asset: str) -> Path:
"""Return path to a flash asset (bootloader.bin or partitions.bin) for a given hw_type."""
return Path(settings.flash_assets_storage_path) / hw_type / asset
def save_flash_asset(hw_type: str, asset: str, data: bytes) -> Path:
"""Persist a flash asset binary. asset must be 'bootloader.bin' or 'partitions.bin'."""
if asset not in ("bootloader.bin", "partitions.bin"):
raise ValueError(f"Unknown flash asset: {asset}")
path = _flash_asset_path(hw_type, asset)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(data)
return path
def get_flash_asset(hw_type: str, asset: str) -> bytes:
"""Load a flash asset binary. Raises NotFoundError if not uploaded yet."""
path = _flash_asset_path(hw_type, asset)
if not path.exists():
raise NotFoundError(f"Flash asset '{asset}' for hw_type '{hw_type}' — upload it first via POST /api/manufacturing/flash-assets/{{hw_type}}/{{asset}}")
return path.read_bytes()
def get_firmware_url(sn: str) -> str:
"""Return the FastAPI download URL for the latest stable firmware for this device's hw_type."""
from firmware.service import get_latest
item = get_device_by_sn(sn)
hw_type = item.hw_type.lower()
latest = get_latest(hw_type, "stable")
# download_url is a relative path like /api/firmware/vs/stable/1.4.2/firmware.bin
return latest.download_url