"""Firestore and Firebase Storage operations for melody documents."""

from shared.firebase import get_db, get_bucket
from shared.exceptions import NotFoundError
from melodies.models import MelodyCreate, MelodyUpdate, MelodyInDB

COLLECTION = "melodies"


def _doc_to_melody(doc) -> MelodyInDB:
    """Convert a Firestore document snapshot to a MelodyInDB model.

    Backward compat: when ``information.name`` or ``information.description``
    is stored as a plain string, it is wrapped as ``{"en": <value>}`` (or
    ``{}`` for an empty string) before the model is built.
    """
    data = doc.to_dict()
    info = data.get("information", {})
    # Only normalise when "information" really is a mapping; any other value
    # is passed through untouched so the model reports the problem itself.
    if isinstance(info, dict):
        for field in ("name", "description"):
            value = info.get(field)
            if isinstance(value, str):
                info[field] = {"en": value} if value else {}
        data["information"] = info
    return MelodyInDB(id=doc.id, **data)


def _matches_search(melody: MelodyInDB, needle: str) -> bool:
    """Return True if ``needle`` occurs in a name, description, or custom tag.

    ``needle`` must already be lower-cased; the comparison lower-cases the
    melody's values. Non-string name/description values are skipped.
    """
    info = melody.information
    if any(needle in v.lower() for v in info.name.values() if isinstance(v, str)):
        return True
    if any(
        needle in v.lower()
        for v in info.description.values()
        if isinstance(v, str)
    ):
        return True
    return any(needle in tag.lower() for tag in info.customTags)


def list_melodies(
    search: str | None = None,
    melody_type: str | None = None,
    tone: str | None = None,
    total_notes: int | None = None,
) -> list[MelodyInDB]:
    """List melodies with optional filters.

    Args:
        search: Case-insensitive substring matched against names,
            descriptions, and custom tags. Ignored when empty or None.
        melody_type: Exact match on the document's ``type`` field.
        tone: Exact match on ``information.melodyTone.value``.
        total_notes: Exact match on ``information.totalNotes``.

    Returns:
        Every melody that satisfies all of the supplied filters.
    """
    query = get_db().collection(COLLECTION)

    # Only the type filter is pushed down to Firestore. Firestore doesn't
    # support full-text search, and tone/totalNotes are compared on the
    # parsed model, so those filters run in memory over the streamed docs.
    if melody_type:
        query = query.where("type", "==", melody_type)

    # Lower-case the search term once instead of once per document.
    needle = search.lower() if search else None

    results = []
    for doc in query.stream():
        melody = _doc_to_melody(doc)
        info = melody.information
        if tone and info.melodyTone.value != tone:
            continue
        if total_notes is not None and info.totalNotes != total_notes:
            continue
        if needle is not None and not _matches_search(melody, needle):
            continue
        results.append(melody)
    return results


def get_melody(melody_id: str) -> MelodyInDB:
    """Get a single melody by document ID.

    Raises:
        NotFoundError: If no document with ``melody_id`` exists.
    """
    doc = get_db().collection(COLLECTION).document(melody_id).get()
    if not doc.exists:
        raise NotFoundError("Melody")
    return _doc_to_melody(doc)


def create_melody(data: MelodyCreate) -> MelodyInDB:
    """Create a new melody document in Firestore.

    The returned model is built from the payload that was written plus the
    new document's ID; the document is not read back.
    """
    doc_data = data.model_dump()
    _, doc_ref = get_db().collection(COLLECTION).add(doc_data)
    return MelodyInDB(id=doc_ref.id, **doc_data)


