update: firmware and provisioning now supports bootloader and partition tables

This commit is contained in:
2026-03-16 08:52:58 +02:00
parent 360725c93f
commit 4381a6681d
15 changed files with 776 additions and 49 deletions

View File

@@ -29,6 +29,7 @@ class Settings(BaseSettings):
# Local file storage
built_melodies_storage_path: str = "./storage/built_melodies"
firmware_storage_path: str = "./storage/firmware"
flash_assets_storage_path: str = "./storage/flash_assets"
# Email (Resend)
resend_api_key: str = "re_placeholder_change_me"

View File

@@ -30,13 +30,30 @@ class FirmwareListResponse(BaseModel):
class FirmwareMetadataResponse(BaseModel):
"""Returned by both /latest and /{version}/info endpoints."""
"""Returned by both /latest and /{version}/info endpoints.
Two orthogonal axes:
channel — the release track the device is subscribed to
("stable" | "beta" | "development")
Firmware validates this matches the channel it requested.
update_type — the urgency of THIS release, set by the publisher
("optional" | "mandatory" | "emergency")
Firmware reads mandatory/emergency booleans derived from this.
Additional firmware-compatible fields:
size — binary size in bytes (firmware reads "size", not "size_bytes")
mandatory — True when update_type is mandatory or emergency
emergency — True only when update_type is emergency
"""
hw_type: str
channel: str
channel: str # release track — firmware validates this
version: str
size_bytes: int
size: int # firmware reads "size"
size_bytes: int # kept for admin-panel consumers
sha256: str
update_type: UpdateType
update_type: UpdateType # urgency enum — for admin panel display
mandatory: bool # derived: update_type in (mandatory, emergency)
emergency: bool # derived: update_type == emergency
min_fw_version: Optional[str] = None
download_url: str
uploaded_at: str

View File

