279 lines
8.3 KiB
Python
279 lines
8.3 KiB
Python
from datetime import datetime
|
|
|
|
from google.cloud.firestore_v1 import DocumentReference
|
|
|
|
from shared.firebase import get_db, get_bucket
|
|
from shared.exceptions import NotFoundError
|
|
from users.models import UserCreate, UserUpdate, UserInDB
|
|
|
|
# Name of the Firestore collection every function in this module reads and writes.
COLLECTION = "users"
|
|
|
|
|
|
def _convert_firestore_value(val):
    """Turn a datetime or DocumentReference into a plain string.

    Args:
        val: Any value read from a Firestore document.

    Returns:
        For a ``datetime``: the value rendered with
        ``"%d %B %Y at %H:%M:%S UTC%z"`` (a naive datetime has an empty
        ``%z``, so the text simply ends in ``UTC``).
        For a ``DocumentReference``: its ``path``.
        Anything else is returned unchanged.
    """
    if isinstance(val, datetime):
        return val.strftime("%d %B %Y at %H:%M:%S UTC%z")
    return val.path if isinstance(val, DocumentReference) else val
|
|
|
|
|
|
def _sanitize_dict(d: dict) -> dict:
    """Return a copy of *d* with Firestore-native values replaced by strings.

    Nested dicts are sanitized recursively. Lists are rebuilt element by
    element: dict elements are sanitized recursively, every other element
    goes through ``_convert_firestore_value`` (so a list nested directly
    inside a list is passed through as-is). The input dict is not mutated.

    Args:
        d: Mapping of field names to values.

    Returns:
        A new dict with the same keys and converted values.
    """

    def _clean(item):
        # Maps recurse; everything else is handled by the scalar converter.
        if isinstance(item, dict):
            return _sanitize_dict(item)
        return _convert_firestore_value(item)

    return {
        key: [_clean(item) for item in value] if isinstance(value, list) else _clean(value)
        for key, value in d.items()
    }
|
|
|
|
|
|
def _doc_to_user(doc) -> UserInDB:
    """Build a ``UserInDB`` from a Firestore document snapshot.

    Args:
        doc: Snapshot exposing ``id`` and ``to_dict()``.

    Returns:
        ``UserInDB`` whose ``id`` is the snapshot id and whose remaining
        fields are the snapshot data after ``_sanitize_dict``.
    """
    fields = _sanitize_dict(doc.to_dict())
    user = UserInDB(id=doc.id, **fields)
    return user
|
|
|
|
|
|
def list_users(
    search: str | None = None,
    status: str | None = None,
) -> list[UserInDB]:
    """List users, optionally filtered by status and a free-text search.

    Args:
        search: Case-insensitive substring matched against each user's
            ``display_name``, ``email``, ``phone_number`` and ``uid``.
            A user is kept when any of the four contains it. Falsy values
            disable the search.
        status: When truthy, only documents whose ``status`` field equals
            this value are queried.

    Returns:
        Matching users in the order the query streams them.
    """
    query = get_db().collection(COLLECTION)
    if status:
        query = query.where("status", "==", status)

    users = (_doc_to_user(doc) for doc in query.stream())
    if not search:
        return list(users)

    # Lower-case the needle once instead of once per streamed document.
    needle = search.lower()
    return [
        user
        for user in users
        if any(
            needle in (field or "").lower()
            for field in (user.display_name, user.email, user.phone_number, user.uid)
        )
    ]
|
|
|
|
|
|
def get_user(user_doc_id: str) -> UserInDB:
    """Fetch one user by its Firestore document ID.

    Args:
        user_doc_id: ID of the document inside the users collection.

    Returns:
        The user converted with ``_doc_to_user``.

    Raises:
        NotFoundError: The document does not exist.
    """
    snapshot = get_db().collection(COLLECTION).document(user_doc_id).get()
    if snapshot.exists:
        return _doc_to_user(snapshot)
    raise NotFoundError("User")
|
|
|
|
|
|
def create_user(data: UserCreate) -> UserInDB:
    """Insert a new user document and return it as a ``UserInDB``.

    The stored payload is ``data.model_dump()`` with ``friendsList`` and
    ``friendsInvited`` forced to empty lists. The document ID is whatever
    the collection's ``add`` call assigns.

    Args:
        data: Fields of the user to create.

    Returns:
        ``UserInDB`` built from the new document ID and the stored payload.
    """
    db = get_db()
    payload = {**data.model_dump(), "friendsList": [], "friendsInvited": []}

    # add() returns a pair; only the new document reference is needed.
    _write_time, new_ref = db.collection(COLLECTION).add(payload)
    return UserInDB(id=new_ref.id, **payload)
