# Files
# bellsystems-cp/backend/melodies/service.py
#
# 326 lines
# 11 KiB
# Python

import json
import logging
import uuid
from datetime import datetime, timezone

from shared.firebase import get_db as get_firestore, get_bucket
from shared.exceptions import NotFoundError
from melodies.models import MelodyCreate, MelodyUpdate, MelodyInDB
from melodies import database as melody_db
# Name of the Firestore collection that the functions below read and write.
COLLECTION = "melodies"
# Module-level logger for this service module.
logger = logging.getLogger("melodies.service")
def _parse_localized_string(value: str) -> dict:
    """Decode a JSON-encoded localized string into a dict.

    An empty value, text that is not valid JSON, or JSON that does not decode
    to an object all yield an empty dict; this helper never raises for them.
    """
    if value:
        try:
            decoded = json.loads(value)
        except (json.JSONDecodeError, TypeError):
            decoded = None
        if isinstance(decoded, dict):
            return decoded
    return {}
def _doc_to_melody(doc) -> MelodyInDB:
    """Build a MelodyInDB from a Firestore document snapshot.

    The snapshot id becomes the model id and the document fields are passed
    through as keyword arguments.
    """
    fields = doc.to_dict()
    return MelodyInDB(id=doc.id, **fields)
def _row_to_melody(row: dict) -> MelodyInDB:
    """Build a MelodyInDB from a SQLite row whose ``data`` is already parsed.

    ``row["id"]`` and ``row["status"]`` supply the id and status; everything
    in ``row["data"]`` is expanded into the remaining model fields.
    """
    payload = row["data"]
    melody_id, status = row["id"], row["status"]
    return MelodyInDB(id=melody_id, status=status, **payload)
def _matches_filters(melody: MelodyInDB, search: str | None, tone: str | None,
                     total_notes: int | None) -> bool:
    """Return True when the melody passes every filter that was supplied.

    ``tone`` is compared with ``information.melodyTone.value`` and
    ``total_notes`` with ``information.totalNotes``. ``search`` is a
    case-insensitive substring looked for in the localized name values, the
    localized description values, and the custom tags.
    """
    if tone and melody.information.melodyTone.value != tone:
        return False
    if total_notes is not None and melody.information.totalNotes != total_notes:
        return False
    if not search:
        return True

    needle = search.lower()
    info = melody.information
    # All three checks are evaluated before combining them (no short-circuit
    # between name, description and tags).
    localized_hits = [
        any(
            needle in text.lower()
            for text in _parse_localized_string(raw).values()
            if isinstance(text, str)
        )
        for raw in (info.name, info.description)
    ]
    tag_hit = any(needle in tag.lower() for tag in info.customTags)
    return any(localized_hits) or tag_hit
async def list_melodies(
    search: str | None = None,
    melody_type: str | None = None,
    tone: str | None = None,
    total_notes: int | None = None,
    status: str | None = None,
) -> list[MelodyInDB]:
    """Return the melodies stored in SQLite that satisfy the given filters.

    ``status`` is passed to the database query; the type, tone, note-count
    and search filters are applied in Python to each converted row.
    """
    rows = await melody_db.list_melodies(status=status)
    matched = []
    for melody in map(_row_to_melody, rows):
        # Type filter runs first, then the remaining per-melody filters.
        type_ok = not melody_type or melody.type.value == melody_type
        if type_ok and _matches_filters(melody, search, tone, total_notes):
            matched.append(melody)
    return matched
async def get_melody(melody_id: str) -> MelodyInDB:
    """Fetch one melody from SQLite by id.

    Raises:
        NotFoundError: when no row exists for ``melody_id``.
    """
    row = await melody_db.get_melody(melody_id)
    if not row:
        raise NotFoundError("Melody")
    return _row_to_melody(row)
