428 lines
14 KiB
Python
428 lines
14 KiB
Python
import json
|
|
import uuid
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from shared.firebase import get_db as get_firestore, get_bucket
|
|
from shared.exceptions import NotFoundError
|
|
from melodies.models import MelodyCreate, MelodyUpdate, MelodyInDB
|
|
from melodies import database as melody_db
|
|
|
|
COLLECTION = "melodies"
|
|
logger = logging.getLogger("melodies.service")
|
|
|
|
|
|
def _parse_localized_string(value: str) -> dict:
|
|
"""Parse a JSON-encoded localized string into a dict. Returns {} on failure."""
|
|
if not value:
|
|
return {}
|
|
try:
|
|
parsed = json.loads(value)
|
|
if isinstance(parsed, dict):
|
|
return parsed
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _doc_to_melody(doc) -> MelodyInDB:
|
|
"""Convert a Firestore document snapshot to a MelodyInDB model."""
|
|
data = doc.to_dict()
|
|
return MelodyInDB(id=doc.id, **data)
|
|
|
|
|
|
def _row_to_melody(row: dict) -> MelodyInDB:
|
|
"""Convert a SQLite row (with parsed data) to a MelodyInDB model."""
|
|
data = row["data"]
|
|
return MelodyInDB(id=row["id"], status=row["status"], **data)
|
|
|
|
|
|
def _matches_filters(melody: MelodyInDB, search: str | None, tone: str | None,
|
|
total_notes: int | None) -> bool:
|
|
"""Apply client-side filters to a melody."""
|
|
if tone and melody.information.melodyTone.value != tone:
|
|
return False
|
|
if total_notes is not None and melody.information.totalNotes != total_notes:
|
|
return False
|
|
if search:
|
|
search_lower = search.lower()
|
|
name_dict = _parse_localized_string(melody.information.name)
|
|
desc_dict = _parse_localized_string(melody.information.description)
|
|
name_match = any(
|
|
search_lower in v.lower() for v in name_dict.values() if isinstance(v, str)
|
|
)
|
|
desc_match = any(
|
|
search_lower in v.lower() for v in desc_dict.values() if isinstance(v, str)
|
|
)
|
|
tag_match = any(search_lower in t.lower() for t in melody.information.customTags)
|
|
if not (name_match or desc_match or tag_match):
|
|
return False
|
|
return True
|
|
|
|
|
|
async def list_melodies(
|
|
search: str | None = None,
|
|
melody_type: str | None = None,
|
|
tone: str | None = None,
|
|
total_notes: int | None = None,
|
|
status: str | None = None,
|
|
) -> list[MelodyInDB]:
|
|
"""List melodies from SQLite with optional filters."""
|
|
rows = await melody_db.list_melodies(status=status)
|
|
results = []
|
|
|
|
for row in rows:
|
|
melody = _row_to_melody(row)
|
|
|
|
# Server-side type filter
|
|
if melody_type and melody.type.value != melody_type:
|
|
continue
|
|
|
|
if not _matches_filters(melody, search, tone, total_notes):
|
|
continue
|
|
|
|
results.append(melody)
|
|
|
|
return results
|
|
|
|
|
|
async def get_melody(melody_id: str) -> MelodyInDB:
|
|
"""Get a single melody by ID. Checks SQLite first."""
|
|
row = await melody_db.get_melody(melody_id)
|
|
if row:
|
|
return _row_to_melody(row)
|
|
raise NotFoundError("Melody")
|
|
|
|
|
|
def _sanitize_metadata_for_create(existing: dict | None, actor_name: str | None) -> dict:
|
|
now = datetime.utcnow().isoformat() + "Z"
|
|
metadata = dict(existing or {})
|
|
creator = metadata.get("createdBy") or actor_name or "Unknown"
|
|
created_at = metadata.get("dateCreated") or now
|
|
metadata["createdBy"] = creator
|
|
metadata["dateCreated"] = created_at
|
|
metadata["lastEditedBy"] = actor_name or metadata.get("lastEditedBy") or creator
|
|
metadata["dateEdited"] = now
|
|
if "adminNotes" not in metadata:
|
|
metadata["adminNotes"] = []
|
|
return metadata
|
|
|
|
|
|
def _sanitize_metadata_for_update(existing: dict | None, incoming: dict | None, actor_name: str | None) -> dict:
|
|
now = datetime.utcnow().isoformat() + "Z"
|
|
existing_meta = dict(existing or {})
|
|
incoming_meta = dict(incoming or {})
|
|
|
|
# Created fields are immutable after first set.
|
|
created_by = existing_meta.get("createdBy") or incoming_meta.get("createdBy") or actor_name or "Unknown"
|
|
date_created = existing_meta.get("dateCreated") or incoming_meta.get("dateCreated") or now
|
|
|
|
merged = {**existing_meta, **incoming_meta}
|
|
merged["createdBy"] = created_by
|
|
merged["dateCreated"] = date_created
|
|
merged["lastEditedBy"] = actor_name or incoming_meta.get("lastEditedBy") or existing_meta.get("lastEditedBy") or created_by
|
|
merged["dateEdited"] = now
|
|
if "adminNotes" not in merged:
|
|
merged["adminNotes"] = existing_meta.get("adminNotes", [])
|
|
return merged
|
|
|
|
|
|
async def create_melody(data: MelodyCreate, publish: bool = False, actor_name: str | None = None) -> MelodyInDB:
|
|
"""Create a new melody. If publish=True, also push to Firestore."""
|
|
melody_id = str(uuid.uuid4())
|
|
doc_data = data.model_dump()
|
|
doc_data["metadata"] = _sanitize_metadata_for_create(doc_data.get("metadata"), actor_name)
|
|
status = "published" if publish else "draft"
|
|
|
|
# Always save to SQLite
|
|
await melody_db.insert_melody(melody_id, status, doc_data)
|
|
|
|
# If publishing, also save to Firestore
|
|
if publish:
|
|
db = get_firestore()
|
|
db.collection(COLLECTION).document(melody_id).set(doc_data)
|
|
|
|
return MelodyInDB(id=melody_id, status=status, **doc_data)
|
|
|
|
|
|
async def update_melody(melody_id: str, data: MelodyUpdate, actor_name: str | None = None) -> MelodyInDB:
|
|
"""Update an existing melody. If published, also update Firestore."""
|
|
row = await melody_db.get_melody(melody_id)
|
|
if not row:
|
|
raise NotFoundError("Melody")
|
|
|
|
existing_data = row["data"]
|
|
update_data = data.model_dump(exclude_none=True)
|
|
|
|
# Merge nested structs
|
|
for key in ("information", "default_settings"):
|
|
if key in update_data and key in existing_data:
|
|
merged = {**existing_data[key], **update_data[key]}
|
|
update_data[key] = merged
|
|
if "metadata" in update_data or "metadata" in existing_data:
|
|
update_data["metadata"] = _sanitize_metadata_for_update(
|
|
existing_data.get("metadata"),
|
|
update_data.get("metadata"),
|
|
actor_name,
|
|
)
|
|
|
|
merged_data = {**existing_data, **update_data}
|
|
|
|
# Update SQLite
|
|
await melody_db.update_melody(melody_id, merged_data)
|
|
|
|
# If published, also update Firestore
|
|
if row["status"] == "published":
|
|
db = get_firestore()
|
|
doc_ref = db.collection(COLLECTION).document(melody_id)
|
|
doc_ref.set(merged_data)
|
|
|
|
return MelodyInDB(id=melody_id, status=row["status"], **merged_data)
|
|
|
|
|
|
async def publish_melody(melody_id: str) -> MelodyInDB:
|
|
"""Publish a draft melody to Firestore."""
|
|
row = await melody_db.get_melody(melody_id)
|
|
if not row:
|
|
raise NotFoundError("Melody")
|
|
|
|
doc_data = row["data"]
|
|
|
|
# Write to Firestore
|
|
db = get_firestore()
|
|
db.collection(COLLECTION).document(melody_id).set(doc_data)
|
|
|
|
# Update status in SQLite
|
|
await melody_db.update_status(melody_id, "published")
|
|
|
|
return MelodyInDB(id=melody_id, status="published", **doc_data)
|
|
|
|
|
|
async def unpublish_melody(melody_id: str) -> MelodyInDB:
|
|
"""Remove melody from Firestore but keep in SQLite as draft."""
|
|
row = await melody_db.get_melody(melody_id)
|
|
if not row:
|
|
raise NotFoundError("Melody")
|
|
|
|
# Delete from Firestore
|
|
db = get_firestore()
|
|
doc_ref = db.collection(COLLECTION).document(melody_id)
|
|
doc = doc_ref.get()
|
|
if doc.exists:
|
|
doc_ref.delete()
|
|
|
|
# Update status in SQLite
|
|
await melody_db.update_status(melody_id, "draft")
|
|
|
|
return MelodyInDB(id=melody_id, status="draft", **row["data"])
|
|
|
|
|
|
async def delete_melody(melody_id: str) -> None:
|
|
"""Delete a melody from SQLite and Firestore (if published), plus storage files."""
|
|
row = await melody_db.get_melody(melody_id)
|
|
if not row:
|
|
raise NotFoundError("Melody")
|
|
|
|
# Delete from Firestore if published
|
|
if row["status"] == "published":
|
|
db = get_firestore()
|
|
doc_ref = db.collection(COLLECTION).document(melody_id)
|
|
doc = doc_ref.get()
|
|
if doc.exists:
|
|
doc_ref.delete()
|
|
|
|
# Delete storage files
|
|
_delete_storage_files(melody_id, row["data"].get("uid"))
|
|
|
|
# Delete from SQLite
|
|
await melody_db.delete_melody(melody_id)
|
|
|
|
|
|
def upload_file(melody_id: str, file_bytes: bytes, filename: str, content_type: str) -> str:
|
|
"""Upload a file to Firebase Storage under melodies/{melody_id}/."""
|
|
bucket = get_bucket()
|
|
if not bucket:
|
|
raise RuntimeError("Firebase Storage not initialized")
|
|
|
|
if content_type in ("application/octet-stream", "application/macbinary"):
|
|
storage_path = f"melodies/{melody_id}/binary.bin"
|
|
else:
|
|
ext = filename.rsplit(".", 1)[-1] if "." in filename else "mp3"
|
|
storage_path = f"melodies/{melody_id}/preview.{ext}"
|
|
|
|
blob = bucket.blob(storage_path)
|
|
blob.upload_from_string(file_bytes, content_type=content_type)
|
|
blob.make_public()
|
|
return blob.public_url
|
|
|
|
|
|
def _is_binary_blob_name(blob_name: str) -> bool:
|
|
lower = (blob_name or "").lower()
|
|
base = lower.rsplit("/", 1)[-1]
|
|
if "preview" in base:
|
|
return False
|
|
return ("binary" in base) or base.endswith(".bin") or base.endswith(".bsm")
|
|
|
|
|
|
def _safe_storage_segment(raw: str | None, fallback: str) -> str:
|
|
value = (raw or "").strip()
|
|
if not value:
|
|
value = fallback
|
|
chars = []
|
|
for ch in value:
|
|
if ch.isalnum() or ch in ("-", "_", "."):
|
|
chars.append(ch)
|
|
else:
|
|
chars.append("_")
|
|
cleaned = "".join(chars).strip("._")
|
|
return cleaned or fallback
|
|
|
|
|
|
def _storage_prefixes(melody_id: str, melody_uid: str | None) -> list[str]:
|
|
uid_seg = _safe_storage_segment(melody_uid, melody_id)
|
|
id_seg = _safe_storage_segment(melody_id, melody_id)
|
|
prefixes = [f"melodies/{uid_seg}/"]
|
|
if uid_seg != id_seg:
|
|
# Legacy path support
|
|
prefixes.append(f"melodies/{id_seg}/")
|
|
return prefixes
|
|
|
|
|
|
def _list_blobs_for_prefixes(bucket, prefixes: list[str]):
|
|
all_blobs = []
|
|
seen = set()
|
|
for prefix in prefixes:
|
|
for blob in bucket.list_blobs(prefix=prefix):
|
|
if blob.name in seen:
|
|
continue
|
|
seen.add(blob.name)
|
|
all_blobs.append(blob)
|
|
return all_blobs
|
|
|
|
|
|
def upload_file_for_melody(melody_id: str, melody_uid: str | None, melody_pid: str | None, file_bytes: bytes, filename: str, content_type: str) -> str:
|
|
"""Upload a file to Firebase Storage under melodies/{melody_uid or melody_id}/.
|
|
Binary files are stored as {pid}.bsm and replace previous melody binaries.
|
|
"""
|
|
bucket = get_bucket()
|
|
if not bucket:
|
|
raise RuntimeError("Firebase Storage not initialized")
|
|
|
|
prefixes = _storage_prefixes(melody_id, melody_uid)
|
|
primary_prefix = prefixes[0]
|
|
|
|
if content_type in ("application/octet-stream", "application/macbinary"):
|
|
# Keep one active binary per melody, clean older binaries in both legacy/current prefixes.
|
|
for blob in _list_blobs_for_prefixes(bucket, prefixes):
|
|
if _is_binary_blob_name(blob.name):
|
|
blob.delete()
|
|
|
|
stem = filename.rsplit(".", 1)[0] if "." in filename else filename
|
|
pid_seg = _safe_storage_segment(stem or melody_pid, "binary")
|
|
storage_path = f"{primary_prefix}{pid_seg}.bsm"
|
|
binary_content_type = "application/octet-stream"
|
|
blob = bucket.blob(storage_path)
|
|
blob.upload_from_string(file_bytes, content_type=binary_content_type)
|
|
blob.make_public()
|
|
return blob.public_url
|
|
|
|
ext = filename.rsplit(".", 1)[-1] if "." in filename else "mp3"
|
|
storage_path = f"{primary_prefix}preview.{ext}"
|
|
blob = bucket.blob(storage_path)
|
|
blob.upload_from_string(file_bytes, content_type=content_type)
|
|
blob.make_public()
|
|
return blob.public_url
|
|
|
|
|
|
def get_binary_file_bytes(melody_id: str, melody_uid: str | None = None) -> tuple[bytes, str]:
|
|
"""Fetch current binary bytes for a melody from Firebase Storage."""
|
|
bucket = get_bucket()
|
|
if not bucket:
|
|
raise RuntimeError("Firebase Storage not initialized")
|
|
|
|
prefixes = _storage_prefixes(melody_id, melody_uid)
|
|
blobs = [b for b in _list_blobs_for_prefixes(bucket, prefixes) if _is_binary_blob_name(b.name)]
|
|
if not blobs:
|
|
raise NotFoundError("Binary file")
|
|
|
|
# Prefer explicit binary.* naming, then newest.
|
|
blobs.sort(
|
|
key=lambda b: (
|
|
0 if "binary" in b.name.rsplit("/", 1)[-1].lower() else 1,
|
|
-(int(b.time_created.timestamp()) if getattr(b, "time_created", None) else 0),
|
|
)
|
|
)
|
|
chosen = blobs[0]
|
|
data = chosen.download_as_bytes()
|
|
content_type = chosen.content_type or "application/octet-stream"
|
|
return data, content_type
|
|
|
|
|
|
def delete_file(melody_id: str, file_type: str, melody_uid: str | None = None) -> None:
|
|
"""Delete a specific file from storage. file_type is 'binary' or 'preview'."""
|
|
bucket = get_bucket()
|
|
if not bucket:
|
|
return
|
|
|
|
prefixes = _storage_prefixes(melody_id, melody_uid)
|
|
blobs = _list_blobs_for_prefixes(bucket, prefixes)
|
|
|
|
for blob in blobs:
|
|
if file_type == "binary" and "binary" in blob.name:
|
|
blob.delete()
|
|
elif file_type == "preview" and "preview" in blob.name:
|
|
blob.delete()
|
|
|
|
|
|
def _delete_storage_files(melody_id: str, melody_uid: str | None = None) -> None:
|
|
"""Delete all storage files for a melody."""
|
|
bucket = get_bucket()
|
|
if not bucket:
|
|
return
|
|
|
|
prefixes = _storage_prefixes(melody_id, melody_uid)
|
|
blobs = _list_blobs_for_prefixes(bucket, prefixes)
|
|
for blob in blobs:
|
|
blob.delete()
|
|
|
|
|
|
def get_storage_files(melody_id: str, melody_uid: str | None = None) -> dict:
|
|
"""List storage files for a melody, returning URLs."""
|
|
bucket = get_bucket()
|
|
if not bucket:
|
|
return {"binary_url": None, "preview_url": None}
|
|
|
|
prefixes = _storage_prefixes(melody_id, melody_uid)
|
|
blobs = _list_blobs_for_prefixes(bucket, prefixes)
|
|
|
|
result = {"binary_url": None, "preview_url": None}
|
|
for blob in blobs:
|
|
blob.make_public()
|
|
if _is_binary_blob_name(blob.name):
|
|
result["binary_url"] = blob.public_url
|
|
elif "preview" in blob.name:
|
|
result["preview_url"] = blob.public_url
|
|
|
|
return result
|
|
|
|
|
|
async def migrate_from_firestore() -> int:
|
|
"""One-time migration: import existing Firestore melodies into SQLite as published.
|
|
Only runs if the melody_drafts table is empty."""
|
|
count = await melody_db.count_melodies()
|
|
if count > 0:
|
|
return 0
|
|
|
|
db = get_firestore()
|
|
docs = db.collection(COLLECTION).stream()
|
|
imported = 0
|
|
|
|
for doc in docs:
|
|
data = doc.to_dict()
|
|
await melody_db.insert_melody(doc.id, "published", data)
|
|
imported += 1
|
|
|
|
if imported > 0:
|
|
logger.info(f"Migrated {imported} melodies from Firestore to SQLite")
|
|
return imported
|