|
|
|
|
|
|
def update_user(user_doc_id: str, data: UserUpdate) -> UserInDB:
    """Update an existing user document with the non-None fields of *data*.

    Args:
        user_doc_id: ID of the document inside the users collection.
        data: Partial update; fields whose value is ``None`` are ignored.

    Returns:
        The user as stored after the update. When *data* carries no
        non-None field, nothing is written and the current user is returned.

    Raises:
        NotFoundError: The document does not exist.
    """
    doc_ref = get_db().collection(COLLECTION).document(user_doc_id)
    snapshot = doc_ref.get()
    if not snapshot.exists:
        raise NotFoundError("User")

    changes = data.model_dump(exclude_none=True)
    if not changes:
        # The Firestore client rejects an empty update payload with a
        # ValueError; an all-None request is a no-op, not an error.
        return _doc_to_user(snapshot)

    doc_ref.update(changes)
    # Re-read so the result reflects what is actually stored.
    return _doc_to_user(doc_ref.get())
|
|
|
|
|
|
def delete_user(user_doc_id: str) -> None:
    """Remove a user document from Firestore.

    Args:
        user_doc_id: ID of the document inside the users collection.

    Raises:
        NotFoundError: The document does not exist.
    """
    target = get_db().collection(COLLECTION).document(user_doc_id)
    if not target.get().exists:
        raise NotFoundError("User")
    target.delete()
|
|
|
|
|
|
def block_user(user_doc_id: str) -> UserInDB:
    """Set a user's ``status`` field to ``"blocked"``.

    Args:
        user_doc_id: ID of the document inside the users collection.

    Returns:
        The user re-read after the status change.

    Raises:
        NotFoundError: The document does not exist.
    """
    user_ref = get_db().collection(COLLECTION).document(user_doc_id)
    if not user_ref.get().exists:
        raise NotFoundError("User")

    user_ref.update({"status": "blocked"})
    return _doc_to_user(user_ref.get())
|
|
|
|
|
|
def unblock_user(user_doc_id: str) -> UserInDB:
    """Set a user's ``status`` field back to ``"active"``.

    Args:
        user_doc_id: ID of the document inside the users collection.

    Returns:
        The user re-read after the status change.

    Raises:
        NotFoundError: The document does not exist.
    """
    user_ref = get_db().collection(COLLECTION).document(user_doc_id)
    if not user_ref.get().exists:
        raise NotFoundError("User")

    user_ref.update({"status": "active"})
    return _doc_to_user(user_ref.get())
|
|
|
|
|
|
def assign_device(user_doc_id: str, device_doc_id: str) -> UserInDB:
    """Assign a device to a user by adding the user to the device's ``user_list``.

    The user is recorded as the string path ``users/<user_doc_id>``.
    Existing entries may be string paths or ``DocumentReference`` objects;
    both forms are recognised when checking for an existing assignment, and
    the device is only written when the user is not yet listed.

    Args:
        user_doc_id: ID of the document inside the users collection.
        device_doc_id: ID of the document inside the ``devices`` collection.

    Returns:
        The user. The user document itself is not modified by this call.

    Raises:
        NotFoundError: The user ("User") or the device ("Device") does not exist.
    """
    db = get_db()

    user_ref = db.collection(COLLECTION).document(user_doc_id)
    user_doc = user_ref.get()
    if not user_doc.exists:
        raise NotFoundError("User")

    device_ref = db.collection("devices").document(device_doc_id)
    device_doc = device_ref.get()
    if not device_doc.exists:
        raise NotFoundError("Device")

    # `or []` covers both a missing field and one stored as null, which
    # would otherwise make the membership scan raise TypeError.
    user_list = device_doc.to_dict().get("user_list") or []
    user_path = f"users/{user_doc_id}"

    already_assigned = any(
        (entry.path if isinstance(entry, DocumentReference) else entry) == user_path
        for entry in user_list
    )
    if not already_assigned:
        device_ref.update({"user_list": [*user_list, user_path]})

    # Only the device document was touched, so the snapshot fetched for the
    # existence check is still the right data to return.
    return _doc_to_user(user_doc)
