import logging
from datetime import datetime, timezone

from shared.firebase import get_db
from shared.exceptions import NotFoundError
from equipment.models import NoteCreate, NoteUpdate, NoteInDB

logger = logging.getLogger(__name__)

COLLECTION = "notes"

VALID_CATEGORIES = {"general", "maintenance", "installation", "issue", "action_item", "other"}

# Timestamp layout written by create_note/update_note ("UTC" is a literal suffix).
_TIMESTAMP_FORMAT = "%d %B %Y at %H:%M:%S UTC"
# Layout produced when converting datetime values read from Firestore; an
# aware datetime additionally gets its numeric offset appended (e.g. "+0000").
_TIMESTAMP_FORMAT_WITH_OFFSET = _TIMESTAMP_FORMAT + "%z"
# Sort key used for missing or unparseable timestamps: older than any real value.
_OLDEST = datetime.min.replace(tzinfo=timezone.utc)


def _utc_now_str() -> str:
    """Return the current UTC time formatted the way notes store timestamps."""
    return datetime.now(timezone.utc).strftime(_TIMESTAMP_FORMAT)


def _created_at_sort_key(value) -> datetime:
    """Turn a stored ``created_at`` value into a timezone-aware datetime.

    The stored strings start with the day of the month, so comparing them as
    text does not give chronological order; they have to be parsed first.

    Args:
        value: A timestamp string in one of the two module formats, a
            ``datetime``, or anything else (e.g. ``None``).

    Returns:
        The parsed datetime (naive values are taken to be UTC), or
        ``_OLDEST`` when the value is missing or cannot be parsed.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str):
        parsed = None
        for fmt in (_TIMESTAMP_FORMAT_WITH_OFFSET, _TIMESTAMP_FORMAT):
            try:
                parsed = datetime.strptime(value, fmt)
            except ValueError:
                continue
            break
        if parsed is None:
            return _OLDEST
    else:
        return _OLDEST

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _convert_firestore_value(val):
    """Convert datetime values to display strings; return anything else as-is."""
    if isinstance(val, datetime):
        return val.strftime(_TIMESTAMP_FORMAT_WITH_OFFSET)
    return val


def _sanitize_dict(d: dict) -> dict:
    """Return a copy of ``d`` with datetime values converted to strings.

    Nested dicts are processed recursively. Inside lists, dict items are
    processed recursively and other items are converted directly (lists
    nested inside lists are not descended into).
    """
    result = {}
    for key, value in d.items():
        if isinstance(value, dict):
            result[key] = _sanitize_dict(value)
        elif isinstance(value, list):
            result[key] = [
                _sanitize_dict(item) if isinstance(item, dict) else _convert_firestore_value(item)
                for item in value
            ]
        else:
            result[key] = _convert_firestore_value(value)
    return result


def _doc_to_note(doc) -> NoteInDB:
    """Convert a Firestore document snapshot to a NoteInDB model."""
    data = _sanitize_dict(doc.to_dict())
    return NoteInDB(id=doc.id, **data)


def _clean_id(raw_id) -> str:
    """Return ``raw_id`` stripped of whitespace, or "" if it is not a usable string."""
    if isinstance(raw_id, str):
        return raw_id.strip()
    return ""


def _resolve_names(db, device_id: str | None, user_id: str | None) -> tuple[str, str]:
    """Look up device_name and user_name from their IDs.

    This is best-effort: a blank ID, a missing document, or any error during
    the lookup leaves the corresponding name as "" instead of raising.

    Args:
        db: Firestore client used for the lookups.
        device_id: ID of a document in the "devices" collection, or None/blank
            to skip the device lookup.
        user_id: ID of a document in the "users" collection, or None/blank to
            skip the user lookup.

    Returns:
        A ``(device_name, user_name)`` tuple. The user name is the user's
        "display_name", falling back to "email" when that is empty.
    """
    device_name = ""
    user_name = ""
    try:
        device_key = _clean_id(device_id)
        if device_key:
            device_doc = db.collection("devices").document(device_key).get()
            if device_doc.exists:
                device_name = device_doc.to_dict().get("device_name", "")

        user_key = _clean_id(user_id)
        if user_key:
            user_doc = db.collection("users").document(user_key).get()
            if user_doc.exists:
                user_data = user_doc.to_dict()
                user_name = user_data.get("display_name", "") or user_data.get("email", "")
    except Exception:
        # Deliberately swallowed: a failed name lookup must not block saving
        # the note. Names resolved before the failure are still returned.
        logger.exception(
            "[equipment] Error resolving names (device_id=%s, user_id=%s)",
            device_id,
            user_id,
        )
    return device_name, user_name


def list_notes(
    search: str | None = None,
    category: str | None = None,
    device_id: str | None = None,
    user_id: str | None = None,
) -> list[NoteInDB]:
    """List notes, newest first, with optional filters.

    Args:
        search: Case-insensitive substring that must occur in the note's
            title or content. Applied client-side after the query runs.
        category: Exact-match filter; ignored unless it is one of
            ``VALID_CATEGORIES``.
        device_id: Exact-match filter on the note's device_id.
        user_id: Exact-match filter on the note's user_id.

    Returns:
        Matching notes ordered by creation time, most recent first. Notes
        whose ``created_at`` is missing or unparseable come last.
    """
    db = get_db()
    query = db.collection(COLLECTION)

    if category and category in VALID_CATEGORIES:
        query = query.where("category", "==", category)
    if device_id:
        query = query.where("device_id", "==", device_id)
    if user_id:
        query = query.where("user_id", "==", user_id)

    # Only use order_by when no field filters are requested (avoids a
    # composite index requirement). It is kept for the unfiltered case so the
    # set of returned documents stays as before; the final ordering is always
    # established by the client-side sort below.
    has_field_filters = bool(category or device_id or user_id)
    if not has_field_filters:
        query = query.order_by("created_at", direction="DESCENDING")

    needle = search.lower() if search else None

    results = []
    for doc in query.stream():
        note = _doc_to_note(doc)
        if needle is not None:
            in_title = needle in (note.title or "").lower()
            in_content = needle in (note.content or "").lower()
            if not (in_title or in_content):
                continue
        results.append(note)

    # Timestamps are stored as "DD Month YYYY at ..." strings, so neither a
    # plain string sort nor a server-side ordering of those strings is
    # chronological. Sort on the parsed datetime instead.
    results.sort(key=lambda n: _created_at_sort_key(n.created_at), reverse=True)
    return results


def get_note(note_id: str) -> NoteInDB:
    """Get a single note by Firestore document ID.

    Raises:
        NotFoundError: If no note document with this ID exists.
    """
    db = get_db()
    doc = db.collection(COLLECTION).document(note_id).get()
    if not doc.exists:
        raise NotFoundError("Note")
    return _doc_to_note(doc)


def create_note(data: NoteCreate, created_by: str = "") -> NoteInDB:
    """Create a new note document in Firestore.

    Args:
        data: Fields of the note to create.
        created_by: Identifier stored in the note's "created_by" field.

    Returns:
        The created note, including its generated document ID, the resolved
        device/user names, and identical created_at/updated_at timestamps.
    """
    db = get_db()
    now = _utc_now_str()
    device_name, user_name = _resolve_names(db, data.device_id, data.user_id)

    doc_data = data.model_dump()
    # Missing IDs are stored as empty strings rather than None.
    doc_data["device_id"] = data.device_id or ""
    doc_data["user_id"] = data.user_id or ""
    doc_data["device_name"] = device_name
    doc_data["user_name"] = user_name
    doc_data["created_by"] = created_by
    doc_data["created_at"] = now
    doc_data["updated_at"] = now

    _, doc_ref = db.collection(COLLECTION).add(doc_data)
    return NoteInDB(id=doc_ref.id, **doc_data)


def update_note(note_id: str, data: NoteUpdate) -> NoteInDB:
    """Update an existing note. Only provided (non-None) fields are updated.

    When device_id or user_id is part of the update, the matching
    device_name / user_name is re-resolved and stored alongside it.
    updated_at is always refreshed.

    Raises:
        NotFoundError: If no note document with this ID exists.

    Returns:
        The note as re-read from Firestore after the update.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(note_id)
    if not doc_ref.get().exists:
        raise NotFoundError("Note")

    update_data = data.model_dump(exclude_none=True)

    device_changed = "device_id" in update_data
    user_changed = "user_id" in update_data
    if device_changed or user_changed:
        # Pass None for an ID that is not being changed so that no lookup is
        # made for a name that would not be written anyway.
        device_name, user_name = _resolve_names(
            db,
            update_data["device_id"] if device_changed else None,
            update_data["user_id"] if user_changed else None,
        )
        if device_changed:
            update_data["device_name"] = device_name
        if user_changed:
            update_data["user_name"] = user_name

    update_data["updated_at"] = _utc_now_str()
    doc_ref.update(update_data)

    return _doc_to_note(doc_ref.get())


def delete_note(note_id: str) -> None:
    """Delete a note document from Firestore.

    Raises:
        NotFoundError: If no note document with this ID exists.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(note_id)
    if not doc_ref.get().exists:
        raise NotFoundError("Note")
    doc_ref.delete()