258 lines
9.2 KiB
Python
258 lines
9.2 KiB
Python
import logging
|
|
import uuid
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from builder import database as db
|
|
from builder.models import BuiltMelodyCreate, BuiltMelodyUpdate, BuiltMelodyInDB
|
|
from fastapi import HTTPException
|
|
|
|
logger = logging.getLogger("builder.service")
|
|
|
|
# Storage directory for built .bsm files
|
|
STORAGE_DIR = Path(__file__).parent.parent / "storage" / "built_melodies"
|
|
|
|
|
|
def _ensure_storage_dir():
    """Create STORAGE_DIR, including missing parents; no-op if it already exists."""
    STORAGE_DIR.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def _binary_url(melody_id: str) -> str:
|
|
"""Returns the API URL to download the binary for a given melody id."""
|
|
return f"/api/builder/melodies/{melody_id}/download"
|
|
|
|
|
|
def _row_to_built_melody(row: dict) -> BuiltMelodyInDB:
    """Convert a database row (dict) into a BuiltMelodyInDB model.

    ``binary_url`` is derived from the row id only when the row carries a
    truthy ``binary_path``; otherwise it is None. ``progmem_code`` defaults
    to None and ``assigned_melody_ids`` to an empty list when the keys are
    absent from the row.
    """
    stored_binary = row.get("binary_path")
    download_url = None
    if stored_binary:
        download_url = _binary_url(row["id"])

    fields = {
        "id": row["id"],
        "name": row["name"],
        "pid": row["pid"],
        "steps": row["steps"],
        "binary_path": stored_binary,
        "binary_url": download_url,
        "progmem_code": row.get("progmem_code"),
        "assigned_melody_ids": row.get("assigned_melody_ids", []),
        "created_at": row["created_at"],
        "updated_at": row["updated_at"],
    }
    return BuiltMelodyInDB(**fields)
|
|
|
|
|
|
# ============================================================================
|
|
# Conversion Logic (ported from SecondaryApps/MelodyBuilders/)
|
|
# ============================================================================
|
|
|
|
def parse_bell_notation(notation: str) -> int:
    """Translate "+"-separated bell numbers into a 16-bit flag value.

    Bell N (1-16) sets bit N-1. A notation of "0" or an empty/blank string
    means silence and yields 0. Individual "0" tokens are ignored; tokens
    that are not integers or fall outside 1-16 are logged and skipped.

    Examples:
        "2+8" -> bells 2 and 8 -> bits 1 and 7 -> 0x0082 (130)
        "4"   -> bell 4        -> bit 3        -> 0x0008 (8)
        "0"   -> silence                       -> 0x0000 (0)
    """
    text = notation.strip()
    if not text or text == "0":
        return 0

    mask = 0
    for raw_token in text.split("+"):
        token = raw_token.strip()
        try:
            bell = int(token)
        except ValueError:
            logger.warning(f"Invalid bell token '{token}', skipping")
            continue

        if bell == 0:
            # A zero inside a combination contributes nothing.
            continue
        if not 1 <= bell <= 16:
            logger.warning(f"Bell number {bell} out of range (1-16), skipping")
            continue
        mask |= 1 << (bell - 1)
    return mask
|
|
|
|
|
|
def steps_string_to_values(steps: str) -> List[int]:
    """Split a comma-separated steps string (e.g. '1,2+3,0,4') and convert
    every step with parse_bell_notation, preserving order."""
    tokens = steps.split(",")
    return list(map(parse_bell_notation, tokens))
|
|
|
|
|
|
def _melody_array_name(name: str) -> str:
    """Return the C identifier used for the PROGMEM array of melody *name*.

    The name is lower-cased and every character outside ``[a-z0-9_]`` is
    replaced by "_" so that names containing spaces, hyphens or other
    punctuation still produce compilable C. Names that are already valid
    identifiers are returned unchanged (apart from lower-casing).
    """
    allowed = "abcdefghijklmnopqrstuvwxyz0123456789_"
    suffix = "".join(ch if ch in allowed else "_" for ch in name.lower())
    # The fixed prefix guarantees the identifier never starts with a digit.
    return f"melody_builtin_{suffix}"


