import logging import uuid from pathlib import Path from datetime import datetime from typing import List, Optional from builder import database as db from builder.models import BuiltMelodyCreate, BuiltMelodyUpdate, BuiltMelodyInDB from fastapi import HTTPException from config import settings logger = logging.getLogger("builder.service") # Storage directory for built .bsm files — configurable via BUILT_MELODIES_STORAGE_PATH env var STORAGE_DIR = Path(settings.built_melodies_storage_path) def _ensure_storage_dir(): STORAGE_DIR.mkdir(parents=True, exist_ok=True) def _binary_url(melody_id: str) -> str: """Returns the API URL to download the binary for a given melody id.""" return f"/builder/melodies/{melody_id}/download" def _row_to_built_melody(row: dict) -> BuiltMelodyInDB: binary_path = row.get("binary_path") binary_url = _binary_url(row["id"]) if binary_path else None return BuiltMelodyInDB( id=row["id"], name=row["name"], pid=row["pid"], steps=row["steps"], is_builtin=row.get("is_builtin", False), binary_path=binary_path, binary_url=binary_url, progmem_code=row.get("progmem_code"), assigned_melody_ids=row.get("assigned_melody_ids", []), created_at=row["created_at"], updated_at=row["updated_at"], ) # ============================================================================ # Conversion Logic (ported from SecondaryApps/MelodyBuilders/) # ============================================================================ def parse_bell_notation(notation: str) -> int: """Convert human-readable bell notation to uint16 bit flag value. Examples: "2+8" → bells 2,8 → bits 1,7 → 0x0082 (130) "4" → bell 4 → bit 3 → 0x0008 (8) "0" → silence → 0x0000 (0) """ notation = notation.strip() if notation == "0" or not notation: return 0 value = 0 for bell_str in notation.split("+"): bell_str = bell_str.strip() try: bell_num = int(bell_str) if bell_num == 0: continue if bell_num < 1 or bell_num > 16: logger.warning(f"Bell number {bell_num} out of range (1-16), skipping") continue value |= 1 << (bell_num - 1) except ValueError: logger.warning(f"Invalid bell token '{bell_str}', skipping") return value def steps_string_to_values(steps: str) -> List[int]: """Parse raw steps string (e.g. '1,2+3,0,4') into list of uint16 values.""" return [parse_bell_notation(s) for s in steps.split(",")] def format_melody_array(name: str, values: List[int], values_per_line: int = 8) -> str: """Format values as C PROGMEM array declaration.""" array_name = f"melody_builtin_{name.lower()}" lines = [f"const uint16_t PROGMEM {array_name}[] = {{"] for i in range(0, len(values), values_per_line): chunk = values[i : i + values_per_line] hex_vals = [f"0x{v:04X}" for v in chunk] suffix = "," if i + len(chunk) < len(values) else "" lines.append(" " + ", ".join(hex_vals) + suffix) lines.append("};") return "\n".join(lines) def generate_progmem_code(name: str, pid: str, values: List[int]) -> str: """Generate standalone PROGMEM C code for a single melody.""" array_name = f"melody_builtin_{name.lower()}" id_name = pid if pid else f"builtin_{name.lower()}" display_name = name.replace("_", " ").title() timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") parts = [ f"// Generated: {timestamp}", f"// Melody: {display_name} | PID: {id_name}", "", format_melody_array(name, values), "", "// --- Add this entry to your MELODY_LIBRARY[] array: ---", "// {", f'// "{display_name}",', f'// "{id_name}",', f"// {array_name},", f"// sizeof({array_name}) / sizeof(uint16_t)", "// }", ] return "\n".join(parts) # ============================================================================ # CRUD # ============================================================================ async def list_built_melodies() -> List[BuiltMelodyInDB]: rows = await db.list_built_melodies() return [_row_to_built_melody(r) for r in rows] async def get_built_melody(melody_id: str) -> BuiltMelodyInDB: row = await db.get_built_melody(melody_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found") return _row_to_built_melody(row) async def _check_unique(name: str, pid: str, exclude_id: Optional[str] = None) -> None: """Raise 409 if name or PID is already taken by another archetype.""" rows = await db.list_built_melodies() for row in rows: if exclude_id and row["id"] == exclude_id: continue if row["name"].lower() == name.lower(): raise HTTPException(status_code=409, detail=f"An archetype with the name '{name}' already exists.") if pid and row.get("pid") and row["pid"].lower() == pid.lower(): raise HTTPException(status_code=409, detail=f"An archetype with the PID '{pid}' already exists.") async def create_built_melody(data: BuiltMelodyCreate) -> BuiltMelodyInDB: await _check_unique(data.name, data.pid or "") melody_id = str(uuid.uuid4()) await db.insert_built_melody( melody_id=melody_id, name=data.name, pid=data.pid, steps=data.steps, is_builtin=data.is_builtin, ) # Auto-build binary and builtin code on creation result = await get_built_melody(melody_id) result = await _do_build(melody_id) return result async def update_built_melody(melody_id: str, data: BuiltMelodyUpdate) -> BuiltMelodyInDB: row = await db.get_built_melody(melody_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found") new_name = data.name if data.name is not None else row["name"] new_pid = data.pid if data.pid is not None else row["pid"] new_steps = data.steps if data.steps is not None else row["steps"] new_is_builtin = data.is_builtin if data.is_builtin is not None else row.get("is_builtin", False) await _check_unique(new_name, new_pid or "", exclude_id=melody_id) steps_changed = (data.steps is not None) and (data.steps != row["steps"]) await db.update_built_melody(melody_id, name=new_name, pid=new_pid, steps=new_steps, is_builtin=new_is_builtin) # If steps changed, flag all assigned melodies as outdated, then rebuild if steps_changed: assigned_ids = row.get("assigned_melody_ids", []) if assigned_ids: await _flag_melodies_outdated(assigned_ids, True) # Auto-rebuild binary and builtin code on every save return await _do_build(melody_id) async def delete_built_melody(melody_id: str) -> None: row = await db.get_built_melody(melody_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found") # Flag all assigned melodies as outdated before deleting assigned_ids = row.get("assigned_melody_ids", []) if assigned_ids: await _flag_melodies_outdated(assigned_ids, True) # Delete the .bsm file if it exists if row.get("binary_path"): bsm_path = Path(row["binary_path"]) if bsm_path.exists(): bsm_path.unlink() await db.delete_built_melody(melody_id) async def toggle_builtin(melody_id: str) -> BuiltMelodyInDB: """Toggle the is_builtin flag for an archetype.""" row = await db.get_built_melody(melody_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found") new_value = not row.get("is_builtin", False) await db.update_builtin_flag(melody_id, new_value) return await get_built_melody(melody_id) # ============================================================================ # Build Actions # ============================================================================ async def _do_build(melody_id: str) -> BuiltMelodyInDB: """Internal: build both binary and PROGMEM code, return updated record.""" await build_binary(melody_id) return await build_builtin_code(melody_id) async def build_binary(melody_id: str) -> BuiltMelodyInDB: """Parse steps and write a .bsm binary file to storage.""" row = await db.get_built_melody(melody_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found") _ensure_storage_dir() values = steps_string_to_values(row["steps"]) file_name = f"{row['pid']}.bsm" if row.get("pid") else f"{melody_id}.bsm" bsm_path = STORAGE_DIR / file_name with open(bsm_path, "wb") as f: for value in values: value = value & 0xFFFF f.write(value.to_bytes(2, byteorder="big")) await db.update_binary_path(melody_id, str(bsm_path)) logger.info(f"Built binary for '{row['name']}' → {bsm_path} ({len(values)} steps)") return await get_built_melody(melody_id) async def build_builtin_code(melody_id: str) -> BuiltMelodyInDB: """Generate PROGMEM C code and store it in the database.""" row = await db.get_built_melody(melody_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found") values = steps_string_to_values(row["steps"]) code = generate_progmem_code(row["name"], row["pid"], values) await db.update_progmem_code(melody_id, code) logger.info(f"Built builtin code for '{row['name']}'") return await get_built_melody(melody_id) async def get_binary_path(melody_id: str) -> Optional[Path]: """Return the filesystem path to the .bsm file, or None if not built.""" row = await db.get_built_melody(melody_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{melody_id}' not found") if not row.get("binary_path"): return None path = Path(row["binary_path"]) if not path.exists(): return None return path async def generate_builtin_list() -> str: """Generate a C++ header with PROGMEM arrays for all is_builtin archetypes.""" rows = await db.list_built_melodies() builtin_rows = [r for r in rows if r.get("is_builtin")] if not builtin_rows: return "// No built-in archetypes defined.\n" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") parts = [ f"// Auto-generated Built-in Archetype List", f"// Generated: {timestamp}", f"// Total built-ins: {len(builtin_rows)}", "", "#pragma once", "#include ", "", ] entry_refs = [] for row in builtin_rows: values = steps_string_to_values(row["steps"]) array_name = f"melody_builtin_{row['name'].lower().replace(' ', '_')}" display_name = row["name"].replace("_", " ").title() pid = row.get("pid") or f"builtin_{row['name'].lower()}" parts.append(f"// {display_name} | PID: {pid} | Steps: {len(values)}") parts.append(format_melody_array(row["name"].lower().replace(" ", "_"), values)) parts.append("") entry_refs.append((display_name, pid, array_name, len(values))) # Generate MELODY_LIBRARY array parts.append("// --- MELODY_LIBRARY entries ---") parts.append("// Add these to your firmware's MELODY_LIBRARY[] array:") parts.append("// {") for display_name, pid, array_name, step_count in entry_refs: parts.append(f'// {{ "{display_name}", "{pid}", {array_name}, {step_count} }},') parts.append("// };") return "\n".join(parts) # ============================================================================ # Assignment # ============================================================================ async def assign_to_melody(built_id: str, firestore_melody_id: str) -> BuiltMelodyInDB: """Add a Firestore melody ID to this built melody's assignment list.""" row = await db.get_built_melody(built_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{built_id}' not found") assigned = row.get("assigned_melody_ids", []) if firestore_melody_id not in assigned: assigned.append(firestore_melody_id) await db.update_assigned_melody_ids(built_id, assigned) # Clear outdated flag on the melody being assigned await _flag_melodies_outdated([firestore_melody_id], False) return await get_built_melody(built_id) async def unassign_from_melody(built_id: str, firestore_melody_id: str) -> BuiltMelodyInDB: """Remove a Firestore melody ID from this built melody's assignment list.""" row = await db.get_built_melody(built_id) if not row: raise HTTPException(status_code=404, detail=f"Built melody '{built_id}' not found") assigned = [mid for mid in row.get("assigned_melody_ids", []) if mid != firestore_melody_id] await db.update_assigned_melody_ids(built_id, assigned) # Flag the melody as outdated since it no longer has an archetype await _flag_melodies_outdated([firestore_melody_id], True) return await get_built_melody(built_id) async def get_built_melody_for_firestore_id(firestore_melody_id: str) -> Optional[BuiltMelodyInDB]: """Find the built melody assigned to a given Firestore melody ID (first match).""" rows = await db.list_built_melodies() for row in rows: if firestore_melody_id in row.get("assigned_melody_ids", []): return _row_to_built_melody(row) return None # ============================================================================ # Outdated Flag Helpers # ============================================================================ async def _flag_melodies_outdated(melody_ids: List[str], outdated: bool) -> None: """Set or clear the outdated_archetype flag on a list of Firestore melody IDs. This updates both SQLite (melody_drafts) and Firestore (published melodies). We import inline to avoid circular imports. """ if not melody_ids: return try: from melodies import database as melody_db from shared.firebase import get_db as get_firestore except ImportError: logger.warning("Could not import melody/firebase modules — skipping outdated flag update") return firestore_db = get_firestore() for melody_id in melody_ids: try: row = await melody_db.get_melody(melody_id) if not row: continue data = row["data"] info = dict(data.get("information", {})) info["outdated_archetype"] = outdated data["information"] = info await melody_db.update_melody(melody_id, data) # If published, also update Firestore if row.get("status") == "published": doc_ref = firestore_db.collection("melodies").document(melody_id) doc_ref.update({"information.outdated_archetype": outdated}) logger.info(f"Set outdated_archetype={outdated} on melody {melody_id}") except Exception as e: logger.error(f"Failed to set outdated flag on melody {melody_id}: {e}")