def _sanitize_metadata_for_create(existing: dict | None, actor_name: str | None) -> dict:
now = datetime.utcnow().isoformat() + "Z"
metadata = dict(existing or {})
creator = metadata.get("createdBy") or actor_name or "Unknown"
created_at = metadata.get("dateCreated") or now
metadata["createdBy"] = creator
metadata["dateCreated"] = created_at
metadata["lastEditedBy"] = actor_name or metadata.get("lastEditedBy") or creator
metadata["dateEdited"] = now
if "adminNotes" not in metadata:
metadata["adminNotes"] = []
return metadata
def _sanitize_metadata_for_update(existing: dict | None, incoming: dict | None, actor_name: str | None) -> dict:
now = datetime.utcnow().isoformat() + "Z"
existing_meta = dict(existing or {})
incoming_meta = dict(incoming or {})
# Created fields are immutable after first set.
created_by = existing_meta.get("createdBy") or incoming_meta.get("createdBy") or actor_name or "Unknown"
date_created = existing_meta.get("dateCreated") or incoming_meta.get("dateCreated") or now
merged = {**existing_meta, **incoming_meta}
merged["createdBy"] = created_by
merged["dateCreated"] = date_created
merged["lastEditedBy"] = actor_name or incoming_meta.get("lastEditedBy") or existing_meta.get("lastEditedBy") or created_by
merged["dateEdited"] = now
if "adminNotes" not in merged:
merged["adminNotes"] = existing_meta.get("adminNotes", [])
return merged
async def create_melody(data: MelodyCreate, publish: bool = False, actor_name: str | None = None) -> MelodyInDB:
    """Create a melody under a fresh UUID and store it in SQLite.

    The row is stored with status ``"published"`` when ``publish`` is true and
    ``"draft"`` otherwise. When publishing, the same document is also written
    to the Firestore collection.
    """
    melody_id = str(uuid.uuid4())
    payload = data.model_dump()
    payload["metadata"] = _sanitize_metadata_for_create(payload.get("metadata"), actor_name)

    status = "draft"
    if publish:
        status = "published"

    # SQLite always receives the melody, whatever its status.
    await melody_db.insert_melody(melody_id, status, payload)

    # Firestore only receives it when publishing.
    if publish:
        get_firestore().collection(COLLECTION).document(melody_id).set(payload)

    return MelodyInDB(id=melody_id, status=status, **payload)
async def update_melody(melody_id: str, data: MelodyUpdate, actor_name: str | None = None) -> MelodyInDB:
    """Apply a partial update to a stored melody.

    Fields left as None in ``data`` are ignored. The ``information`` and
    ``default_settings`` sections are merged key by key with the stored
    values; metadata goes through ``_sanitize_metadata_for_update``. The
    result is saved to SQLite and, when the row's status is ``"published"``,
    written to Firestore as well.

    Raises:
        NotFoundError: when no row exists for ``melody_id``.
    """
    row = await melody_db.get_melody(melody_id)
    if not row:
        raise NotFoundError("Melody")

    stored = row["data"]
    changes = data.model_dump(exclude_none=True)

    # Nested sections are overlaid on the stored ones, not replaced wholesale.
    for section in ("information", "default_settings"):
        if section in changes and section in stored:
            changes[section] = {**stored[section], **changes[section]}

    if "metadata" in changes or "metadata" in stored:
        changes["metadata"] = _sanitize_metadata_for_update(
            stored.get("metadata"),
            changes.get("metadata"),
            actor_name,
        )

    merged_data = {**stored, **changes}

    # SQLite is updated first.
    await melody_db.update_melody(melody_id, merged_data)

    # Published melodies are mirrored to Firestore.
    status = row["status"]
    if status == "published":
        get_firestore().collection(COLLECTION).document(melody_id).set(merged_data)

    return MelodyInDB(id=melody_id, status=status, **merged_data)
async def publish_melody(melody_id: str) -> MelodyInDB:
    """Write a stored melody to Firestore and mark it published in SQLite.

    Raises:
        NotFoundError: when no row exists for ``melody_id``.
    """
    row = await melody_db.get_melody(melody_id)
    if not row:
        raise NotFoundError("Melody")

    payload = row["data"]

    # Firestore write happens before the SQLite status change.
    firestore_doc = get_firestore().collection(COLLECTION).document(melody_id)
    firestore_doc.set(payload)

    await melody_db.update_status(melody_id, "published")
    return MelodyInDB(id=melody_id, status="published", **payload)
async def unpublish_melody(melody_id: str) -> MelodyInDB:
    """Delete the melody's Firestore document and set its SQLite status to draft.

    Raises:
        NotFoundError: when no row exists for ``melody_id``.
    """
    row = await melody_db.get_melody(melody_id)
    if not row:
        raise NotFoundError("Melody")

    # The Firestore document is deleted only if it currently exists.
    firestore_doc = get_firestore().collection(COLLECTION).document(melody_id)
    if firestore_doc.get().exists:
        firestore_doc.delete()

    await melody_db.update_status(melody_id, "draft")
    return MelodyInDB(id=melody_id, status="draft", **row["data"])
