import logging
import random
import string
from datetime import datetime, timezone
from pathlib import Path

from config import settings
from shared.firebase import get_db
from shared.exceptions import NotFoundError
from utils.serial_number import generate_serial
from utils.nvs_generator import generate as generate_nvs_binary
from manufacturing.models import (
    BatchCreate,
    BatchResponse,
    DeviceInventoryItem,
    DeviceStatusUpdate,
    DeviceAssign,
    ManufacturingStats,
    RecentActivityItem,
    BOARD_TYPE_LABELS,
)

logger = logging.getLogger(__name__)

COLLECTION = "devices"
CRM_COLLECTION = "crm_customers"

_BATCH_ID_CHARS = string.ascii_uppercase + string.digits
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Upper bound on serial-number generation retries per device before giving up.
_MAX_SERIAL_ATTEMPTS = 200


def _make_batch_id() -> str:
    """Return a batch id of the form ``BATCH-<yymmdd>-<4 random chars>`` (UTC date)."""
    today = datetime.now(timezone.utc).strftime("%y%m%d")
    suffix = "".join(random.choices(_BATCH_ID_CHARS, k=4))
    return f"BATCH-{today}-{suffix}"


def _format_timestamp(value) -> str | None:
    """Render a stored timestamp as a string.

    ``datetime`` values are formatted as ``YYYY-MM-DDTHH:MM:SSZ``. Aware
    datetimes with a non-zero UTC offset are converted to UTC first so the
    literal ``Z`` suffix is accurate; naive datetimes are formatted as-is.
    Any other truthy value is passed through ``str()``; falsy values give None.
    """
    if isinstance(value, datetime):
        if value.utcoffset():
            value = value.astimezone(timezone.utc)
        return value.strftime(_TIMESTAMP_FORMAT)
    return str(value) if value else None


def _get_existing_sns(db) -> set:
    """Collect every non-empty ``serial_number`` currently in the devices collection."""
    existing = set()
    for doc in db.collection(COLLECTION).select(["serial_number"]).stream():
        sn = (doc.to_dict() or {}).get("serial_number")
        if sn:
            existing.add(sn)
    return existing


def _resolve_user_list(raw_list: list) -> list[str]:
    """Convert user_list entries (DocumentReferences or path strings) to plain user ID strings.

    Entries of any other type are skipped.
    """
    from google.cloud.firestore_v1 import DocumentReference

    result = []
    for entry in raw_list:
        if isinstance(entry, DocumentReference):
            result.append(entry.id)
        elif isinstance(entry, str):
            # Path strings: keep only the last path segment.
            result.append(entry.split("/")[-1])
    return result


def _doc_to_inventory_item(doc) -> DeviceInventoryItem:
    """Build a ``DeviceInventoryItem`` from a device document snapshot.

    Missing fields fall back to empty strings / ``None`` / empty lists, and a
    missing ``mfg_status`` defaults to ``"manufactured"``.
    """
    data = doc.to_dict() or {}
    return DeviceInventoryItem(
        id=doc.id,
        serial_number=data.get("serial_number", ""),
        hw_type=data.get("hw_type", ""),
        hw_version=data.get("hw_version", ""),
        mfg_status=data.get("mfg_status", "manufactured"),
        mfg_batch_id=data.get("mfg_batch_id"),
        created_at=_format_timestamp(data.get("created_at")),
        owner=data.get("owner"),
        assigned_to=data.get("assigned_to"),
        device_name=data.get("device_name") or None,
        lifecycle_history=data.get("lifecycle_history") or [],
        customer_id=data.get("customer_id"),
        user_list=_resolve_user_list(data.get("user_list") or []),
    )


def _find_device_doc(db, sn: str):
    """Return the first device snapshot whose ``serial_number`` equals *sn*.

    Raises:
        NotFoundError: if no device document matches.
    """
    docs = list(
        db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream()
    )
    if not docs:
        raise NotFoundError("Device")
    return docs[0]


