Files
bellsystems-cp/backend/users/service.py

279 lines
8.3 KiB
Python

from datetime import datetime
from google.cloud.firestore_v1 import DocumentReference
from shared.firebase import get_db, get_bucket
from shared.exceptions import NotFoundError
from users.models import UserCreate, UserUpdate, UserInDB
COLLECTION = "users"
def _convert_firestore_value(val):
"""Convert Firestore-specific types (Timestamp, DocumentReference) to strings."""
if isinstance(val, datetime):
return val.strftime("%d %B %Y at %H:%M:%S UTC%z")
if isinstance(val, DocumentReference):
return val.path
return val
def _sanitize_dict(d: dict) -> dict:
"""Recursively convert Firestore-native types in a dict to plain strings."""
result = {}
for k, v in d.items():
if isinstance(v, dict):
result[k] = _sanitize_dict(v)
elif isinstance(v, list):
result[k] = [
_sanitize_dict(item) if isinstance(item, dict)
else _convert_firestore_value(item)
for item in v
]
else:
result[k] = _convert_firestore_value(v)
return result
def _doc_to_user(doc) -> UserInDB:
"""Convert a Firestore document snapshot to a UserInDB model."""
data = _sanitize_dict(doc.to_dict())
return UserInDB(id=doc.id, **data)
def list_users(
search: str | None = None,
status: str | None = None,
) -> list[UserInDB]:
"""List users with optional filters."""
db = get_db()
ref = db.collection(COLLECTION)
query = ref
if status:
query = query.where("status", "==", status)
docs = query.stream()
results = []
for doc in docs:
user = _doc_to_user(doc)
if search:
search_lower = search.lower()
name_match = search_lower in (user.display_name or "").lower()
email_match = search_lower in (user.email or "").lower()
phone_match = search_lower in (user.phone_number or "").lower()
uid_match = search_lower in (user.uid or "").lower()
if not (name_match or email_match or phone_match or uid_match):
continue
results.append(user)
return results
def get_user(user_doc_id: str) -> UserInDB:
"""Get a single user by Firestore document ID."""
db = get_db()
doc = db.collection(COLLECTION).document(user_doc_id).get()
if not doc.exists:
raise NotFoundError("User")
return _doc_to_user(doc)
def create_user(data: UserCreate) -> UserInDB:
"""Create a new user document in Firestore."""
db = get_db()
doc_data = data.model_dump()
doc_data["friendsList"] = []
doc_data["friendsInvited"] = []
_, doc_ref = db.collection(COLLECTION).add(doc_data)
return UserInDB(id=doc_ref.id, **doc_data)
def update_user(user_doc_id: str, data: UserUpdate) -> UserInDB:
"""Update an existing user document. Only provided fields are updated."""
db = get_db()
doc_ref = db.collection(COLLECTION).document(user_doc_id)
doc = doc_ref.get()
if not doc.exists:
raise NotFoundError("User")
update_data = data.model_dump(exclude_none=True)
doc_ref.update(update_data)
updated_doc = doc_ref.get()
return _doc_to_user(updated_doc)
def delete_user(user_doc_id: str) -> None:
"""Delete a user document from Firestore."""
db = get_db()
doc_ref = db.collection(COLLECTION).document(user_doc_id)
doc = doc_ref.get()
if not doc.exists:
raise NotFoundError("User")
doc_ref.delete()
def block_user(user_doc_id: str) -> UserInDB:
"""Block a user by setting their status to 'blocked'."""
db = get_db()
doc_ref = db.collection(COLLECTION).document(user_doc_id)
doc = doc_ref.get()
if not doc.exists:
raise NotFoundError("User")
doc_ref.update({"status": "blocked"})
updated_doc = doc_ref.get()
return _doc_to_user(updated_doc)
def unblock_user(user_doc_id: str) -> UserInDB:
"""Unblock a user by setting their status to 'active'."""
db = get_db()
doc_ref = db.collection(COLLECTION).document(user_doc_id)
doc = doc_ref.get()
if not doc.exists:
raise NotFoundError("User")
doc_ref.update({"status": "active"})
updated_doc = doc_ref.get()
return _doc_to_user(updated_doc)
def assign_device(user_doc_id: str, device_doc_id: str) -> UserInDB:
"""Assign a device to a user by adding user ref to device's user_list."""
db = get_db()
# Verify user exists
user_ref = db.collection(COLLECTION).document(user_doc_id)
user_doc = user_ref.get()
if not user_doc.exists:
raise NotFoundError("User")
# Verify device exists
device_ref = db.collection("devices").document(device_doc_id)
device_doc = device_ref.get()
if not device_doc.exists:
raise NotFoundError("Device")
# Add user path to device's user_list if not already there
device_data = device_doc.to_dict()
user_list = device_data.get("user_list", [])
user_path = f"users/{user_doc_id}"
# Check if already assigned (handle both string paths and DocumentReferences)
already_assigned = False
for entry in user_list:
if isinstance(entry, DocumentReference):
if entry.path == user_path:
already_assigned = True
break
elif entry == user_path:
already_assigned = True
break
if not already_assigned:
user_list.append(user_path)
device_ref.update({"user_list": user_list})
return _doc_to_user(user_ref.get())
def unassign_device(user_doc_id: str, device_doc_id: str) -> UserInDB:
"""Remove a user from a device's user_list."""
db = get_db()
# Verify user exists
user_ref = db.collection(COLLECTION).document(user_doc_id)
user_doc = user_ref.get()
if not user_doc.exists:
raise NotFoundError("User")
# Verify device exists
device_ref = db.collection("devices").document(device_doc_id)
device_doc = device_ref.get()
if not device_doc.exists:
raise NotFoundError("Device")
# Remove user from device's user_list
device_data = device_doc.to_dict()
user_list = device_data.get("user_list", [])
user_path = f"users/{user_doc_id}"
new_list = []
for entry in user_list:
if isinstance(entry, DocumentReference):
if entry.path != user_path:
new_list.append(entry)
elif entry != user_path:
new_list.append(entry)
device_ref.update({"user_list": new_list})
return _doc_to_user(user_ref.get())
def get_user_devices(user_doc_id: str) -> list[dict]:
"""Get all devices assigned to a user."""
db = get_db()
# Verify user exists
user_ref = db.collection(COLLECTION).document(user_doc_id)
user_doc = user_ref.get()
if not user_doc.exists:
raise NotFoundError("User")
user_path = f"users/{user_doc_id}"
# Search all devices for this user in their user_list
devices = []
for doc in db.collection("devices").stream():
data = doc.to_dict()
user_list = data.get("user_list", [])
for entry in user_list:
entry_path = entry.path if isinstance(entry, DocumentReference) else entry
if entry_path == user_path:
devices.append({
"id": doc.id,
"device_name": data.get("device_name", ""),
"device_id": data.get("device_id", ""),
"device_location": data.get("device_location", ""),
"is_Online": data.get("is_Online", False),
})
break
return devices
def upload_photo(user_doc_id: str, file_bytes: bytes, filename: str, content_type: str) -> str:
"""Upload a profile photo to Firebase Storage and update the user's photo_url."""
db = get_db()
bucket = get_bucket()
if not bucket:
raise RuntimeError("Firebase Storage not initialized")
# Verify user exists
doc_ref = db.collection(COLLECTION).document(user_doc_id)
doc = doc_ref.get()
if not doc.exists:
raise NotFoundError("User")
ext = filename.rsplit(".", 1)[-1] if "." in filename else "jpg"
storage_path = f"users/{user_doc_id}/uploads/profile.{ext}"
blob = bucket.blob(storage_path)
blob.upload_from_string(file_bytes, content_type=content_type)
blob.make_public()
photo_url = blob.public_url
doc_ref.update({"photo_url": photo_url})
return photo_url