170 lines
5.1 KiB
Python
170 lines
5.1 KiB
Python
from datetime import datetime, timezone
|
|
|
|
from google.cloud.firestore_v1 import DocumentReference
|
|
|
|
from shared.firebase import get_db
|
|
from shared.exceptions import NotFoundError
|
|
from helpdesk.models import HelpdeskMessage
|
|
|
|
# Name of the Firestore collection that every query in this module passes to
# ``db.collection(...)`` when reading or updating helpdesk messages.
COLLECTION = "helpdesk"
|
|
|
|
|
|
def _convert_firestore_value(val):
|
|
"""Convert Firestore-specific types to strings."""
|
|
if isinstance(val, datetime):
|
|
return val.strftime("%d %B %Y at %H:%M:%S UTC%z")
|
|
if isinstance(val, DocumentReference):
|
|
return val.path
|
|
return val
|
|
|
|
|
|
def _resolve_sender(db, sender_ref) -> tuple[str, str]:
|
|
"""Resolve a sender DocumentReference to (sender_id, sender_name)."""
|
|
sender_id = ""
|
|
sender_name = ""
|
|
|
|
try:
|
|
if isinstance(sender_ref, DocumentReference):
|
|
doc = sender_ref.get()
|
|
if doc.exists:
|
|
data = doc.to_dict()
|
|
sender_id = doc.id
|
|
sender_name = data.get("display_name", "") or data.get("email", "")
|
|
elif isinstance(sender_ref, str) and sender_ref.strip():
|
|
# String path like "users/abc123"
|
|
doc = db.document(sender_ref).get()
|
|
if doc.exists:
|
|
data = doc.to_dict()
|
|
sender_id = doc.id
|
|
sender_name = data.get("display_name", "") or data.get("email", "")
|
|
except Exception as e:
|
|
print(f"[helpdesk] Error resolving sender: {e}")
|
|
|
|
return sender_id, sender_name
|
|
|
|
|
|
def _resolve_device_name(db, device_id: str) -> str:
|
|
"""Look up device_name from device_id."""
|
|
if not device_id:
|
|
return ""
|
|
try:
|
|
doc = db.collection("devices").document(device_id.strip()).get()
|
|
if doc.exists:
|
|
return doc.to_dict().get("device_name", "")
|
|
except Exception as e:
|
|
print(f"[helpdesk] Error resolving device name: {e}")
|
|
return ""
|
|
|
|
|
|
def _doc_to_message(db, doc) -> HelpdeskMessage:
    """Convert a Firestore document snapshot into a ``HelpdeskMessage``.

    Args:
        db: Firestore client, forwarded to the sender and device lookups.
        doc: Document snapshot; its ``id`` and ``to_dict()`` payload are read.

    Returns:
        A ``HelpdeskMessage`` built from the document's fields. Missing text
        fields default to ``""`` and ``acknowledged`` defaults to ``False``.

    The ``sender`` field is resolved to an id and name via ``_resolve_sender``,
    and ``device_id`` is resolved to a name via ``_resolve_device_name`` (which
    itself returns ``""`` for an empty id). ``date_sent`` and
    ``acknowledged_at`` go through ``_convert_firestore_value`` so that datetime
    values are rendered with the one shared format instead of a copy of the
    format string.
    """
    data = doc.to_dict()

    sender_id, sender_name = _resolve_sender(db, data.get("sender"))
    device_id = data.get("device_id", "")

    return HelpdeskMessage(
        id=doc.id,
        sender_id=sender_id,
        sender_name=sender_name,
        type=data.get("type", ""),
        # May be stored as a timestamp or already as a string.
        date_sent=_convert_firestore_value(data.get("date_sent", "")),
        subject=data.get("subject", ""),
        message=data.get("message", ""),
        phone=data.get("phone", ""),
        device_id=device_id,
        device_name=_resolve_device_name(db, device_id),
        acknowledged=data.get("acknowledged", False),
        acknowledged_by=data.get("acknowledged_by", ""),
        acknowledged_at=_convert_firestore_value(data.get("acknowledged_at", "")),
    )
|
|
|
|
|
|
# Formats tried, in order, when turning a message's ``date_sent`` string back
# into a datetime for sorting. The first matches what _doc_to_message produces
# for timezone-aware values; the second is what that same format yields for
# naive values, where ``%z`` renders as an empty string.
_DATE_SENT_FORMATS = (
    "%d %B %Y at %H:%M:%S UTC%z",
    "%d %B %Y at %H:%M:%S UTC",
)