|
|
|
|
|
|
def unassign_device(user_doc_id: str, device_doc_id: str) -> UserInDB:
    """Remove a user from a device's ``user_list``.

    Every entry that refers to ``users/<user_doc_id>`` is dropped, whether
    it is a string path or a ``DocumentReference``. The device is only
    written when at least one entry was removed.

    Args:
        user_doc_id: ID of the document inside the users collection.
        device_doc_id: ID of the document inside the ``devices`` collection.

    Returns:
        The user. The user document itself is not modified by this call.

    Raises:
        NotFoundError: The user ("User") or the device ("Device") does not exist.
    """
    db = get_db()

    user_ref = db.collection(COLLECTION).document(user_doc_id)
    user_doc = user_ref.get()
    if not user_doc.exists:
        raise NotFoundError("User")

    device_ref = db.collection("devices").document(device_doc_id)
    device_doc = device_ref.get()
    if not device_doc.exists:
        raise NotFoundError("Device")

    # `or []` covers both a missing field and one stored as null.
    user_list = device_doc.to_dict().get("user_list") or []
    user_path = f"users/{user_doc_id}"

    remaining = [
        entry
        for entry in user_list
        if (entry.path if isinstance(entry, DocumentReference) else entry) != user_path
    ]
    # Skip the write when the user was not assigned, as assign_device
    # skips it when the user is already listed.
    if len(remaining) != len(user_list):
        device_ref.update({"user_list": remaining})

    # Only the device document was touched, so the snapshot fetched for the
    # existence check is still the right data to return.
    return _doc_to_user(user_doc)
|
|
|
|
|
|
def get_user_devices(user_doc_id: str) -> list[dict]:
    """Return a summary of every device whose ``user_list`` contains the user.

    The whole ``devices`` collection is streamed and filtered in Python.
    Entries in ``user_list`` may be string paths or ``DocumentReference``
    objects; both are compared against ``users/<user_doc_id>``.

    Args:
        user_doc_id: ID of the document inside the users collection.

    Returns:
        One dict per matching device with the keys ``id``, ``device_name``,
        ``device_id``, ``device_location`` and ``is_Online``. Missing
        fields default to ``""`` (``False`` for ``is_Online``).

    Raises:
        NotFoundError: The user document does not exist.
    """
    db = get_db()

    if not db.collection(COLLECTION).document(user_doc_id).get().exists:
        raise NotFoundError("User")

    user_path = f"users/{user_doc_id}"
    devices = []

    for doc in db.collection("devices").stream():
        data = doc.to_dict()
        # `or []` keeps one device with a null user_list from aborting the
        # scan for every other device.
        assigned = any(
            (entry.path if isinstance(entry, DocumentReference) else entry) == user_path
            for entry in data.get("user_list") or []
        )
        if not assigned:
            continue

        devices.append({
            "id": doc.id,
            "device_name": data.get("device_name", ""),
            "device_id": data.get("device_id", ""),
            "device_location": data.get("device_location", ""),
            "is_Online": data.get("is_Online", False),
        })

    return devices
|
|
|
|
|
|
def upload_photo(user_doc_id: str, file_bytes: bytes, filename: str, content_type: str) -> str:
    """Upload a profile photo to storage and record its URL on the user.

    The object is stored at ``users/<user_doc_id>/uploads/profile.<ext>``,
    made public, and its public URL is written to the user's ``photo_url``.

    Args:
        user_doc_id: ID of the document inside the users collection.
        file_bytes: Raw image content.
        filename: Original file name, used only to derive the extension.
            The text after the last dot is used when it is non-empty
            ASCII alphanumeric; otherwise ``jpg`` is used.
        content_type: MIME type stored with the uploaded object.

    Returns:
        The public URL of the uploaded object.

    Raises:
        RuntimeError: ``get_bucket()`` returned a falsy value.
        NotFoundError: The user document does not exist.
    """
    db = get_db()
    bucket = get_bucket()
    if not bucket:
        raise RuntimeError("Firebase Storage not initialized")

    doc_ref = db.collection(COLLECTION).document(user_doc_id)
    if not doc_ref.get().exists:
        raise NotFoundError("User")

    # The file name is caller-supplied: reject empty suffixes ("photo.") and
    # anything that could add separators or odd characters to the object name.
    _, dot, suffix = filename.rpartition(".")
    ext = suffix if dot and suffix.isascii() and suffix.isalnum() else "jpg"
    storage_path = f"users/{user_doc_id}/uploads/profile.{ext}"

    blob = bucket.blob(storage_path)
    blob.upload_from_string(file_bytes, content_type=content_type)
    blob.make_public()

    photo_url = blob.public_url
    doc_ref.update({"photo_url": photo_url})
    return photo_url
|