import random import string from datetime import datetime, timezone from shared.firebase import get_db from shared.exceptions import NotFoundError from utils.serial_number import generate_serial from utils.nvs_generator import generate as generate_nvs_binary from manufacturing.models import BatchCreate, BatchResponse, DeviceInventoryItem, DeviceStatusUpdate, DeviceAssign, ManufacturingStats, RecentActivityItem COLLECTION = "devices" _BATCH_ID_CHARS = string.ascii_uppercase + string.digits def _make_batch_id() -> str: today = datetime.utcnow().strftime("%y%m%d") suffix = "".join(random.choices(_BATCH_ID_CHARS, k=4)) return f"BATCH-{today}-{suffix}" def _get_existing_sns(db) -> set: existing = set() for doc in db.collection(COLLECTION).select(["serial_number"]).stream(): data = doc.to_dict() sn = data.get("serial_number") if sn: existing.add(sn) return existing def _doc_to_inventory_item(doc) -> DeviceInventoryItem: data = doc.to_dict() or {} created_raw = data.get("created_at") if isinstance(created_raw, datetime): created_str = created_raw.strftime("%Y-%m-%dT%H:%M:%SZ") else: created_str = str(created_raw) if created_raw else None return DeviceInventoryItem( id=doc.id, serial_number=data.get("serial_number", ""), hw_type=data.get("hw_type", ""), hw_version=data.get("hw_version", ""), mfg_status=data.get("mfg_status", "manufactured"), mfg_batch_id=data.get("mfg_batch_id"), created_at=created_str, owner=data.get("owner"), assigned_to=data.get("assigned_to"), device_name=data.get("device_name") or None, ) def create_batch(data: BatchCreate) -> BatchResponse: db = get_db() existing_sns = _get_existing_sns(db) batch_id = _make_batch_id() now = datetime.now(timezone.utc) serial_numbers = [] for _ in range(data.quantity): for attempt in range(200): sn = generate_serial(data.board_type.value, data.board_version) if sn not in existing_sns: existing_sns.add(sn) break else: raise RuntimeError("Could not generate unique serial numbers — collision limit hit") db.collection(COLLECTION).add({ "serial_number": sn, "hw_type": data.board_type.value, "hw_version": data.board_version, "mfg_status": "manufactured", "mfg_batch_id": batch_id, "created_at": now, "owner": None, "assigned_to": None, "users_list": [], # Legacy fields left empty so existing device views don't break "device_name": "", "device_location": "", "is_Online": False, }) serial_numbers.append(sn) return BatchResponse( batch_id=batch_id, serial_numbers=serial_numbers, board_type=data.board_type.value, board_version=data.board_version, created_at=now.strftime("%Y-%m-%dT%H:%M:%SZ"), ) def list_devices( status: str | None = None, hw_type: str | None = None, search: str | None = None, limit: int = 100, offset: int = 0, ) -> list[DeviceInventoryItem]: db = get_db() query = db.collection(COLLECTION) if status: query = query.where("mfg_status", "==", status) if hw_type: query = query.where("hw_type", "==", hw_type) docs = list(query.stream()) items = [_doc_to_inventory_item(doc) for doc in docs] if search: search_lower = search.lower() items = [ item for item in items if search_lower in (item.serial_number or "").lower() or search_lower in (item.owner or "").lower() or search_lower in (item.mfg_batch_id or "").lower() ] return items[offset: offset + limit] def get_device_by_sn(sn: str) -> DeviceInventoryItem: db = get_db() docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream()) if not docs: raise NotFoundError("Device") return _doc_to_inventory_item(docs[0]) def update_device_status(sn: str, data: DeviceStatusUpdate) -> DeviceInventoryItem: db = get_db() docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream()) if not docs: raise NotFoundError("Device") doc_ref = docs[0].reference update = {"mfg_status": data.status.value} if data.note: update["mfg_status_note"] = data.note doc_ref.update(update) return _doc_to_inventory_item(doc_ref.get()) def get_nvs_binary(sn: str) -> bytes: item = get_device_by_sn(sn) return generate_nvs_binary( serial_number=item.serial_number, hw_type=item.hw_type, hw_version=item.hw_version, ) def assign_device(sn: str, data: DeviceAssign) -> DeviceInventoryItem: from utils.email import send_device_assignment_invite db = get_db() docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream()) if not docs: raise NotFoundError("Device") doc_ref = docs[0].reference doc_ref.update({ "owner": data.customer_email, "assigned_to": data.customer_email, "mfg_status": "sold", }) send_device_assignment_invite( customer_email=data.customer_email, serial_number=sn, customer_name=data.customer_name, ) return _doc_to_inventory_item(doc_ref.get()) def get_stats() -> ManufacturingStats: db = get_db() docs = list(db.collection(COLLECTION).stream()) all_statuses = ["manufactured", "flashed", "provisioned", "sold", "claimed", "decommissioned"] counts = {s: 0 for s in all_statuses} activity_candidates = [] for doc in docs: data = doc.to_dict() or {} status = data.get("mfg_status", "manufactured") if status in counts: counts[status] += 1 if status in ("provisioned", "sold", "claimed"): # Use created_at as a proxy timestamp; Firestore DatetimeWithNanoseconds or plain datetime ts = data.get("created_at") if isinstance(ts, datetime): ts_str = ts.strftime("%Y-%m-%dT%H:%M:%SZ") else: ts_str = str(ts) if ts else None activity_candidates.append(RecentActivityItem( serial_number=data.get("serial_number", ""), hw_type=data.get("hw_type", ""), mfg_status=status, owner=data.get("owner"), updated_at=ts_str, )) # Sort by updated_at descending, take latest 10 activity_candidates.sort( key=lambda x: x.updated_at or "", reverse=True, ) recent = activity_candidates[:10] return ManufacturingStats(counts=counts, recent_activity=recent) PROTECTED_STATUSES = {"sold", "claimed"} def delete_device(sn: str, force: bool = False) -> None: """Delete a device by serial number. Raises PermissionError if the device is sold/claimed and force is not set. The frontend uses force=True only after the user confirms by typing the SN. """ db = get_db() docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream()) if not docs: raise NotFoundError("Device") data = docs[0].to_dict() or {} status = data.get("mfg_status", "manufactured") if status in PROTECTED_STATUSES and not force: raise PermissionError( f"Device {sn} has status '{status}' and cannot be deleted without explicit confirmation." ) docs[0].reference.delete() def delete_unprovisioned_devices() -> list[str]: """Delete all devices with status 'manufactured' (never flashed/provisioned). Returns the list of deleted serial numbers. """ db = get_db() docs = list(db.collection(COLLECTION).where("mfg_status", "==", "manufactured").stream()) deleted = [] for doc in docs: data = doc.to_dict() or {} sn = data.get("serial_number", "") doc.reference.delete() deleted.append(sn) return deleted def get_firmware_url(sn: str) -> str: """Return the FastAPI download URL for the latest stable firmware for this device's hw_type.""" from firmware.service import get_latest item = get_device_by_sn(sn) hw_type = item.hw_type.lower() latest = get_latest(hw_type, "stable") # download_url is a relative path like /api/firmware/vs/stable/1.4.2/firmware.bin return latest.download_url