def _date_sent_sort_key(value) -> float:
    """Return a chronological sort key (POSIX timestamp) for a ``date_sent`` value.

    Accepts a ``datetime`` or a string in one of ``_DATE_SENT_FORMATS``. Naive
    datetimes are treated as UTC. Anything else (empty, ``None``, unparseable
    text) maps to ``-inf`` so it sorts after every dated message when the list
    is ordered newest-first.
    """
    if isinstance(value, str):
        text = value.strip()
        value = None
        for fmt in _DATE_SENT_FORMATS:
            try:
                value = datetime.strptime(text, fmt)
            except ValueError:
                continue
            break
    if not isinstance(value, datetime):
        return float("-inf")
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value.timestamp()


def list_messages(
    user_id: str | None = None,
    device_id: str | None = None,
    msg_type: str | None = None,
) -> list[HelpdeskMessage]:
    """List helpdesk messages, newest first, with optional filters.

    Args:
        user_id: When given, keep only messages whose resolved ``sender_id``
            equals this value.
        device_id: When given, query only documents whose ``device_id`` field
            equals this value.
        msg_type: When given, query only documents whose ``type`` field equals
            this value.

    Returns:
        The matching messages ordered by ``date_sent`` descending. Messages
        whose ``date_sent`` cannot be interpreted as a date are placed last.
    """
    db = get_db()
    query = db.collection(COLLECTION)

    if msg_type:
        query = query.where("type", "==", msg_type)

    if device_id:
        query = query.where("device_id", "==", device_id)

    results = []
    for doc in list(query.stream()):
        msg = _doc_to_message(db, doc)

        # The sender is stored as a reference, so the user filter is applied
        # here on the resolved id and not in the query.
        if user_id and msg.sender_id != user_id:
            continue

        results.append(msg)

    # date_sent is a day-first display string ("02 January 2024 at ..."), so
    # comparing the raw strings would order by day-of-month, not by time.
    # Sort on the parsed instant instead.
    results.sort(key=lambda m: _date_sent_sort_key(m.date_sent), reverse=True)

    return results
|
|
|
|
|
|
def get_message(message_id: str) -> HelpdeskMessage:
    """Fetch one helpdesk message by its document id.

    Raises:
        NotFoundError: If no document with ``message_id`` exists in the
            helpdesk collection.
    """
    db = get_db()
    snapshot = db.collection(COLLECTION).document(message_id).get()
    if snapshot.exists:
        return _doc_to_message(db, snapshot)
    raise NotFoundError("Helpdesk message")
|
|
|
|
|
|
def toggle_acknowledged(message_id: str, acknowledged_by: str = "") -> HelpdeskMessage:
    """Flip the ``acknowledged`` flag of a helpdesk message and return the result.

    When the message is currently acknowledged, the flag is cleared and
    ``acknowledged_by`` / ``acknowledged_at`` are reset to ``""``. Otherwise the
    flag is set, ``acknowledged_by`` is stored, and ``acknowledged_at`` is set
    to the current UTC time as a formatted string.

    Args:
        message_id: Document id of the message to toggle.
        acknowledged_by: Value stored in ``acknowledged_by`` when acknowledging.

    Returns:
        The message as re-read from Firestore after the update.

    Raises:
        NotFoundError: If no document with ``message_id`` exists.
    """
    db = get_db()
    message_ref = db.collection(COLLECTION).document(message_id)

    snapshot = message_ref.get()
    if not snapshot.exists:
        raise NotFoundError("Helpdesk message")

    # NOTE(review): the read above and the update below are separate calls.
    if snapshot.to_dict().get("acknowledged", False):
        changes = {
            "acknowledged": False,
            "acknowledged_by": "",
            "acknowledged_at": "",
        }
    else:
        stamp = datetime.now(timezone.utc).strftime("%d %B %Y at %H:%M:%S UTC")
        changes = {
            "acknowledged": True,
            "acknowledged_by": acknowledged_by,
            "acknowledged_at": stamp,
        }
    message_ref.update(changes)

    # Re-read so the returned message reflects what is now stored.
    return _doc_to_message(db, message_ref.get())
|