import random import string from datetime import datetime, timezone from shared.firebase import get_db from shared.exceptions import NotFoundError from utils.serial_number import generate_serial from utils.nvs_generator import generate as generate_nvs_binary from manufacturing.models import BatchCreate, BatchResponse, DeviceInventoryItem, DeviceStatusUpdate COLLECTION = "devices" _BATCH_ID_CHARS = string.ascii_uppercase + string.digits def _make_batch_id() -> str: today = datetime.utcnow().strftime("%y%m%d") suffix = "".join(random.choices(_BATCH_ID_CHARS, k=4)) return f"BATCH-{today}-{suffix}" def _get_existing_sns(db) -> set: existing = set() for doc in db.collection(COLLECTION).select(["serial_number"]).stream(): data = doc.to_dict() sn = data.get("serial_number") if sn: existing.add(sn) return existing def _doc_to_inventory_item(doc) -> DeviceInventoryItem: data = doc.to_dict() or {} created_raw = data.get("created_at") if isinstance(created_raw, datetime): created_str = created_raw.strftime("%Y-%m-%dT%H:%M:%SZ") else: created_str = str(created_raw) if created_raw else None return DeviceInventoryItem( id=doc.id, serial_number=data.get("serial_number", ""), hw_type=data.get("hw_type", ""), hw_version=data.get("hw_version", ""), mfg_status=data.get("mfg_status", "manufactured"), mfg_batch_id=data.get("mfg_batch_id"), created_at=created_str, owner=data.get("owner"), assigned_to=data.get("assigned_to"), ) def create_batch(data: BatchCreate) -> BatchResponse: db = get_db() existing_sns = _get_existing_sns(db) batch_id = _make_batch_id() now = datetime.now(timezone.utc) serial_numbers = [] for _ in range(data.quantity): for attempt in range(200): sn = generate_serial(data.board_type.value, data.board_version) if sn not in existing_sns: existing_sns.add(sn) break else: raise RuntimeError("Could not generate unique serial numbers — collision limit hit") db.collection(COLLECTION).add({ "serial_number": sn, "hw_type": data.board_type.value, "hw_version": data.board_version, "mfg_status": "manufactured", "mfg_batch_id": batch_id, "created_at": now, "owner": None, "assigned_to": None, "users_list": [], # Legacy fields left empty so existing device views don't break "device_name": "", "device_location": "", "is_Online": False, }) serial_numbers.append(sn) return BatchResponse( batch_id=batch_id, serial_numbers=serial_numbers, board_type=data.board_type.value, board_version=data.board_version, created_at=now.strftime("%Y-%m-%dT%H:%M:%SZ"), ) def list_devices( status: str | None = None, hw_type: str | None = None, search: str | None = None, limit: int = 100, offset: int = 0, ) -> list[DeviceInventoryItem]: db = get_db() query = db.collection(COLLECTION) if status: query = query.where("mfg_status", "==", status) if hw_type: query = query.where("hw_type", "==", hw_type) docs = list(query.stream()) items = [_doc_to_inventory_item(doc) for doc in docs] if search: search_lower = search.lower() items = [ item for item in items if search_lower in (item.serial_number or "").lower() or search_lower in (item.owner or "").lower() or search_lower in (item.mfg_batch_id or "").lower() ] return items[offset: offset + limit] def get_device_by_sn(sn: str) -> DeviceInventoryItem: db = get_db() docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream()) if not docs: raise NotFoundError("Device") return _doc_to_inventory_item(docs[0]) def update_device_status(sn: str, data: DeviceStatusUpdate) -> DeviceInventoryItem: db = get_db() docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream()) if not docs: raise NotFoundError("Device") doc_ref = docs[0].reference update = {"mfg_status": data.status.value} if data.note: update["mfg_status_note"] = data.note doc_ref.update(update) return _doc_to_inventory_item(doc_ref.get()) def get_nvs_binary(sn: str) -> bytes: item = get_device_by_sn(sn) return generate_nvs_binary( serial_number=item.serial_number, hw_type=item.hw_type, hw_version=item.hw_version, )