175 lines
5.4 KiB
Python
175 lines
5.4 KiB
Python
from shared.firebase import get_db, get_bucket
|
|
from shared.exceptions import NotFoundError
|
|
from melodies.models import MelodyCreate, MelodyUpdate, MelodyInDB
|
|
|
|
# Name of the Firestore collection that every function in this module reads and writes.
COLLECTION = "melodies"
|
|
|
|
|
|
def _doc_to_melody(doc) -> MelodyInDB:
    """Build a MelodyInDB model from a Firestore document snapshot.

    The snapshot's ID is passed as ``id``; every field returned by
    ``doc.to_dict()`` is forwarded as a keyword argument.
    """
    fields = doc.to_dict()
    melody = MelodyInDB(id=doc.id, **fields)
    return melody
|
|
|
|
|
|
def list_melodies(
    search: str | None = None,
    melody_type: str | None = None,
    tone: str | None = None,
    total_notes: int | None = None,
) -> list[MelodyInDB]:
    """List melodies, optionally narrowed by type, tone, note count and text.

    Args:
        search: Case-insensitive substring matched against the melody's name,
            description and custom tags. Ignored when empty or ``None``.
        melody_type: When given, only documents whose ``type`` field equals
            this value are fetched (applied in the Firestore query).
        tone: When given, keep only melodies whose
            ``information.melodyTone.value`` equals this value.
        total_notes: When given, keep only melodies whose
            ``information.totalNotes`` equals this value.

    Returns:
        The matching melodies, in the order the query streamed them.
    """
    query = get_db().collection(COLLECTION)

    # Only the type filter runs server-side. Tone, note count and the text
    # search are applied in memory below (Firestore has no full-text search).
    if melody_type:
        query = query.where("type", "==", melody_type)

    # Lower-case the search term once instead of once per document.
    needle = search.lower() if search else None

    results = []
    for doc in query.stream():
        melody = _doc_to_melody(doc)
        info = melody.information

        if tone and info.melodyTone.value != tone:
            continue
        if total_notes is not None and info.totalNotes != total_notes:
            continue

        if needle is not None:
            # NOTE(review): description/customTags are treated as possibly
            # unset (None) so a sparse document cannot crash the listing --
            # confirm against the model definition.
            candidates = (info.name, info.description, *(info.customTags or ()))
            if not any(needle in (text or "").lower() for text in candidates):
                continue

        results.append(melody)

    return results
|
|
|
|
|
|
def get_melody(melody_id: str) -> MelodyInDB:
    """Fetch one melody by its Firestore document ID.

    Raises:
        NotFoundError: If no document with ``melody_id`` exists.
    """
    snapshot = get_db().collection(COLLECTION).document(melody_id).get()
    if snapshot.exists:
        return _doc_to_melody(snapshot)
    raise NotFoundError("Melody")
|
|
|
|
|
|
def create_melody(data: MelodyCreate) -> MelodyInDB:
    """Store a new melody document and return it with its generated ID.

    The model is dumped to a plain dict, added to the collection, and the
    same dict is used to build the returned ``MelodyInDB``.
    """
    client = get_db()
    payload = data.model_dump()
    # add() returns a pair; the second item is the new document's reference.
    _unused, created_ref = client.collection(COLLECTION).add(payload)
    return MelodyInDB(id=created_ref.id, **payload)
