399 lines
14 KiB
Python
399 lines
14 KiB
Python
import hashlib
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi import HTTPException
|
|
|
|
from config import settings
|
|
from shared.firebase import get_db
|
|
from shared.exceptions import NotFoundError
|
|
from firmware.models import FirmwareVersion, FirmwareMetadataResponse, UpdateType
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
COLLECTION = "firmware_versions"
|
|
|
|
VALID_HW_TYPES = {"vesper", "vesper_plus", "vesper_pro", "chronos", "chronos_pro", "agnus", "agnus_mini", "bespoke"}
|
|
VALID_CHANNELS = {"stable", "beta", "alpha", "testing"}
|
|
|
|
|
|
def _storage_path(hw_type: str, channel: str, version: str) -> Path:
|
|
return Path(settings.firmware_storage_path) / hw_type / channel / version / "firmware.bin"
|
|
|
|
|
|
def _doc_to_firmware_version(doc) -> FirmwareVersion:
|
|
data = doc.to_dict() or {}
|
|
uploaded_raw = data.get("uploaded_at")
|
|
if isinstance(uploaded_raw, datetime):
|
|
uploaded_str = uploaded_raw.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
else:
|
|
uploaded_str = str(uploaded_raw) if uploaded_raw else ""
|
|
|
|
return FirmwareVersion(
|
|
id=doc.id,
|
|
hw_type=data.get("hw_type", ""),
|
|
channel=data.get("channel", ""),
|
|
version=data.get("version", ""),
|
|
filename=data.get("filename", "firmware.bin"),
|
|
size_bytes=data.get("size_bytes", 0),
|
|
sha256=data.get("sha256", ""),
|
|
update_type=data.get("update_type", UpdateType.mandatory),
|
|
min_fw_version=data.get("min_fw_version"),
|
|
uploaded_at=uploaded_str,
|
|
changelog=data.get("changelog"),
|
|
release_note=data.get("release_note"),
|
|
is_latest=data.get("is_latest", False),
|
|
bespoke_uid=data.get("bespoke_uid"),
|
|
)
|
|
|
|
|
|
def _fw_to_metadata_response(fw: FirmwareVersion) -> FirmwareMetadataResponse:
|
|
download_url = f"/api/firmware/{fw.hw_type}/{fw.channel}/{fw.version}/firmware.bin"
|
|
is_emergency = fw.update_type == UpdateType.emergency
|
|
is_mandatory = fw.update_type in (UpdateType.mandatory, UpdateType.emergency)
|
|
return FirmwareMetadataResponse(
|
|
hw_type=fw.hw_type,
|
|
channel=fw.channel, # firmware validates this matches requested channel
|
|
version=fw.version,
|
|
size=fw.size_bytes, # firmware reads "size"
|
|
size_bytes=fw.size_bytes, # kept for admin-panel consumers
|
|
sha256=fw.sha256,
|
|
update_type=fw.update_type, # urgency enum — for admin panel display
|
|
mandatory=is_mandatory, # firmware reads this to decide auto-apply
|
|
emergency=is_emergency, # firmware reads this to decide immediate apply
|
|
min_fw_version=fw.min_fw_version,
|
|
download_url=download_url,
|
|
uploaded_at=fw.uploaded_at,
|
|
release_note=fw.release_note,
|
|
)
|
|
|
|
|
|
def upload_firmware(
|
|
hw_type: str,
|
|
channel: str,
|
|
version: str,
|
|
file_bytes: bytes,
|
|
update_type: UpdateType = UpdateType.mandatory,
|
|
min_fw_version: str | None = None,
|
|
changelog: str | None = None,
|
|
release_note: str | None = None,
|
|
bespoke_uid: str | None = None,
|
|
) -> FirmwareVersion:
|
|
if hw_type not in VALID_HW_TYPES:
|
|
raise HTTPException(status_code=400, detail=f"Invalid hw_type. Must be one of: {', '.join(sorted(VALID_HW_TYPES))}")
|
|
if channel not in VALID_CHANNELS:
|
|
raise HTTPException(status_code=400, detail=f"Invalid channel. Must be one of: {', '.join(sorted(VALID_CHANNELS))}")
|
|
if hw_type == "bespoke" and not bespoke_uid:
|
|
raise HTTPException(status_code=400, detail="bespoke_uid is required when hw_type is 'bespoke'")
|
|
|
|
db = get_db()
|
|
sha256 = hashlib.sha256(file_bytes).hexdigest()
|
|
now = datetime.now(timezone.utc)
|
|
|
|
# For bespoke firmware: if a firmware with the same bespoke_uid already exists,
|
|
# overwrite it (delete old doc + file, reuse same storage path keyed by uid).
|
|
if hw_type == "bespoke" and bespoke_uid:
|
|
existing_docs = list(
|
|
db.collection(COLLECTION)
|
|
.where("hw_type", "==", "bespoke")
|
|
.where("bespoke_uid", "==", bespoke_uid)
|
|
.stream()
|
|
)
|
|
for old_doc in existing_docs:
|
|
old_data = old_doc.to_dict() or {}
|
|
old_path = _storage_path("bespoke", old_data.get("channel", channel), old_data.get("version", version))
|
|
if old_path.exists():
|
|
old_path.unlink()
|
|
try:
|
|
old_path.parent.rmdir()
|
|
except OSError:
|
|
pass
|
|
old_doc.reference.delete()
|
|
|
|
dest = _storage_path(hw_type, channel, version)
|
|
dest.parent.mkdir(parents=True, exist_ok=True)
|
|
dest.write_bytes(file_bytes)
|
|
|
|
doc_id = str(uuid.uuid4())
|
|
|
|
# Mark previous latest for this hw_type+channel as no longer latest
|
|
# (skip for bespoke — each bespoke_uid is its own independent firmware)
|
|
if hw_type != "bespoke":
|
|
prev_docs = (
|
|
db.collection(COLLECTION)
|
|
.where("hw_type", "==", hw_type)
|
|
.where("channel", "==", channel)
|
|
.where("is_latest", "==", True)
|
|
.stream()
|
|
)
|
|
for prev in prev_docs:
|
|
prev.reference.update({"is_latest": False})
|
|
|
|
doc_ref = db.collection(COLLECTION).document(doc_id)
|
|
doc_ref.set({
|
|
"hw_type": hw_type,
|
|
"channel": channel,
|
|
"version": version,
|
|
"filename": "firmware.bin",
|
|
"size_bytes": len(file_bytes),
|
|
"sha256": sha256,
|
|
"update_type": update_type.value,
|
|
"min_fw_version": min_fw_version,
|
|
"uploaded_at": now,
|
|
"changelog": changelog,
|
|
"release_note": release_note,
|
|
"is_latest": True,
|
|
"bespoke_uid": bespoke_uid,
|
|
})
|
|
|
|
return _doc_to_firmware_version(doc_ref.get())
|
|
|
|
|
|
def list_firmware(
|
|
hw_type: str | None = None,
|
|
channel: str | None = None,
|
|
) -> list[FirmwareVersion]:
|
|
db = get_db()
|
|
query = db.collection(COLLECTION)
|
|
if hw_type:
|
|
query = query.where("hw_type", "==", hw_type)
|
|
if channel:
|
|
query = query.where("channel", "==", channel)
|
|
|
|
docs = list(query.stream())
|
|
items = [_doc_to_firmware_version(doc) for doc in docs]
|
|
items.sort(key=lambda x: x.uploaded_at, reverse=True)
|
|
return items
|
|
|
|
|
|
def get_latest(hw_type: str, channel: str, hw_version: str | None = None, current_version: str | None = None) -> FirmwareMetadataResponse:
|
|
if hw_type not in VALID_HW_TYPES:
|
|
raise HTTPException(status_code=400, detail=f"Invalid hw_type '{hw_type}'")
|
|
if hw_type == "bespoke":
|
|
raise HTTPException(status_code=400, detail="Bespoke firmware is not served via auto-update. Use the direct download URL.")
|
|
if channel not in VALID_CHANNELS:
|
|
raise HTTPException(status_code=400, detail=f"Invalid channel '{channel}'")
|
|
|
|
db = get_db()
|
|
docs = list(
|
|
db.collection(COLLECTION)
|
|
.where("hw_type", "==", hw_type)
|
|
.where("channel", "==", channel)
|
|
.where("is_latest", "==", True)
|
|
.limit(1)
|
|
.stream()
|
|
)
|
|
if not docs:
|
|
raise NotFoundError("Firmware")
|
|
|
|
return _fw_to_metadata_response(_doc_to_firmware_version(docs[0]))
|
|
|
|
|
|
def get_version_info(hw_type: str, channel: str, version: str) -> FirmwareMetadataResponse:
|
|
"""Fetch metadata for a specific version. Used by devices resolving upgrade chains."""
|
|
if hw_type not in VALID_HW_TYPES:
|
|
raise HTTPException(status_code=400, detail=f"Invalid hw_type '{hw_type}'")
|
|
if channel not in VALID_CHANNELS:
|
|
raise HTTPException(status_code=400, detail=f"Invalid channel '{channel}'")
|
|
|
|
db = get_db()
|
|
docs = list(
|
|
db.collection(COLLECTION)
|
|
.where("hw_type", "==", hw_type)
|
|
.where("channel", "==", channel)
|
|
.where("version", "==", version)
|
|
.limit(1)
|
|
.stream()
|
|
)
|
|
if not docs:
|
|
raise NotFoundError("Firmware version")
|
|
|
|
return _fw_to_metadata_response(_doc_to_firmware_version(docs[0]))
|
|
|
|
|
|
def get_latest_changelog(hw_type: str, channel: str) -> str:
|
|
if hw_type not in VALID_HW_TYPES:
|
|
raise HTTPException(status_code=400, detail=f"Invalid hw_type '{hw_type}'")
|
|
if channel not in VALID_CHANNELS:
|
|
raise HTTPException(status_code=400, detail=f"Invalid channel '{channel}'")
|
|
|
|
db = get_db()
|
|
docs = list(
|
|
db.collection(COLLECTION)
|
|
.where("hw_type", "==", hw_type)
|
|
.where("channel", "==", channel)
|
|
.where("is_latest", "==", True)
|
|
.limit(1)
|
|
.stream()
|
|
)
|
|
if not docs:
|
|
raise NotFoundError("Firmware")
|
|
fw = _doc_to_firmware_version(docs[0])
|
|
if not fw.changelog:
|
|
raise NotFoundError("Changelog")
|
|
return fw.changelog
|
|
|
|
|
|
def get_version_changelog(hw_type: str, channel: str, version: str) -> str:
|
|
if hw_type not in VALID_HW_TYPES:
|
|
raise HTTPException(status_code=400, detail=f"Invalid hw_type '{hw_type}'")
|
|
if channel not in VALID_CHANNELS:
|
|
raise HTTPException(status_code=400, detail=f"Invalid channel '{channel}'")
|
|
|
|
db = get_db()
|
|
docs = list(
|
|
db.collection(COLLECTION)
|
|
.where("hw_type", "==", hw_type)
|
|
.where("channel", "==", channel)
|
|
.where("version", "==", version)
|
|
.limit(1)
|
|
.stream()
|
|
)
|
|
if not docs:
|
|
raise NotFoundError("Firmware version")
|
|
fw = _doc_to_firmware_version(docs[0])
|
|
if not fw.changelog:
|
|
raise NotFoundError("Changelog")
|
|
return fw.changelog
|
|
|
|
|
|
def get_firmware_path(hw_type: str, channel: str, version: str) -> Path:
|
|
path = _storage_path(hw_type, channel, version)
|
|
if not path.exists():
|
|
raise NotFoundError("Firmware binary")
|
|
return path
|
|
|
|
|
|
def record_ota_event(event_type: str, payload: dict[str, Any]) -> None:
|
|
"""Persist an OTA telemetry event (download or flash) to Firestore.
|
|
|
|
Best-effort — caller should not raise on failure.
|
|
"""
|
|
try:
|
|
db = get_db()
|
|
db.collection("ota_events").add({
|
|
"event_type": event_type,
|
|
"received_at": datetime.now(timezone.utc),
|
|
**payload,
|
|
})
|
|
except Exception as exc:
|
|
logger.warning("Failed to persist OTA event (%s): %s", event_type, exc)
|
|
|
|
|
|
def edit_firmware(
|
|
doc_id: str,
|
|
channel: str | None = None,
|
|
version: str | None = None,
|
|
update_type: UpdateType | None = None,
|
|
min_fw_version: str | None = None,
|
|
changelog: str | None = None,
|
|
release_note: str | None = None,
|
|
bespoke_uid: str | None = None,
|
|
file_bytes: bytes | None = None,
|
|
) -> FirmwareVersion:
|
|
db = get_db()
|
|
doc_ref = db.collection(COLLECTION).document(doc_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Firmware")
|
|
|
|
data = doc.to_dict() or {}
|
|
hw_type = data["hw_type"]
|
|
old_channel = data.get("channel", "")
|
|
old_version = data.get("version", "")
|
|
|
|
effective_channel = channel if channel is not None else old_channel
|
|
effective_version = version if version is not None else old_version
|
|
|
|
if channel is not None and channel not in VALID_CHANNELS:
|
|
raise HTTPException(status_code=400, detail=f"Invalid channel. Must be one of: {', '.join(sorted(VALID_CHANNELS))}")
|
|
|
|
updates: dict = {}
|
|
if channel is not None:
|
|
updates["channel"] = channel
|
|
if version is not None:
|
|
updates["version"] = version
|
|
if update_type is not None:
|
|
updates["update_type"] = update_type.value
|
|
if min_fw_version is not None:
|
|
updates["min_fw_version"] = min_fw_version if min_fw_version else None
|
|
if changelog is not None:
|
|
updates["changelog"] = changelog if changelog else None
|
|
if release_note is not None:
|
|
updates["release_note"] = release_note if release_note else None
|
|
if bespoke_uid is not None:
|
|
updates["bespoke_uid"] = bespoke_uid if bespoke_uid else None
|
|
|
|
if file_bytes is not None:
|
|
# Move binary if path changed
|
|
old_path = _storage_path(hw_type, old_channel, old_version)
|
|
new_path = _storage_path(hw_type, effective_channel, effective_version)
|
|
if old_path != new_path and old_path.exists():
|
|
old_path.unlink()
|
|
try:
|
|
old_path.parent.rmdir()
|
|
except OSError:
|
|
pass
|
|
new_path.parent.mkdir(parents=True, exist_ok=True)
|
|
new_path.write_bytes(file_bytes)
|
|
updates["sha256"] = hashlib.sha256(file_bytes).hexdigest()
|
|
updates["size_bytes"] = len(file_bytes)
|
|
elif (channel is not None and channel != old_channel) or (version is not None and version != old_version):
|
|
# Path changed but no new file — move existing binary
|
|
old_path = _storage_path(hw_type, old_channel, old_version)
|
|
new_path = _storage_path(hw_type, effective_channel, effective_version)
|
|
if old_path.exists() and old_path != new_path:
|
|
new_path.parent.mkdir(parents=True, exist_ok=True)
|
|
old_path.rename(new_path)
|
|
try:
|
|
old_path.parent.rmdir()
|
|
except OSError:
|
|
pass
|
|
|
|
if updates:
|
|
doc_ref.update(updates)
|
|
|
|
return _doc_to_firmware_version(doc_ref.get())
|
|
|
|
|
|
def delete_firmware(doc_id: str) -> None:
|
|
db = get_db()
|
|
doc_ref = db.collection(COLLECTION).document(doc_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Firmware")
|
|
|
|
data = doc.to_dict()
|
|
hw_type = data.get("hw_type", "")
|
|
channel = data.get("channel", "")
|
|
version = data.get("version", "")
|
|
was_latest = data.get("is_latest", False)
|
|
|
|
# Delete the binary file
|
|
path = _storage_path(hw_type, channel, version)
|
|
if path.exists():
|
|
path.unlink()
|
|
# Remove the version directory if empty
|
|
try:
|
|
path.parent.rmdir()
|
|
except OSError:
|
|
pass
|
|
|
|
doc_ref.delete()
|
|
|
|
# If we deleted the latest, promote the next most recent as latest
|
|
if was_latest:
|
|
remaining = list(
|
|
db.collection(COLLECTION)
|
|
.where("hw_type", "==", hw_type)
|
|
.where("channel", "==", channel)
|
|
.stream()
|
|
)
|
|
if remaining:
|
|
# Sort in Python to avoid needing a composite Firestore index
|
|
remaining.sort(key=lambda d: d.to_dict().get("uploaded_at", ""), reverse=True)
|
|
remaining[0].reference.update({"is_latest": True})
|