179 lines
5.8 KiB
Python
179 lines
5.8 KiB
Python
from datetime import datetime, timezone
|
|
|
|
from shared.firebase import get_db
|
|
from shared.exceptions import NotFoundError
|
|
from equipment.models import NoteCreate, NoteUpdate, NoteInDB
|
|
|
|
COLLECTION = "notes"
|
|
|
|
VALID_CATEGORIES = {"general", "maintenance", "installation", "issue", "action_item", "other"}
|
|
|
|
|
|
def _convert_firestore_value(val):
|
|
"""Convert Firestore-specific types to strings."""
|
|
if isinstance(val, datetime):
|
|
return val.strftime("%d %B %Y at %H:%M:%S UTC%z")
|
|
return val
|
|
|
|
|
|
def _sanitize_dict(d: dict) -> dict:
|
|
"""Recursively convert Firestore-native types in a dict to plain strings."""
|
|
result = {}
|
|
for k, v in d.items():
|
|
if isinstance(v, dict):
|
|
result[k] = _sanitize_dict(v)
|
|
elif isinstance(v, list):
|
|
result[k] = [
|
|
_sanitize_dict(item) if isinstance(item, dict)
|
|
else _convert_firestore_value(item)
|
|
for item in v
|
|
]
|
|
else:
|
|
result[k] = _convert_firestore_value(v)
|
|
return result
|
|
|
|
|
|
def _doc_to_note(doc) -> NoteInDB:
|
|
"""Convert a Firestore document snapshot to a NoteInDB model."""
|
|
data = _sanitize_dict(doc.to_dict())
|
|
return NoteInDB(id=doc.id, **data)
|
|
|
|
|
|
def _resolve_names(db, device_id: str | None, user_id: str | None) -> tuple[str, str]:
|
|
"""Look up device_name and user_name from their IDs."""
|
|
device_name = ""
|
|
user_name = ""
|
|
|
|
try:
|
|
if device_id and isinstance(device_id, str) and device_id.strip():
|
|
device_doc = db.collection("devices").document(device_id.strip()).get()
|
|
if device_doc.exists:
|
|
device_name = device_doc.to_dict().get("device_name", "")
|
|
|
|
if user_id and isinstance(user_id, str) and user_id.strip():
|
|
user_doc = db.collection("users").document(user_id.strip()).get()
|
|
if user_doc.exists:
|
|
user_doc_data = user_doc.to_dict()
|
|
user_name = user_doc_data.get("display_name", "") or user_doc_data.get("email", "")
|
|
except Exception as e:
|
|
print(f"[equipment] Error resolving names (device_id={device_id}, user_id={user_id}): {e}")
|
|
|
|
return device_name, user_name
|
|
|
|
|
|
def list_notes(
|
|
search: str | None = None,
|
|
category: str | None = None,
|
|
device_id: str | None = None,
|
|
user_id: str | None = None,
|
|
) -> list[NoteInDB]:
|
|
"""List notes with optional filters."""
|
|
db = get_db()
|
|
ref = db.collection(COLLECTION)
|
|
query = ref
|
|
|
|
if category and category in VALID_CATEGORIES:
|
|
query = query.where("category", "==", category)
|
|
|
|
if device_id:
|
|
query = query.where("device_id", "==", device_id)
|
|
|
|
if user_id:
|
|
query = query.where("user_id", "==", user_id)
|
|
|
|
# Only use order_by when no field filters are applied (avoids composite index requirement)
|
|
has_field_filters = bool(category or device_id or user_id)
|
|
if not has_field_filters:
|
|
query = query.order_by("created_at", direction="DESCENDING")
|
|
|
|
docs = query.stream()
|
|
results = []
|
|
|
|
for doc in docs:
|
|
note = _doc_to_note(doc)
|
|
|
|
if search:
|
|
search_lower = search.lower()
|
|
title_match = search_lower in (note.title or "").lower()
|
|
content_match = search_lower in (note.content or "").lower()
|
|
if not (title_match or content_match):
|
|
continue
|
|
|
|
results.append(note)
|
|
|
|
# Sort client-side when we couldn't use order_by
|
|
if has_field_filters:
|
|
results.sort(key=lambda n: n.created_at or "", reverse=True)
|
|
|
|
return results
|
|
|
|
|
|
def get_note(note_id: str) -> NoteInDB:
|
|
"""Get a single note by Firestore document ID."""
|
|
db = get_db()
|
|
doc = db.collection(COLLECTION).document(note_id).get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Note")
|
|
return _doc_to_note(doc)
|
|
|
|
|
|
def create_note(data: NoteCreate, created_by: str = "") -> NoteInDB:
|
|
"""Create a new note document in Firestore."""
|
|
db = get_db()
|
|
now = datetime.now(timezone.utc).strftime("%d %B %Y at %H:%M:%S UTC")
|
|
|
|
device_name, user_name = _resolve_names(db, data.device_id, data.user_id)
|
|
|
|
doc_data = data.model_dump()
|
|
doc_data["device_id"] = data.device_id or ""
|
|
doc_data["user_id"] = data.user_id or ""
|
|
doc_data["device_name"] = device_name
|
|
doc_data["user_name"] = user_name
|
|
doc_data["created_by"] = created_by
|
|
doc_data["created_at"] = now
|
|
doc_data["updated_at"] = now
|
|
|
|
_, doc_ref = db.collection(COLLECTION).add(doc_data)
|
|
|
|
return NoteInDB(id=doc_ref.id, **doc_data)
|
|
|
|
|
|
def update_note(note_id: str, data: NoteUpdate) -> NoteInDB:
|
|
"""Update an existing note. Only provided fields are updated."""
|
|
db = get_db()
|
|
doc_ref = db.collection(COLLECTION).document(note_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Note")
|
|
|
|
update_data = data.model_dump(exclude_none=True)
|
|
|
|
# Re-resolve names if device_id or user_id changed
|
|
existing = doc.to_dict()
|
|
new_device_id = update_data.get("device_id", existing.get("device_id", ""))
|
|
new_user_id = update_data.get("user_id", existing.get("user_id", ""))
|
|
|
|
if "device_id" in update_data or "user_id" in update_data:
|
|
device_name, user_name = _resolve_names(db, new_device_id, new_user_id)
|
|
if "device_id" in update_data:
|
|
update_data["device_name"] = device_name
|
|
if "user_id" in update_data:
|
|
update_data["user_name"] = user_name
|
|
|
|
update_data["updated_at"] = datetime.now(timezone.utc).strftime("%d %B %Y at %H:%M:%S UTC")
|
|
doc_ref.update(update_data)
|
|
|
|
updated_doc = doc_ref.get()
|
|
return _doc_to_note(updated_doc)
|
|
|
|
|
|
def delete_note(note_id: str) -> None:
|
|
"""Delete a note document from Firestore."""
|
|
db = get_db()
|
|
doc_ref = db.collection(COLLECTION).document(note_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Note")
|
|
|
|
doc_ref.delete()
|