|
|
|
|
|
|
def update_melody(melody_id: str, data: MelodyUpdate) -> MelodyInDB:
    """Update an existing melody document with the fields set on ``data``.

    Fields that are ``None`` on ``data`` are left untouched. The nested
    ``information`` and ``default_settings`` maps are merged one level deep
    with the stored values instead of replacing them wholesale.

    Args:
        melody_id: Firestore document ID of the melody to update.
        data: Partial update; only non-``None`` fields are written.

    Returns:
        The melody as read back after the update (or the current melody when
        ``data`` carries no fields to write).

    Raises:
        NotFoundError: If no document with ``melody_id`` exists.
    """
    doc_ref = get_db().collection(COLLECTION).document(melody_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Melody")

    update_data = data.model_dump(exclude_none=True)
    if not update_data:
        # Nothing to write. The Firestore client rejects an empty update
        # payload, so hand back the document as it currently is.
        return _doc_to_melody(doc)

    existing = doc.to_dict() or {}
    for key in ("information", "default_settings"):
        incoming = update_data.get(key)
        stored = existing.get(key)
        # Merge only when both sides are maps; otherwise the incoming value
        # simply replaces whatever is stored.
        if isinstance(incoming, dict) and isinstance(stored, dict):
            update_data[key] = {**stored, **incoming}

    doc_ref.update(update_data)

    # Re-read so the returned model reflects what is actually stored.
    return _doc_to_melody(doc_ref.get())
|
|
|
|
|
|
def delete_melody(melody_id: str) -> None:
    """Remove a melody: first its storage files, then its Firestore document.

    Raises:
        NotFoundError: If no document with ``melody_id`` exists.
    """
    target = get_db().collection(COLLECTION).document(melody_id)
    snapshot = target.get()
    if snapshot.exists:
        # Storage objects are removed before the document itself.
        _delete_storage_files(melody_id)
        target.delete()
        return
    raise NotFoundError("Melody")
|
|
|
|
|
|
def upload_file(melody_id: str, file_bytes: bytes, filename: str, content_type: str) -> str:
    """Upload a file to Firebase Storage under ``melodies/{melody_id}/``.

    Binary content types are stored as ``binary.bin``; anything else is
    treated as an audio preview and stored as ``preview.<ext>``.

    Args:
        melody_id: ID of the melody the file belongs to.
        file_bytes: Raw file content.
        filename: Original file name; only its extension is used.
        content_type: MIME type, used both to pick the storage path and as
            the uploaded object's content type.

    Returns:
        The public URL of the uploaded object.

    Raises:
        RuntimeError: If the storage bucket is not available.
    """
    bucket = get_bucket()
    if not bucket:
        raise RuntimeError("Firebase Storage not initialized")

    if content_type in ("application/octet-stream", "application/macbinary"):
        storage_path = f"melodies/{melody_id}/binary.bin"
    else:
        # Audio preview files. The extension comes from a caller-supplied
        # name, so accept it only when it is a non-empty alphanumeric token;
        # this keeps path separators and empty suffixes ("song.") out of the
        # object name. Anything else falls back to "mp3".
        _stem, dot, suffix = filename.rpartition(".")
        ext = suffix if dot and suffix.isalnum() else "mp3"
        storage_path = f"melodies/{melody_id}/preview.{ext}"

    blob = bucket.blob(storage_path)
    blob.upload_from_string(file_bytes, content_type=content_type)
    # The object is made public so that public_url is usable by clients.
    blob.make_public()
    return blob.public_url
|
|
|
|
|
|
def delete_file(melody_id: str, file_type: str) -> None:
    """Delete one kind of file for a melody from storage.

    Args:
        melody_id: ID of the melody whose file should be removed.
        file_type: ``"binary"`` or ``"preview"``. Any other value deletes
            nothing.

    Does nothing when the storage bucket is not available.
    """
    bucket = get_bucket()
    if not bucket:
        return
    if file_type not in ("binary", "preview"):
        # Unknown kinds never match anything, so skip the listing call.
        return

    prefix = f"melodies/{melody_id}/"
    # Materialise the listing first so deletions do not happen mid-iteration.
    for blob in list(bucket.list_blobs(prefix=prefix)):
        # Match on the part after the melody's folder only; testing the full
        # object name would also match a melody_id that happens to contain
        # "binary" or "preview".
        relative_name = blob.name.removeprefix(prefix)
        if file_type in relative_name:
            blob.delete()
|
|
|
|
|
|
def _delete_storage_files(melody_id: str) -> None:
    """Remove every storage object under ``melodies/{melody_id}/``.

    Does nothing when the storage bucket is not available.
    """
    bucket = get_bucket()
    if bucket:
        folder = f"melodies/{melody_id}/"
        doomed = list(bucket.list_blobs(prefix=folder))
        for item in doomed:
            item.delete()
|
|
|
|
|
|
def get_storage_files(melody_id: str) -> dict:
    """Return public URLs of a melody's stored binary and preview files.

    Args:
        melody_id: ID of the melody whose files should be listed.

    Returns:
        A dict with keys ``"binary_url"`` and ``"preview_url"``; a value is
        ``None`` when no matching object exists or the bucket is unavailable.

    Note:
        Every object found under the melody's folder is made public as a side
        effect of this call.
    """
    result = {"binary_url": None, "preview_url": None}

    bucket = get_bucket()
    if not bucket:
        return result

    prefix = f"melodies/{melody_id}/"
    for blob in bucket.list_blobs(prefix=prefix):
        blob.make_public()
        # Classify by the name inside the melody's folder; the full object
        # name includes melody_id, which could itself contain "binary" or
        # "preview" and misclassify every file.
        relative_name = blob.name.removeprefix(prefix)
        if "binary" in relative_name:
            result["binary_url"] = blob.public_url
        elif "preview" in relative_name:
            result["preview_url"] = blob.public_url

    return result
|