def update_melody(melody_id: str, data: MelodyUpdate) -> MelodyInDB:
    """Update an existing melody document.

    Only provided fields are updated: fields left as None are dropped from
    the payload, so this function cannot be used to clear a field.

    Raises:
        NotFoundError: If no document with ``melody_id`` exists.
    """
    doc_ref = get_db().collection(COLLECTION).document(melody_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Melody")

    update_data = data.model_dump(exclude_none=True)
    if not update_data:
        # Nothing to change: skip the write instead of sending Firestore an
        # empty update, and return the document as it currently is.
        return _doc_to_melody(doc)

    # For nested structs, merge with existing data rather than replacing.
    # The merge is one level deep: a nested key present in the update
    # replaces the stored value for that key wholesale.
    existing = doc.to_dict()
    for key in ("information", "default_settings"):
        stored = existing.get(key)
        if key in update_data and isinstance(stored, dict):
            update_data[key] = {**stored, **update_data[key]}

    doc_ref.update(update_data)
    return _doc_to_melody(doc_ref.get())


def delete_melody(melody_id: str) -> None:
    """Delete a melody document and its associated storage files.

    Storage files are removed first, then the Firestore document.

    Raises:
        NotFoundError: If no document with ``melody_id`` exists.
    """
    doc_ref = get_db().collection(COLLECTION).document(melody_id)
    if not doc_ref.get().exists:
        raise NotFoundError("Melody")

    _delete_storage_files(melody_id)
    doc_ref.delete()


def _storage_prefix(melody_id: str) -> str:
    """Return the storage path prefix under which a melody's files live."""
    return f"melodies/{melody_id}/"


def _blob_kind(blob_name: str, prefix: str) -> str | None:
    """Classify a storage object as 'binary', 'preview', or None.

    The check runs on the path relative to ``prefix`` so that a melody ID
    which happens to contain "binary" or "preview" cannot cause a match.
    """
    relative = blob_name.removeprefix(prefix)
    if "binary" in relative:
        return "binary"
    if "preview" in relative:
        return "preview"
    return None


def upload_file(melody_id: str, file_bytes: bytes, filename: str, content_type: str) -> str:
    """Upload a file to Firebase Storage under melodies/{melody_id}/.

    Content types ``application/octet-stream`` and ``application/macbinary``
    are stored as ``binary.bin``; anything else is stored as
    ``preview.<ext>``, with the extension taken from ``filename``.

    Returns:
        The public URL of the uploaded object.

    Raises:
        RuntimeError: If the storage bucket is not available.
    """
    bucket = get_bucket()
    if not bucket:
        raise RuntimeError("Firebase Storage not initialized")

    prefix = _storage_prefix(melody_id)
    if content_type in ("application/octet-stream", "application/macbinary"):
        storage_path = f"{prefix}binary.bin"
    else:
        # Audio preview files. The filename comes from the caller, so only a
        # plain alphanumeric extension is accepted; a missing, empty, or
        # path-like extension falls back to "mp3".
        _, dot, ext = filename.rpartition(".")
        if not dot or not ext.isalnum():
            ext = "mp3"
        # NOTE: a preview uploaded earlier with a different extension is not
        # removed here, so two preview objects can coexist.
        storage_path = f"{prefix}preview.{ext}"

    blob = bucket.blob(storage_path)
    blob.upload_from_string(file_bytes, content_type=content_type)
    blob.make_public()
    return blob.public_url


def delete_file(melody_id: str, file_type: str) -> None:
    """Delete a specific file from storage.

    file_type is 'binary' or 'preview'. Any other value deletes nothing.
    Does nothing when the storage bucket is not available.
    """
    bucket = get_bucket()
    if not bucket:
        return
    if file_type not in ("binary", "preview"):
        return

    prefix = _storage_prefix(melody_id)
    # Materialise the listing before deleting so the deletions cannot
    # interfere with the iteration.
    for blob in list(bucket.list_blobs(prefix=prefix)):
        if _blob_kind(blob.name, prefix) == file_type:
            blob.delete()


def _delete_storage_files(melody_id: str) -> None:
    """Delete all storage files for a melody.

    Does nothing when the storage bucket is not available.
    """
    bucket = get_bucket()
    if not bucket:
        return

    # Materialise the listing before deleting so the deletions cannot
    # interfere with the iteration.
    for blob in list(bucket.list_blobs(prefix=_storage_prefix(melody_id))):
        blob.delete()


def get_storage_files(melody_id: str) -> dict[str, str | None]:
    """List storage files for a melody, returning URLs.

    Returns:
        A dict with keys ``binary_url`` and ``preview_url``; a value is None
        when no matching object exists or the bucket is not available.
    """
    result: dict[str, str | None] = {"binary_url": None, "preview_url": None}
    bucket = get_bucket()
    if not bucket:
        return result

    prefix = _storage_prefix(melody_id)
    for blob in bucket.list_blobs(prefix=prefix):
        # Every listed object is made public before its URL is read, so this
        # lookup writes to storage as well as reading from it.
        blob.make_public()
        kind = _blob_kind(blob.name, prefix)
        if kind is not None:
            result[f"{kind}_url"] = blob.public_url
    return result