def format_melody_array(name: str, values: List[int], values_per_line: int = 8) -> str:
    """Format values as a C PROGMEM ``uint16_t`` array declaration.

    Args:
        name: Melody name; sanitized into the array identifier.
        values: Step values, each rendered as a 4-digit upper-case hex literal.
        values_per_line: Number of literals emitted per source line (>= 1).

    Returns:
        The declaration as a newline-joined string, without trailing newline.

    Raises:
        ValueError: If ``values_per_line`` is smaller than 1 (a negative value
            would otherwise silently produce an empty array body).
    """
    if values_per_line < 1:
        raise ValueError("values_per_line must be at least 1")

    array_name = _melody_array_name(name)
    lines = [f"const uint16_t PROGMEM {array_name}[] = {{"]
    for start in range(0, len(values), values_per_line):
        chunk = values[start : start + values_per_line]
        hex_vals = [f"0x{v:04X}" for v in chunk]
        # Every line except the last one ends with a separating comma.
        suffix = "," if start + len(chunk) < len(values) else ""
        lines.append(" " + ", ".join(hex_vals) + suffix)
    lines.append("};")
    return "\n".join(lines)


def generate_progmem_code(name: str, pid: str, values: List[int]) -> str:
    """Generate standalone PROGMEM C code for a single melody.

    The output consists of a timestamp/identity comment header, the array
    declaration produced by format_melody_array, and a commented-out
    MELODY_LIBRARY[] entry that references the same array identifier.

    Args:
        name: Melody name; "_" is turned into spaces and title-cased for display.
        pid: Melody id string; when empty, ``builtin_<lower-cased name>`` is used.
        values: Step values for the array.

    Returns:
        The generated C source as a newline-joined string.
    """
    array_name = _melody_array_name(name)
    id_name = pid if pid else f"builtin_{name.lower()}"
    display_name = name.replace("_", " ").title()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    parts = [
        f"// Generated: {timestamp}",
        f"// Melody: {display_name} | PID: {id_name}",
        "",
        format_melody_array(name, values),
        "",
        "// --- Add this entry to your MELODY_LIBRARY[] array: ---",
        "// {",
        f'// "{display_name}",',
        f'// "{id_name}",',
        f"// {array_name},",
        f"// sizeof({array_name}) / sizeof(uint16_t)",
        "// }",
    ]
    return "\n".join(parts)
|
|
|
|
|
|
# ============================================================================
|
|
# CRUD
|
|
# ============================================================================
|
|
|
|
async def list_built_melodies() -> List[BuiltMelodyInDB]:
    """Fetch all built-melody rows from the database and convert each to a model."""
    melodies = []
    for record in await db.list_built_melodies():
        melodies.append(_row_to_built_melody(record))
    return melodies
|
|
|
|
|
|
async def get_built_melody(melody_id: str) -> BuiltMelodyInDB:
    """Load one built melody by id.

    Raises:
        HTTPException: 404 when the database returns no row for *melody_id*.
    """
    record = await db.get_built_melody(melody_id)
    if record:
        return _row_to_built_melody(record)
    raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found")
|
|
|
|
|
|
async def create_built_melody(data: BuiltMelodyCreate) -> BuiltMelodyInDB:
    """Insert a new built melody under a freshly generated UUID4 id and
    return the stored record as read back from the database."""
    new_id = str(uuid.uuid4())
    insert_args = {
        "melody_id": new_id,
        "name": data.name,
        "pid": data.pid,
        "steps": data.steps,
    }
    await db.insert_built_melody(**insert_args)
    return await get_built_melody(new_id)
