228 lines
7.1 KiB
Python
228 lines
7.1 KiB
Python
import random
|
|
import string
|
|
from datetime import datetime, timezone
|
|
|
|
from shared.firebase import get_db
|
|
from shared.exceptions import NotFoundError
|
|
from utils.serial_number import generate_serial
|
|
from utils.nvs_generator import generate as generate_nvs_binary
|
|
from manufacturing.models import BatchCreate, BatchResponse, DeviceInventoryItem, DeviceStatusUpdate, DeviceAssign, ManufacturingStats, RecentActivityItem
|
|
|
|
COLLECTION = "devices"
|
|
_BATCH_ID_CHARS = string.ascii_uppercase + string.digits
|
|
|
|
|
|
def _make_batch_id() -> str:
|
|
today = datetime.utcnow().strftime("%y%m%d")
|
|
suffix = "".join(random.choices(_BATCH_ID_CHARS, k=4))
|
|
return f"BATCH-{today}-{suffix}"
|
|
|
|
|
|
def _get_existing_sns(db) -> set:
|
|
existing = set()
|
|
for doc in db.collection(COLLECTION).select(["serial_number"]).stream():
|
|
data = doc.to_dict()
|
|
sn = data.get("serial_number")
|
|
if sn:
|
|
existing.add(sn)
|
|
return existing
|
|
|
|
|
|
def _doc_to_inventory_item(doc) -> DeviceInventoryItem:
|
|
data = doc.to_dict() or {}
|
|
created_raw = data.get("created_at")
|
|
if isinstance(created_raw, datetime):
|
|
created_str = created_raw.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
else:
|
|
created_str = str(created_raw) if created_raw else None
|
|
|
|
return DeviceInventoryItem(
|
|
id=doc.id,
|
|
serial_number=data.get("serial_number", ""),
|
|
hw_type=data.get("hw_type", ""),
|
|
hw_version=data.get("hw_version", ""),
|
|
mfg_status=data.get("mfg_status", "manufactured"),
|
|
mfg_batch_id=data.get("mfg_batch_id"),
|
|
created_at=created_str,
|
|
owner=data.get("owner"),
|
|
assigned_to=data.get("assigned_to"),
|
|
)
|
|
|
|
|
|
def create_batch(data: BatchCreate) -> BatchResponse:
|
|
db = get_db()
|
|
existing_sns = _get_existing_sns(db)
|
|
batch_id = _make_batch_id()
|
|
now = datetime.now(timezone.utc)
|
|
serial_numbers = []
|
|
|
|
for _ in range(data.quantity):
|
|
for attempt in range(200):
|
|
sn = generate_serial(data.board_type.value, data.board_version)
|
|
if sn not in existing_sns:
|
|
existing_sns.add(sn)
|
|
break
|
|
else:
|
|
raise RuntimeError("Could not generate unique serial numbers — collision limit hit")
|
|
|
|
db.collection(COLLECTION).add({
|
|
"serial_number": sn,
|
|
"hw_type": data.board_type.value,
|
|
"hw_version": data.board_version,
|
|
"mfg_status": "manufactured",
|
|
"mfg_batch_id": batch_id,
|
|
"created_at": now,
|
|
"owner": None,
|
|
"assigned_to": None,
|
|
"users_list": [],
|
|
# Legacy fields left empty so existing device views don't break
|
|
"device_name": "",
|
|
"device_location": "",
|
|
"is_Online": False,
|
|
})
|
|
serial_numbers.append(sn)
|
|
|
|
return BatchResponse(
|
|
batch_id=batch_id,
|
|
serial_numbers=serial_numbers,
|
|
board_type=data.board_type.value,
|
|
board_version=data.board_version,
|
|
created_at=now.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
)
|
|
|
|
|
|
def list_devices(
|
|
status: str | None = None,
|
|
hw_type: str | None = None,
|
|
search: str | None = None,
|
|
limit: int = 100,
|
|
offset: int = 0,
|
|
) -> list[DeviceInventoryItem]:
|
|
db = get_db()
|
|
query = db.collection(COLLECTION)
|
|
|
|
if status:
|
|
query = query.where("mfg_status", "==", status)
|
|
if hw_type:
|
|
query = query.where("hw_type", "==", hw_type)
|
|
|
|
docs = list(query.stream())
|
|
items = [_doc_to_inventory_item(doc) for doc in docs]
|
|
|
|
if search:
|
|
search_lower = search.lower()
|
|
items = [
|
|
item for item in items
|
|
if search_lower in (item.serial_number or "").lower()
|
|
or search_lower in (item.owner or "").lower()
|
|
or search_lower in (item.mfg_batch_id or "").lower()
|
|
]
|
|
|
|
return items[offset: offset + limit]
|
|
|
|
|
|
def get_device_by_sn(sn: str) -> DeviceInventoryItem:
|
|
db = get_db()
|
|
docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream())
|
|
if not docs:
|
|
raise NotFoundError("Device")
|
|
return _doc_to_inventory_item(docs[0])
|
|
|
|
|
|
def update_device_status(sn: str, data: DeviceStatusUpdate) -> DeviceInventoryItem:
|
|
db = get_db()
|
|
docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream())
|
|
if not docs:
|
|
raise NotFoundError("Device")
|
|
|
|
doc_ref = docs[0].reference
|
|
update = {"mfg_status": data.status.value}
|
|
if data.note:
|
|
update["mfg_status_note"] = data.note
|
|
doc_ref.update(update)
|
|
|
|
return _doc_to_inventory_item(doc_ref.get())
|
|
|
|
|
|
def get_nvs_binary(sn: str) -> bytes:
|
|
item = get_device_by_sn(sn)
|
|
return generate_nvs_binary(
|
|
serial_number=item.serial_number,
|
|
hw_type=item.hw_type,
|
|
hw_version=item.hw_version,
|
|
)
|
|
|
|
|
|
def assign_device(sn: str, data: DeviceAssign) -> DeviceInventoryItem:
|
|
from utils.email import send_device_assignment_invite
|
|
|
|
db = get_db()
|
|
docs = list(db.collection(COLLECTION).where("serial_number", "==", sn).limit(1).stream())
|
|
if not docs:
|
|
raise NotFoundError("Device")
|
|
|
|
doc_ref = docs[0].reference
|
|
doc_ref.update({
|
|
"owner": data.customer_email,
|
|
"assigned_to": data.customer_email,
|
|
"mfg_status": "sold",
|
|
})
|
|
|
|
send_device_assignment_invite(
|
|
customer_email=data.customer_email,
|
|
serial_number=sn,
|
|
customer_name=data.customer_name,
|
|
)
|
|
|
|
return _doc_to_inventory_item(doc_ref.get())
|
|
|
|
|
|
def get_stats() -> ManufacturingStats:
|
|
db = get_db()
|
|
docs = list(db.collection(COLLECTION).stream())
|
|
|
|
all_statuses = ["manufactured", "flashed", "provisioned", "sold", "claimed", "decommissioned"]
|
|
counts = {s: 0 for s in all_statuses}
|
|
|
|
activity_candidates = []
|
|
for doc in docs:
|
|
data = doc.to_dict() or {}
|
|
status = data.get("mfg_status", "manufactured")
|
|
if status in counts:
|
|
counts[status] += 1
|
|
|
|
if status in ("provisioned", "sold", "claimed"):
|
|
# Use created_at as a proxy timestamp; Firestore DatetimeWithNanoseconds or plain datetime
|
|
ts = data.get("created_at")
|
|
if isinstance(ts, datetime):
|
|
ts_str = ts.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
else:
|
|
ts_str = str(ts) if ts else None
|
|
|
|
activity_candidates.append(RecentActivityItem(
|
|
serial_number=data.get("serial_number", ""),
|
|
hw_type=data.get("hw_type", ""),
|
|
mfg_status=status,
|
|
owner=data.get("owner"),
|
|
updated_at=ts_str,
|
|
))
|
|
|
|
# Sort by updated_at descending, take latest 10
|
|
activity_candidates.sort(
|
|
key=lambda x: x.updated_at or "",
|
|
reverse=True,
|
|
)
|
|
recent = activity_candidates[:10]
|
|
|
|
return ManufacturingStats(counts=counts, recent_activity=recent)
|
|
|
|
|
|
def get_firmware_url(sn: str) -> str:
|
|
"""Return the FastAPI download URL for the latest stable firmware for this device's hw_type."""
|
|
from firmware.service import get_latest
|
|
item = get_device_by_sn(sn)
|
|
hw_type = item.hw_type.lower()
|
|
latest = get_latest(hw_type, "stable")
|
|
# download_url is a relative path like /api/firmware/vs/stable/1.4.2/firmware.bin
|
|
return latest.download_url
|