@@ -1,13 +1,18 @@
from fastapi import APIRouter, Depends, Query, UploadFile, File, Form
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import Optional
import logging
from auth.models import TokenPayload
from auth.dependencies import require_permission
from firmware.models import FirmwareVersion, FirmwareListResponse, FirmwareMetadataResponse, UpdateType
from firmware import service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/firmware", tags=["firmware"])
ota_router = APIRouter(prefix="/api/ota", tags=["ota-telemetry"])
@router.post("/upload", response_model=FirmwareVersion, status_code=201)
@@ -44,11 +49,16 @@ def list_firmware(
@router.get("/{hw_type}/{channel}/latest", response_model=FirmwareMetadataResponse)
def get_latest_firmware(hw_type: str, channel: str):
def get_latest_firmware(
hw_type: str,
channel: str,
hw_version: Optional[str] = Query(None, description="Hardware revision from NVS, e.g. '1.0'"),
current_version: Optional[str] = Query(None, description="Currently running firmware semver, e.g. '1.2.3'"),
):
"""Returns metadata for the latest firmware for a given hw_type + channel.
No auth required — devices call this endpoint to check for updates.
"""
return service.get_latest(hw_type, channel)
return service.get_latest(hw_type, channel, hw_version=hw_version, current_version=current_version)
@router.get("/{hw_type}/{channel}/{version}/info", response_model=FirmwareMetadataResponse)
@@ -76,3 +86,52 @@ def delete_firmware(
_user: TokenPayload = Depends(require_permission("manufacturing", "delete")),
):
service.delete_firmware(firmware_id)
# ─────────────────────────────────────────────────────────────────────────────
# OTA event telemetry — called by devices (no auth, best-effort)
# ─────────────────────────────────────────────────────────────────────────────
class OtaDownloadEvent(BaseModel):
device_uid: str
hw_type: str
hw_version: str
from_version: str
to_version: str
channel: str
class OtaFlashEvent(BaseModel):
device_uid: str
hw_type: str
hw_version: str
from_version: str
to_version: str
channel: str
sha256: str
@ota_router.post("/events/download", status_code=204)
def ota_event_download(event: OtaDownloadEvent):
"""Device reports that firmware was fully written to flash (pre-commit).
No auth required — best-effort telemetry from the device.
"""
logger.info(
"OTA download event: device=%s hw=%s/%s %s%s (channel=%s)",
event.device_uid, event.hw_type, event.hw_version,
event.from_version, event.to_version, event.channel,
)
service.record_ota_event("download", event.model_dump())
@ota_router.post("/events/flash", status_code=204)
def ota_event_flash(event: OtaFlashEvent):
"""Device reports that firmware partition was committed and device is rebooting.
No auth required — best-effort telemetry from the device.
"""
logger.info(
"OTA flash event: device=%s hw=%s/%s %s%s (channel=%s sha256=%.16s...)",
event.device_uid, event.hw_type, event.hw_version,
event.from_version, event.to_version, event.channel, event.sha256,
)
service.record_ota_event("flash", event.model_dump())

View File

@@ -1,7 +1,9 @@
import hashlib
import logging
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastapi import HTTPException
@@ -10,6 +12,8 @@ from shared.firebase import get_db
from shared.exceptions import NotFoundError
from firmware.models import FirmwareVersion, FirmwareMetadataResponse, UpdateType
logger = logging.getLogger(__name__)
COLLECTION = "firmware_versions"
VALID_HW_TYPES = {"vesper", "vesper_plus", "vesper_pro", "chronos", "chronos_pro", "agnus", "agnus_mini"}
@@ -46,13 +50,18 @@ def _doc_to_firmware_version(doc) -> FirmwareVersion:
def _fw_to_metadata_response(fw: FirmwareVersion) -> FirmwareMetadataResponse:
download_url = f"/api/firmware/{fw.hw_type}/{fw.channel}/{fw.version}/firmware.bin"
is_emergency = fw.update_type == UpdateType.emergency
is_mandatory = fw.update_type in (UpdateType.mandatory, UpdateType.emergency)
return FirmwareMetadataResponse(
hw_type=fw.hw_type,
channel=fw.channel,
channel=fw.channel, # firmware validates this matches requested channel
version=fw.version,
size_bytes=fw.size_bytes,
size=fw.size_bytes, # firmware reads "size"
size_bytes=fw.size_bytes, # kept for admin-panel consumers
sha256=fw.sha256,
update_type=fw.update_type,
update_type=fw.update_type, # urgency enum — for admin panel display
mandatory=is_mandatory, # firmware reads this to decide auto-apply
emergency=is_emergency, # firmware reads this to decide immediate apply
min_fw_version=fw.min_fw_version,
download_url=download_url,
uploaded_at=fw.uploaded_at,
@@ -130,7 +139,7 @@ def list_firmware(
return items
def get_latest(hw_type: str, channel: str) -> FirmwareMetadataResponse:
def get_latest(hw_type: str, channel: str, hw_version: str | None = None, current_version: str | None = None) -> FirmwareMetadataResponse:
if hw_type not in VALID_HW_TYPES:
raise HTTPException(status_code=400, detail=f"Invalid hw_type '{hw_type}'")
if channel not in VALID_CHANNELS:
@@ -180,6 +189,22 @@ def get_firmware_path(hw_type: str, channel: str, version: str) -> Path:
return path
def record_ota_event(event_type: str, payload: dict[str, Any]) -> None:
"""Persist an OTA telemetry event (download or flash) to Firestore.
Best-effort — caller should not raise on failure.
"""
try:
db = get_db()
db.collection("ota_events").add({
"event_type": event_type,
"received_at": datetime.now(timezone.utc),
**payload,
})
except Exception as exc:
logger.warning("Failed to persist OTA event (%s): %s", event_type, exc)
def delete_firmware(doc_id: str) -> None:
db = get_db()
doc_ref = db.collection(COLLECTION).document(doc_id)

View File

@@ -15,7 +15,7 @@ from staff.router import router as staff_router
from helpdesk.router import router as helpdesk_router
from builder.router import router as builder_router
from manufacturing.router import router as manufacturing_router
from firmware.router import router as firmware_router
from firmware.router import router as firmware_router, ota_router
from admin.router import router as admin_router
from crm.router import router as crm_products_router
from crm.customers_router import router as crm_customers_router
@@ -58,6 +58,7 @@ app.include_router(staff_router)
app.include_router(builder_router)
app.include_router(manufacturing_router)
app.include_router(firmware_router)
app.include_router(ota_router)
app.include_router(admin_router)
app.include_router(crm_products_router)
app.include_router(crm_customers_router)

View File

@@ -15,7 +15,7 @@ class BoardType(str, Enum):
BOARD_TYPE_LABELS = {
"vesper": "Vesper",
"vesper_plus": "Vesper+",
"vesper_plus": "Vesper Plus",
"vesper_pro": "Vesper Pro",
"chronos": "Chronos",
"chronos_pro": "Chronos Pro",
@@ -23,6 +23,28 @@ BOARD_TYPE_LABELS = {
"agnus": "Agnus",
}
# Family codes (BS + 4 chars = segment 1 of serial number)
BOARD_FAMILY_CODES = {
"vesper": "VSPR",
"vesper_plus": "VSPR",
"vesper_pro": "VSPR",
"agnus": "AGNS",
"agnus_mini": "AGNS",
"chronos": "CRNS",
"chronos_pro": "CRNS",
}
# Variant codes (3 chars = first part of segment 3 of serial number)
BOARD_VARIANT_CODES = {
"vesper": "STD",
"vesper_plus": "PLS",
"vesper_pro": "PRO",
"agnus": "STD",
"agnus_mini": "MIN",
"chronos": "STD",
"chronos_pro": "PRO",
}
class MfgStatus(str, Enum):
manufactured = "manufactured"

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi import APIRouter, Depends, Query, HTTPException, UploadFile, File, Form
from fastapi.responses import Response
from fastapi.responses import RedirectResponse
from typing import Optional
@@ -15,6 +15,9 @@ from manufacturing import service
from manufacturing import audit
from shared.exceptions import NotFoundError
VALID_FLASH_ASSETS = {"bootloader.bin", "partitions.bin"}
VALID_HW_TYPES_MFG = {"vesper", "vesper_plus", "vesper_pro", "agnus", "agnus_mini", "chronos", "chronos_pro"}
router = APIRouter(prefix="/api/manufacturing", tags=["manufacturing"])
@@ -175,3 +178,68 @@ def redirect_firmware(
"""
url = service.get_firmware_url(sn)
return RedirectResponse(url=url, status_code=302)
# ─────────────────────────────────────────────────────────────────────────────
# Flash assets — bootloader.bin and partitions.bin per hw_type
# These are the binaries that must be flashed at fixed addresses during full
# provisioning (0x1000 bootloader, 0x8000 partition table).
# They are NOT flashed during OTA updates — only during initial provisioning.
# Upload once per hw_type after each PlatformIO build that changes the layout.
# ─────────────────────────────────────────────────────────────────────────────
@router.post("/flash-assets/{hw_type}/{asset}", status_code=204)
async def upload_flash_asset(
hw_type: str,
asset: str,
file: UploadFile = File(...),
_user: TokenPayload = Depends(require_permission("manufacturing", "add")),
):
"""Upload a bootloader.bin or partitions.bin for a given hw_type.
These are build artifacts from PlatformIO (.pio/build/{env}/bootloader.bin
and .pio/build/{env}/partitions.bin). Upload them once per hw_type after
each PlatformIO build that changes the partition layout.
"""
if hw_type not in VALID_HW_TYPES_MFG:
raise HTTPException(status_code=400, detail=f"Invalid hw_type. Must be one of: {', '.join(sorted(VALID_HW_TYPES_MFG))}")
if asset not in VALID_FLASH_ASSETS:
raise HTTPException(status_code=400, detail=f"Invalid asset. Must be one of: {', '.join(sorted(VALID_FLASH_ASSETS))}")
data = await file.read()
service.save_flash_asset(hw_type, asset, data)
@router.get("/devices/{sn}/bootloader.bin")
def download_bootloader(
sn: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return the bootloader.bin for this device's hw_type (flashed at 0x1000)."""
item = service.get_device_by_sn(sn)
try:
data = service.get_flash_asset(item.hw_type, "bootloader.bin")
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
return Response(
content=data,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="bootloader_{item.hw_type}.bin"'},
)
@router.get("/devices/{sn}/partitions.bin")
def download_partitions(
sn: str,
_user: TokenPayload = Depends(require_permission("manufacturing", "view")),
):
"""Return the partitions.bin for this device's hw_type (flashed at 0x8000)."""
item = service.get_device_by_sn(sn)
try:
data = service.get_flash_asset(item.hw_type, "partitions.bin")
except NotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
return Response(
content=data,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="partitions_{item.hw_type}.bin"'},
)

View File

@@ -2,9 +2,11 @@ import logging
import random
import string
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
from config import settings
from shared.firebase import get_db
from shared.exceptions import NotFoundError
from utils.serial_number import generate_serial
@@ -270,6 +272,29 @@ def delete_unprovisioned_devices() -> list[str]:
return deleted
def _flash_asset_path(hw_type: str, asset: str) -> Path:
"""Return path to a flash asset (bootloader.bin or partitions.bin) for a given hw_type."""
return Path(settings.flash_assets_storage_path) / hw_type / asset
def save_flash_asset(hw_type: str, asset: str, data: bytes) -> Path:
"""Persist a flash asset binary. asset must be 'bootloader.bin' or 'partitions.bin'."""
if asset not in ("bootloader.bin", "partitions.bin"):
raise ValueError(f"Unknown flash asset: {asset}")
path = _flash_asset_path(hw_type, asset)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(data)
return path
def get_flash_asset(hw_type: str, asset: str) -> bytes:
"""Load a flash asset binary. Raises NotFoundError if not uploaded yet."""
path = _flash_asset_path(hw_type, asset)
if not path.exists():
raise NotFoundError(f"Flash asset '{asset}' for hw_type '{hw_type}' — upload it first via POST /api/manufacturing/flash-assets/{{hw_type}}/{{asset}}")
return path.read_bytes()
def get_firmware_url(sn: str) -> str:
"""Return the FastAPI download URL for the latest stable firmware for this device's hw_type."""
from firmware.service import get_latest

Binary file not shown.

Binary file not shown.

View File

@@ -180,9 +180,14 @@ def _build_page(entries: List[bytes], slot_counts: List[int], seq: int = 0) -> b
def generate(serial_number: str, hw_type: str, hw_version: str) -> bytes:
"""Generate a 0x5000-byte NVS partition binary for a Vesper device.
serial_number: full SN string e.g. 'PV-26B27-VS01R-X7KQA'
hw_type: board type e.g. 'vesper', 'vesper_plus', 'vesper_pro'
hw_version: zero-padded version e.g. '01'
serial_number: full SN string e.g. 'BSVSPR-26C13X-STD01R-X7KQA'
hw_type: board family e.g. 'vesper', 'vesper_plus', 'vesper_pro'
hw_version: zero-padded revision e.g. '01'
Writes the NEW schema keys (2.0+) expected by ConfigManager:
serial ← full serial number
hw_family ← board family (hw_type value, lowercase)
hw_revision ← hardware revision string
Returns raw bytes ready to flash at 0x9000.
"""
@@ -190,9 +195,9 @@ def generate(serial_number: str, hw_type: str, hw_version: str) -> bytes:
# Build entries for namespace "device_id"
ns_entry, ns_span = _build_namespace_entry("device_id", ns_index)
uid_entry, uid_span = _build_string_entry(ns_index, "device_uid", serial_number)
hwt_entry, hwt_span = _build_string_entry(ns_index, "hw_type", hw_type.lower())
hwv_entry, hwv_span = _build_string_entry(ns_index, "hw_version", hw_version)
uid_entry, uid_span = _build_string_entry(ns_index, "serial", serial_number)
hwt_entry, hwt_span = _build_string_entry(ns_index, "hw_family", hw_type.lower())
hwv_entry, hwv_span = _build_string_entry(ns_index, "hw_revision", hw_version)
entries = [ns_entry, uid_entry, hwt_entry, hwv_entry]
spans = [ns_span, uid_span, hwt_span, hwv_span]

View File

@@ -4,17 +4,75 @@ from datetime import datetime
MONTH_CODES = "ABCDEFGHIJKL"
SAFE_CHARS = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" # No 0, O, 1, I — avoids label confusion
# Family segment (chars 3-6 of segment 1, after "BS")
BOARD_FAMILY_CODES = {
"vesper": "VSPR",
"vesper_plus": "VSPR",
"vesper_pro": "VSPR",
"agnus": "AGNS",
"agnus_mini": "AGNS",
"chronos": "CRNS",
"chronos_pro": "CRNS",
}
# Variant segment (first 3 chars of segment 3)
BOARD_VARIANT_CODES = {
"vesper": "STD",
"vesper_plus": "PLS",
"vesper_pro": "PRO",
"agnus": "STD",
"agnus_mini": "MIN",
"chronos": "STD",
"chronos_pro": "PRO",
}
def _version_suffix(board_version: str) -> str:
"""Convert version string to 3-char suffix.
Rules:
- Strip the dot: "2.3""23", "10.2""102"
- If result is 2 digits, append "R": "23""23R"
- If result is already 3 digits, use as-is: "102""102"
"""
digits = board_version.replace(".", "")
if len(digits) >= 3:
return digits[:3]
return digits.ljust(2, "0") + "R"
def generate_serial(board_type: str, board_version: str) -> str:
"""Generate a serial number in the format PV-YYMMM-BBTTR-XXXXX.
"""Generate a serial number in the format BSFFFF-YYMDDFX-VVVHHH-XXXXXX.
board_type: 2-char uppercase code, e.g. 'VS', 'VP', 'VX'
board_version: 2-char zero-padded version, e.g. '01'
Format: BSFFFF-YYMDDf-VVVvvv-XXXXXX
BS = Bell Systems (static)
FFFF = 4-char family code (VSPR, AGNS, CRNS)
YY = 2-digit year
M = month code A-L
DD = 2-digit day
f = random filler char
VVV = 3-char variant (STD, PLS, PRO, MIN)
vvv = 3-char version suffix (e.g. 23R, 102)
XXXXXX = 6-char random suffix
board_type: enum value e.g. 'vesper', 'vesper_plus', 'vesper_pro'
board_version: version string e.g. '2.3', '10.2'
"""
key = board_type.lower()
family = BOARD_FAMILY_CODES.get(key, "UNKN")
variant = BOARD_VARIANT_CODES.get(key, "UNK")
ver = _version_suffix(board_version)
now = datetime.utcnow()
year = now.strftime("%y")
year = now.strftime("%y")
month = MONTH_CODES[now.month - 1]
day = now.strftime("%d")
suffix = "".join(random.choices(SAFE_CHARS, k=5))
version_clean = board_version.replace(".", "")
return f"PV-{year}{month}{day}-{board_type.upper()}{version_clean}R-{suffix}"
day = now.strftime("%d")
filler = random.choice(SAFE_CHARS)
suffix = "".join(random.choices(SAFE_CHARS, k=6))
seg1 = f"BS{family}" # e.g. BSVSPR
seg2 = f"{year}{month}{day}{filler}" # e.g. 26C13X
seg3 = f"{variant}{ver}" # e.g. PRO23R
seg4 = suffix # e.g. X9K4M2
return f"{seg1}-{seg2}-{seg3}-{seg4}"