async def delete_melody(melody_id: str) -> None:
    """Delete a melody from Firestore (when published), storage, and SQLite.

    Raises:
        NotFoundError: when no row exists for ``melody_id``.
    """
    row = await melody_db.get_melody(melody_id)
    if not row:
        raise NotFoundError("Melody")

    # Firestore is only touched for rows marked published.
    if row["status"] == "published":
        firestore_doc = get_firestore().collection(COLLECTION).document(melody_id)
        if firestore_doc.get().exists:
            firestore_doc.delete()

    # Storage files go next, and the SQLite row is deleted last.
    _delete_storage_files(melody_id)
    await melody_db.delete_melody(melody_id)
def upload_file(melody_id: str, file_bytes: bytes, filename: str, content_type: str) -> str:
    """Upload a file under ``melodies/{melody_id}/`` and return its public URL.

    Content types ``application/octet-stream`` and ``application/macbinary``
    are stored as ``binary.bin``. Anything else is stored as
    ``preview.<ext>``, where the extension is the text after the last dot in
    ``filename`` (``mp3`` when the name has no dot).

    Raises:
        RuntimeError: when no storage bucket is available.
    """
    bucket = get_bucket()
    if not bucket:
        raise RuntimeError("Firebase Storage not initialized")

    if content_type in ("application/octet-stream", "application/macbinary"):
        storage_path = f"melodies/{melody_id}/binary.bin"
    else:
        _, dot, suffix = filename.rpartition(".")
        ext = suffix if dot else "mp3"
        storage_path = f"melodies/{melody_id}/preview.{ext}"

    blob = bucket.blob(storage_path)
    blob.upload_from_string(file_bytes, content_type=content_type)
    blob.make_public()
    return blob.public_url
def delete_file(melody_id: str, file_type: str) -> None:
    """Delete one kind of file from a melody's storage folder.

    Args:
        melody_id: Melody whose ``melodies/{melody_id}/`` folder is searched.
        file_type: ``"binary"`` or ``"preview"``; any other value deletes
            nothing.

    Does nothing when no storage bucket is available.
    """
    bucket = get_bucket()
    if not bucket or file_type not in ("binary", "preview"):
        return
    prefix = f"melodies/{melody_id}/"
    for blob in list(bucket.list_blobs(prefix=prefix)):
        # Match on the name relative to the melody folder. Matching the full
        # blob name would also match the melody_id itself, so an id that
        # contains "binary" or "preview" would delete every file in the folder.
        relative_name = blob.name.removeprefix(prefix)
        if file_type in relative_name:
            blob.delete()
def _delete_storage_files(melody_id: str) -> None:
    """Delete every blob stored under ``melodies/{melody_id}/``.

    Does nothing when no storage bucket is available.
    """
    bucket = get_bucket()
    if bucket:
        # The listing is materialized before any blob is deleted.
        stale_blobs = list(bucket.list_blobs(prefix=f"melodies/{melody_id}/"))
        for stale in stale_blobs:
            stale.delete()
def get_storage_files(melody_id: str) -> dict:
    """Return public URLs for the melody's binary and preview files.

    Returns:
        A dict with keys ``binary_url`` and ``preview_url``; a value is None
        when no matching blob exists or no storage bucket is available.

    Note:
        Every blob found under the melody's folder is made public as part of
        this call.
    """
    urls = {"binary_url": None, "preview_url": None}
    bucket = get_bucket()
    if not bucket:
        return urls

    prefix = f"melodies/{melody_id}/"
    for blob in list(bucket.list_blobs(prefix=prefix)):
        blob.make_public()
        # Classify by the name relative to the melody folder. The full blob
        # name includes the melody_id, so an id containing "binary" or
        # "preview" would otherwise mislabel every file.
        relative_name = blob.name.removeprefix(prefix)
        if "binary" in relative_name:
            urls["binary_url"] = blob.public_url
        elif "preview" in relative_name:
            urls["preview_url"] = blob.public_url
    return urls
async def migrate_from_firestore() -> int:
    """Import Firestore melodies into SQLite as published rows.

    Nothing is imported when ``melody_db.count_melodies()`` already reports
    at least one row. Returns the number of melodies imported.
    """
    if await melody_db.count_melodies() > 0:
        return 0

    snapshots = get_firestore().collection(COLLECTION).stream()
    imported = 0
    for snapshot in snapshots:
        data = snapshot.to_dict()
        await melody_db.insert_melody(snapshot.id, "published", data)
        imported += 1

    if imported > 0:
        logger.info(f"Migrated {imported} melodies from Firestore to SQLite")
    return imported