def create_batch(data: BatchCreate) -> BatchResponse:
    """Create ``data.quantity`` new device documents sharing one batch id.

    All serial numbers are generated (and checked against existing ones)
    before the first document is written, so hitting the collision limit
    does not leave a partially written batch behind.

    Raises:
        RuntimeError: if a unique serial number cannot be generated within
            the retry limit.
    """
    db = get_db()
    existing_sns = _get_existing_sns(db)
    batch_id = _make_batch_id()
    now = datetime.now(timezone.utc)
    hw_type = data.board_type.value

    serial_numbers = []
    for _ in range(data.quantity):
        for _attempt in range(_MAX_SERIAL_ATTEMPTS):
            sn = generate_serial(hw_type, data.board_version)
            if sn not in existing_sns:
                existing_sns.add(sn)
                break
        else:
            raise RuntimeError("Could not generate unique serial numbers — collision limit hit")
        serial_numbers.append(sn)

    collection = db.collection(COLLECTION)
    for sn in serial_numbers:
        collection.add({
            "serial_number": sn,
            "hw_type": hw_type,
            "hw_version": data.board_version,
            "mfg_status": "manufactured",
            "mfg_batch_id": batch_id,
            "created_at": now,
            "owner": None,
            "assigned_to": None,
            "user_list": [],
            # Legacy fields left empty so existing device views don't break
            "device_name": "",
            "device_location": "",
            "is_Online": False,
            "lifecycle_history": [
                {
                    "status_id": "manufactured",
                    "date": now.isoformat(),
                    "note": None,
                    "set_by": None,
                }
            ],
        })

    return BatchResponse(
        batch_id=batch_id,
        serial_numbers=serial_numbers,
        board_type=hw_type,
        board_version=data.board_version,
        created_at=_format_timestamp(now),
    )