|
|
|
|
|
|
async def update_built_melody(melody_id: str, data: BuiltMelodyUpdate) -> BuiltMelodyInDB:
    """Update name, pid and steps of an existing built melody.

    Fields left as None in *data* keep the value currently stored in the row.

    Raises:
        HTTPException: 404 when no row exists for *melody_id*.
    """
    row = await db.get_built_melody(melody_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found")

    merged = {}
    for field in ("name", "pid", "steps"):
        incoming = getattr(data, field)
        # Only fall back to the stored value when the update omits the field.
        merged[field] = row[field] if incoming is None else incoming

    await db.update_built_melody(melody_id, **merged)
    return await get_built_melody(melody_id)
|
|
|
|
|
|
async def delete_built_melody(melody_id: str) -> None:
    """Delete a built melody row and, if present on disk, its .bsm file.

    The file is removed before the database row is deleted.

    Raises:
        HTTPException: 404 when no row exists for *melody_id*.
    """
    row = await db.get_built_melody(melody_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found")

    stored_path = row.get("binary_path")
    if stored_path:
        bsm_file = Path(stored_path)
        if bsm_file.exists():
            bsm_file.unlink()

    await db.delete_built_melody(melody_id)
|
|
|
|
|
|
# ============================================================================
|
|
# Build Actions
|
|
# ============================================================================
|
|
|
|
async def build_binary(melody_id: str) -> BuiltMelodyInDB:
    """Convert the melody's steps into a .bsm file and record its path.

    Each step value is masked to 16 bits and written as two big-endian bytes
    to ``STORAGE_DIR/<melody_id>.bsm``; the path is then stored in the database.

    Raises:
        HTTPException: 404 when no row exists for *melody_id*.
    """
    row = await db.get_built_melody(melody_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found")

    _ensure_storage_dir()
    values = steps_string_to_values(row["steps"])

    # Assemble the whole payload first so the file is written in one call.
    payload = b"".join((step & 0xFFFF).to_bytes(2, byteorder="big") for step in values)
    bsm_path = STORAGE_DIR / f"{melody_id}.bsm"
    bsm_path.write_bytes(payload)

    await db.update_binary_path(melody_id, str(bsm_path))
    logger.info(f"Built binary for '{row['name']}' → {bsm_path} ({len(values)} steps)")
    return await get_built_melody(melody_id)
|
|
|
|
|
|
async def build_builtin_code(melody_id: str) -> BuiltMelodyInDB:
    """Generate the PROGMEM C source for a melody and persist it on its row.

    Raises:
        HTTPException: 404 when no row exists for *melody_id*.
    """
    row = await db.get_built_melody(melody_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found")

    step_values = steps_string_to_values(row["steps"])
    generated = generate_progmem_code(row["name"], row["pid"], step_values)
    await db.update_progmem_code(melody_id, generated)

    logger.info(f"Built builtin code for '{row['name']}'")
    return await get_built_melody(melody_id)
|
|
|
|
|
|
async def get_binary_path(melody_id: str) -> Optional[Path]:
    """Resolve the on-disk .bsm file of a melody.

    Returns:
        The stored path when the row has one and the file exists; otherwise None.

    Raises:
        HTTPException: 404 when no row exists for *melody_id*.
    """
    row = await db.get_built_melody(melody_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found")

    stored = row.get("binary_path")
    if stored:
        candidate = Path(stored)
        if candidate.exists():
            return candidate
    return None
|
|
|
|
|
|
# ============================================================================
|
|
# Assignment
|
|
# ============================================================================
|
|
|
|
async def assign_to_melody(built_id: str, firestore_melody_id: str) -> BuiltMelodyInDB:
    """Record *firestore_melody_id* in the built melody's assignment list.

    The database is written only when the id is not already in the list.

    Raises:
        HTTPException: 404 when no row exists for *built_id*.
    """
    row = await db.get_built_melody(built_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Built melody '{built_id}' not found")

    current_ids = row.get("assigned_melody_ids", [])
    already_assigned = firestore_melody_id in current_ids
    if not already_assigned:
        current_ids.append(firestore_melody_id)
        await db.update_assigned_melody_ids(built_id, current_ids)

    return await get_built_melody(built_id)
|
|
|
|
|
|
async def unassign_from_melody(built_id: str, firestore_melody_id: str) -> BuiltMelodyInDB:
    """Drop every occurrence of *firestore_melody_id* from the built melody's
    assignment list and store the resulting list.

    Raises:
        HTTPException: 404 when no row exists for *built_id*.
    """
    row = await db.get_built_melody(built_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Built melody '{built_id}' not found")

    remaining = []
    for assigned_id in row.get("assigned_melody_ids", []):
        if assigned_id != firestore_melody_id:
            remaining.append(assigned_id)

    await db.update_assigned_melody_ids(built_id, remaining)
    return await get_built_melody(built_id)
|
|
|
|
|
|
async def get_built_melody_for_firestore_id(firestore_melody_id: str) -> Optional[BuiltMelodyInDB]:
    """Return the first built melody whose assignment list contains
    *firestore_melody_id*, or None when no row lists it."""
    rows = await db.list_built_melodies()
    first_match = next(
        (row for row in rows if firestore_melody_id in row.get("assigned_melody_ids", [])),
        None,
    )
    if first_match is None:
        return None
    return _row_to_built_melody(first_match)
|