def list_devices(
    status: str | None = None,
    hw_type: str | None = None,
    search: str | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[DeviceInventoryItem]:
    """List devices, optionally filtered by status, hw_type and a search term.

    ``status`` and ``hw_type`` are applied as equality filters on the query.
    ``search`` is a case-insensitive substring match against serial number,
    owner and batch id, applied in memory after fetching. Pagination
    (``offset`` / ``limit``) is applied last; negative values are treated as 0.
    """
    db = get_db()
    query = db.collection(COLLECTION)
    if status:
        query = query.where("mfg_status", "==", status)
    if hw_type:
        query = query.where("hw_type", "==", hw_type)

    items = [_doc_to_inventory_item(doc) for doc in query.stream()]

    if search:
        needle = search.lower()
        items = [
            item
            for item in items
            if needle in (item.serial_number or "").lower()
            or needle in (item.owner or "").lower()
            or needle in (item.mfg_batch_id or "").lower()
        ]

    # Clamp so negative values cannot turn into from-the-end slicing.
    start = max(offset, 0)
    count = max(limit, 0)
    return items[start: start + count]


def get_device_by_sn(sn: str) -> DeviceInventoryItem:
    """Return the device with serial number *sn*.

    Raises:
        NotFoundError: if no such device exists.
    """
    return _doc_to_inventory_item(_find_device_doc(get_db(), sn))


def update_device_status(sn: str, data: DeviceStatusUpdate, set_by: str | None = None) -> DeviceInventoryItem:
    """Set a device's ``mfg_status`` and record it in ``lifecycle_history``.

    The history holds at most one entry per status as written by this
    function: an existing entry for the same status is overwritten, otherwise
    a new entry is appended. When a note is supplied it is also stored in
    ``mfg_status_note``.

    Raises:
        NotFoundError: if no such device exists.
    """
    db = get_db()
    snapshot = _find_device_doc(db, sn)
    doc_ref = snapshot.reference
    doc_data = snapshot.to_dict() or {}

    status_id = data.status.value
    history = list(doc_data.get("lifecycle_history") or [])

    # Upsert lifecycle entry — overwrite existing entry for this status if present
    new_entry = {
        "status_id": status_id,
        "date": datetime.now(timezone.utc).isoformat(),
        "note": data.note if data.note else None,
        "set_by": set_by,
    }
    existing_idx = next(
        (i for i, entry in enumerate(history) if entry.get("status_id") == status_id),
        None,
    )
    if existing_idx is not None:
        history[existing_idx] = new_entry
    else:
        history.append(new_entry)

    update = {
        "mfg_status": status_id,
        "lifecycle_history": history,
    }
    if data.note:
        update["mfg_status_note"] = data.note

    doc_ref.update(update)
    return _doc_to_inventory_item(doc_ref.get())


def get_nvs_binary(
    sn: str,
    hw_type_override: str | None = None,
    hw_revision_override: str | None = None,
    legacy: bool = False,
) -> bytes:
    """Generate the NVS binary for device *sn*.

    The device's stored ``hw_type`` / ``hw_version`` are used unless a truthy
    override is given for the respective value.

    Raises:
        NotFoundError: if no such device exists.
    """
    item = get_device_by_sn(sn)
    return generate_nvs_binary(
        serial_number=item.serial_number,
        hw_family=hw_type_override if hw_type_override else item.hw_type,
        hw_revision=hw_revision_override if hw_revision_override else item.hw_version,
        legacy=legacy,
    )


def assign_device(sn: str, data: DeviceAssign) -> DeviceInventoryItem:
    """Assign a device to a customer by customer_id.

    - Stores customer_id on the device doc.
    - Adds the device to the customer's owned_items list.
    - Sets mfg_status to 'sold' unless device is already 'claimed'.

    Raises:
        NotFoundError: if the device or the customer does not exist.
    """
    db = get_db()

    # Get device doc
    snapshot = _find_device_doc(db, sn)
    doc_data = snapshot.to_dict() or {}
    doc_ref = snapshot.reference
    current_status = doc_data.get("mfg_status", "manufactured")

    # Get customer doc
    customer_ref = db.collection(CRM_COLLECTION).document(data.customer_id)
    customer_doc = customer_ref.get()
    if not customer_doc.exists:
        raise NotFoundError("Customer")
    customer_data = customer_doc.to_dict() or {}

    # Determine new status: don't downgrade claimed → sold
    new_status = current_status if current_status == "claimed" else "sold"

    # Copy so the snapshot's own list is not mutated.
    history = list(doc_data.get("lifecycle_history") or [])
    history.append({
        "status_id": new_status,
        "date": datetime.now(timezone.utc).isoformat(),
        "note": "Assigned to customer",
        "set_by": None,
    })

    doc_ref.update({
        "customer_id": data.customer_id,
        "mfg_status": new_status,
        "lifecycle_history": history,
    })

    # Add to customer's owned_items (avoid duplicates)
    owned_items = list(customer_data.get("owned_items") or [])
    device_doc_id = snapshot.id
    already_assigned = any(
        item.get("type") == "console_device"
        # `or {}` guards against a stored null console_device value.
        and (item.get("console_device") or {}).get("device_id") == device_doc_id
        for item in owned_items
    )
    if not already_assigned:
        device_name = doc_data.get("device_name") or BOARD_TYPE_LABELS.get(
            doc_data.get("hw_type", ""), sn
        )
        owned_items.append({
            "type": "console_device",
            "console_device": {
                "device_id": device_doc_id,
                "serial_number": sn,
                "label": device_name,
            },
        })
        customer_ref.update({"owned_items": owned_items})

    return _doc_to_inventory_item(doc_ref.get())


def search_customers(q: str) -> list:
    """Search crm_customers by name, email, phone, organization, location, or tags.

    Matching is a case-insensitive substring test over the customer's
    searchable fields joined with spaces. An empty (or whitespace-only)
    query returns every customer.
    """
    db = get_db()
    q_lower = q.lower().strip()

    results = []
    for doc in db.collection(CRM_COLLECTION).stream():
        data = doc.to_dict() or {}
        loc = data.get("location") or {}
        if not isinstance(loc, dict):
            loc = {}

        parts = [
            data.get("name"),
            data.get("surname"),
            data.get("email"),
            data.get("phone"),
            data.get("organization"),
            loc.get("address"),
            loc.get("city"),
            loc.get("postal_code"),
            loc.get("region"),
            loc.get("country"),
            # str() so non-string tags cannot break the join.
            " ".join(str(tag) for tag in (data.get("tags") or [])),
        ]
        # str() so non-string field values (e.g. a numeric phone) cannot break the join.
        searchable = " ".join(str(part) for part in parts if part).lower()

        if not q_lower or q_lower in searchable:
            results.append({
                "id": doc.id,
                "name": data.get("name") or "",
                "surname": data.get("surname") or "",
                "email": data.get("email") or "",
                "organization": data.get("organization") or "",
                "phone": data.get("phone") or "",
                "city": loc.get("city") or "",
            })
    return results


def get_stats() -> ManufacturingStats:
    """Return per-status device counts plus the 10 most recent activity items.

    Only devices in ``provisioned``, ``sold`` or ``claimed`` status are
    considered activity. Devices whose status is not one of the known
    statuses are not counted.
    """
    db = get_db()

    all_statuses = ["manufactured", "flashed", "provisioned", "sold", "claimed", "decommissioned"]
    counts = {s: 0 for s in all_statuses}

    activity_candidates = []
    for doc in db.collection(COLLECTION).stream():
        data = doc.to_dict() or {}
        status = data.get("mfg_status", "manufactured")
        if status in counts:
            counts[status] += 1

        if status in ("provisioned", "sold", "claimed"):
            # Use created_at as a proxy timestamp; Firestore DatetimeWithNanoseconds or plain datetime
            activity_candidates.append(RecentActivityItem(
                serial_number=data.get("serial_number", ""),
                hw_type=data.get("hw_type", ""),
                mfg_status=status,
                owner=data.get("owner"),
                updated_at=_format_timestamp(data.get("created_at")),
            ))

    # Sort by updated_at descending, take latest 10
    activity_candidates.sort(key=lambda item: item.updated_at or "", reverse=True)
    recent = activity_candidates[:10]

    return ManufacturingStats(counts=counts, recent_activity=recent)


PROTECTED_STATUSES = {"sold", "claimed"}


def delete_device(sn: str, force: bool = False) -> None:
    """Delete a device by serial number.

    Raises PermissionError if the device is sold/claimed and force is not set.
    The frontend uses force=True only after the user confirms by typing the SN.

    Raises:
        NotFoundError: if no such device exists.
    """
    db = get_db()
    snapshot = _find_device_doc(db, sn)

    data = snapshot.to_dict() or {}
    status = data.get("mfg_status", "manufactured")

    if status in PROTECTED_STATUSES and not force:
        raise PermissionError(
            f"Device {sn} has status '{status}' and cannot be deleted without explicit confirmation."
        )

    snapshot.reference.delete()


def delete_unprovisioned_devices() -> list[str]:
    """Delete all devices with status 'manufactured' (never flashed/provisioned).

    Returns the list of deleted serial numbers.
    """
    db = get_db()
    docs = list(db.collection(COLLECTION).where("mfg_status", "==", "manufactured").stream())
    deleted = []
    for doc in docs:
        data = doc.to_dict() or {}
        sn = data.get("serial_number", "")
        doc.reference.delete()
        deleted.append(sn)
    return deleted


KNOWN_HW_TYPES = ["vesper", "vesper_plus", "vesper_pro", "agnus", "agnus_mini", "chronos", "chronos_pro"]
FLASH_ASSET_FILES = ["bootloader.bin", "partitions.bin"]


def _is_safe_path_component(name: str) -> bool:
    """Return True if *name* is a single path component that cannot escape its parent.

    Rejects empty names, ``.``, ``..`` and anything containing a path
    separator or NUL byte.
    """
    if not isinstance(name, str) or not name or name in (".", ".."):
        return False
    return not any(ch in name for ch in ("/", "\\", "\x00"))


def _flash_asset_path(hw_type: str, asset: str) -> Path:
    """Return path to a flash asset (bootloader.bin or partitions.bin) for a given hw_type."""
    return Path(settings.flash_assets_storage_path) / hw_type / asset


def _flash_asset_info(hw_type: str) -> dict:
    """Build the asset info dict for a single hw_type by inspecting the filesystem."""
    base = Path(settings.flash_assets_storage_path) / hw_type
    note_path = base / "note.txt"
    note = note_path.read_text(encoding="utf-8").strip() if note_path.exists() else ""

    files = {}
    for fname in FLASH_ASSET_FILES:
        p = base / fname
        if p.exists():
            stat = p.stat()
            files[fname] = {
                "exists": True,
                "size_bytes": stat.st_size,
                # File modification time is reported as the upload time.
                "uploaded_at": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
            }
        else:
            files[fname] = {"exists": False, "size_bytes": None, "uploaded_at": None}

    return {
        "hw_type": hw_type,
        "bootloader": files["bootloader.bin"],
        "partitions": files["partitions.bin"],
        "note": note,
    }


def list_flash_assets() -> list:
    """Return asset status for all known board types plus any discovered bespoke directories."""
    base = Path(settings.flash_assets_storage_path)
    results = []

    # Always include all known hw types, even if no files uploaded yet
    seen = set(KNOWN_HW_TYPES)
    for hw_type in KNOWN_HW_TYPES:
        results.append(_flash_asset_info(hw_type))

    # Discover bespoke directories (anything in storage/flash_assets/ not in known list)
    if base.exists():
        for entry in sorted(base.iterdir()):
            if entry.is_dir() and entry.name not in seen:
                seen.add(entry.name)
                info = _flash_asset_info(entry.name)
                info["is_bespoke"] = True
                results.append(info)

    # Mark known types
    for r in results:
        r.setdefault("is_bespoke", False)

    return results


def save_flash_asset(hw_type: str, asset: str, data: bytes) -> Path:
    """Persist a flash asset binary. asset must be 'bootloader.bin' or 'partitions.bin'.

    Raises:
        ValueError: if *asset* is not a known flash asset, or *hw_type* is
            not a single safe directory name (would escape the storage root).
    """
    if asset not in FLASH_ASSET_FILES:
        raise ValueError(f"Unknown flash asset: {asset}")
    if not _is_safe_path_component(hw_type):
        raise ValueError(f"Invalid hw_type: {hw_type!r}")
    path = _flash_asset_path(hw_type, asset)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(data)
    return path


def delete_flash_asset(hw_type: str, asset: str) -> None:
    """Delete a flash asset file. Raises NotFoundError if not present.

    Names that are not single safe path components are reported as not found
    rather than being resolved outside the storage root.
    """
    message = f"Flash asset '{asset}' for '{hw_type}' not found"
    if not (_is_safe_path_component(hw_type) and _is_safe_path_component(asset)):
        raise NotFoundError(message)
    try:
        # EAFP: avoids the exists()/unlink() race of a look-before-you-leap check.
        _flash_asset_path(hw_type, asset).unlink()
    except FileNotFoundError:
        raise NotFoundError(message) from None


def set_flash_asset_note(hw_type: str, note: str) -> None:
    """Write (or clear) the note for a hw_type's flash asset directory.

    A note that is empty after stripping whitespace removes ``note.txt``.

    Raises:
        ValueError: if *hw_type* is not a single safe directory name.
    """
    if not _is_safe_path_component(hw_type):
        raise ValueError(f"Invalid hw_type: {hw_type!r}")
    base = Path(settings.flash_assets_storage_path) / hw_type
    base.mkdir(parents=True, exist_ok=True)
    note_path = base / "note.txt"
    stripped = note.strip()
    if stripped:
        note_path.write_text(stripped, encoding="utf-8")
    elif note_path.exists():
        note_path.unlink()


def get_flash_asset(hw_type: str, asset: str) -> bytes:
    """Load a flash asset binary. Raises NotFoundError if not uploaded yet.

    Names that are not single safe path components are reported as not found
    rather than being resolved outside the storage root.
    """
    message = (
        f"Flash asset '{asset}' for hw_type '{hw_type}' — upload it first via "
        f"POST /api/manufacturing/flash-assets/{{hw_type}}/{{asset}}"
    )
    if not (_is_safe_path_component(hw_type) and _is_safe_path_component(asset)):
        raise NotFoundError(message)
    try:
        return _flash_asset_path(hw_type, asset).read_bytes()
    except FileNotFoundError:
        raise NotFoundError(message) from None


def get_firmware_url(sn: str) -> str:
    """Return the FastAPI download URL for the latest stable firmware for this device's hw_type.

    Raises:
        NotFoundError: if no such device exists.
    """
    from firmware.service import get_latest

    item = get_device_by_sn(sn)
    hw_type = item.hw_type.lower()
    latest = get_latest(hw_type, "stable")
    # download_url is a relative path like /api/firmware/vs/stable/1.4.2/firmware.bin
    return latest.download_url