from datetime import datetime

from google.cloud.firestore_v1 import DocumentReference

from shared.firebase import get_db, get_bucket
from shared.exceptions import NotFoundError
from users.models import UserCreate, UserUpdate, UserInDB

COLLECTION = "users"

# Name of the collection holding device documents (each may carry a "user_list").
_DEVICES_COLLECTION = "devices"

# Extension used for uploaded photos when the filename has no usable one.
_DEFAULT_PHOTO_EXT = "jpg"


def _convert_firestore_value(val):
    """Convert Firestore-specific scalar values to strings.

    ``datetime`` instances are formatted as ``"%d %B %Y at %H:%M:%S UTC%z"``
    and ``DocumentReference`` instances are replaced by their ``path``.
    Any other value is returned unchanged.
    """
    if isinstance(val, datetime):
        return val.strftime("%d %B %Y at %H:%M:%S UTC%z")
    if isinstance(val, DocumentReference):
        return val.path
    return val


def _sanitize_value(val):
    """Recursively sanitize one value: dicts, lists (at any depth) and scalars."""
    if isinstance(val, dict):
        return _sanitize_dict(val)
    if isinstance(val, list):
        return [_sanitize_value(item) for item in val]
    return _convert_firestore_value(val)


def _sanitize_dict(d: dict) -> dict:
    """Return a copy of ``d`` with Firestore-native values converted to strings.

    Nested dicts and lists are processed recursively; the input is not mutated.
    """
    return {key: _sanitize_value(value) for key, value in d.items()}


def _doc_to_user(doc) -> UserInDB:
    """Convert a Firestore document snapshot to a ``UserInDB`` model."""
    data = _sanitize_dict(doc.to_dict())
    return UserInDB(id=doc.id, **data)


def _require_document(db, collection: str, doc_id: str, label: str):
    """Fetch a document and fail if it does not exist.

    Args:
        db: Firestore client as returned by ``get_db()``.
        collection: Collection name.
        doc_id: Document ID inside ``collection``.
        label: Entity name passed to ``NotFoundError`` (e.g. ``"User"``).

    Returns:
        A ``(reference, snapshot)`` tuple for the existing document.

    Raises:
        NotFoundError: If the document does not exist.
    """
    ref = db.collection(collection).document(doc_id)
    snapshot = ref.get()
    if not snapshot.exists:
        raise NotFoundError(label)
    return ref, snapshot


def _entry_path(entry):
    """Normalize a ``user_list`` entry: a DocumentReference becomes its path."""
    return entry.path if isinstance(entry, DocumentReference) else entry


def _user_matches(user: UserInDB, needle: str) -> bool:
    """Return True if the lower-cased ``needle`` occurs in any searchable field."""
    fields = (user.display_name, user.email, user.phone_number, user.uid)
    return any(needle in (value or "").lower() for value in fields)


def _photo_extension(filename: str) -> str:
    """Return a safe file extension taken from ``filename``.

    The text after the last ``"."`` is used only when it is non-empty and
    purely ASCII alphanumeric; otherwise ``"jpg"`` is returned. This keeps
    characters such as ``/`` from a client-supplied filename out of the
    storage object name.
    """
    _, dot, ext = (filename or "").rpartition(".")
    if dot and ext and ext.isascii() and ext.isalnum():
        return ext
    return _DEFAULT_PHOTO_EXT


def list_users(
    search: str | None = None,
    status: str | None = None,
) -> list[UserInDB]:
    """List users with optional filters.

    Args:
        search: Case-insensitive substring matched against display name,
            email, phone number and uid. Applied in Python after fetching.
        status: If given, only users whose ``status`` field equals this value
            are queried.

    Returns:
        The matching users, in the order the query streams them.
    """
    query = get_db().collection(COLLECTION)
    if status:
        query = query.where("status", "==", status)

    # Lower-case the search term once instead of once per document.
    needle = search.lower() if search else None

    results = []
    for doc in query.stream():
        user = _doc_to_user(doc)
        if needle is not None and not _user_matches(user, needle):
            continue
        results.append(user)
    return results


def get_user(user_doc_id: str) -> UserInDB:
    """Get a single user by Firestore document ID.

    Raises:
        NotFoundError: If no user document has this ID.
    """
    _, doc = _require_document(get_db(), COLLECTION, user_doc_id, "User")
    return _doc_to_user(doc)


def create_user(data: UserCreate) -> UserInDB:
    """Create a new user document in Firestore.

    The document is stored with empty ``friendsList`` and ``friendsInvited``
    lists in addition to the fields dumped from ``data``.

    Returns:
        The created user, carrying the auto-generated document ID.
    """
    db = get_db()
    doc_data = data.model_dump()
    doc_data["friendsList"] = []
    doc_data["friendsInvited"] = []
    _, doc_ref = db.collection(COLLECTION).add(doc_data)
    return UserInDB(id=doc_ref.id, **doc_data)


def update_user(user_doc_id: str, data: UserUpdate) -> UserInDB:
    """Update an existing user document. Only provided fields are updated.

    Fields whose value is ``None`` in ``data`` are left untouched.

    Raises:
        NotFoundError: If no user document has this ID.
    """
    doc_ref, doc = _require_document(get_db(), COLLECTION, user_doc_id, "User")

    update_data = data.model_dump(exclude_none=True)
    if not update_data:
        # Nothing to change: skip the write (the Firestore client rejects an
        # empty update) and return the document as it currently is.
        return _doc_to_user(doc)

    doc_ref.update(update_data)
    return _doc_to_user(doc_ref.get())


def delete_user(user_doc_id: str) -> None:
    """Delete a user document from Firestore.

    Raises:
        NotFoundError: If no user document has this ID.
    """
    doc_ref, _ = _require_document(get_db(), COLLECTION, user_doc_id, "User")
    doc_ref.delete()


def _set_status(user_doc_id: str, status: str) -> UserInDB:
    """Set a user's ``status`` field and return the re-read user."""
    doc_ref, _ = _require_document(get_db(), COLLECTION, user_doc_id, "User")
    doc_ref.update({"status": status})
    return _doc_to_user(doc_ref.get())


def block_user(user_doc_id: str) -> UserInDB:
    """Block a user by setting their status to 'blocked'.

    Raises:
        NotFoundError: If no user document has this ID.
    """
    return _set_status(user_doc_id, "blocked")


def unblock_user(user_doc_id: str) -> UserInDB:
    """Unblock a user by setting their status to 'active'.

    Raises:
        NotFoundError: If no user document has this ID.
    """
    return _set_status(user_doc_id, "active")


def assign_device(user_doc_id: str, device_doc_id: str) -> UserInDB:
    """Assign a device to a user by adding the user's path to the device's user_list.

    Existing entries may be string paths or DocumentReferences; both are
    recognised, so the user is never added twice.

    Raises:
        NotFoundError: If the user or the device document does not exist.
    """
    db = get_db()
    _, user_doc = _require_document(db, COLLECTION, user_doc_id, "User")
    device_ref, device_doc = _require_document(
        db, _DEVICES_COLLECTION, device_doc_id, "Device"
    )

    # The field may be missing or explicitly null; treat both as empty.
    user_list = device_doc.to_dict().get("user_list") or []
    user_path = f"users/{user_doc_id}"

    # NOTE(review): read-modify-write without a transaction; concurrent
    # assignments to the same device can overwrite each other.
    if not any(_entry_path(entry) == user_path for entry in user_list):
        device_ref.update({"user_list": [*user_list, user_path]})

    # The user document itself is not modified, so the snapshot is current.
    return _doc_to_user(user_doc)


def unassign_device(user_doc_id: str, device_doc_id: str) -> UserInDB:
    """Remove a user from a device's user_list.

    Every entry that refers to the user (string path or DocumentReference)
    is removed; other entries are kept in their original form and order.

    Raises:
        NotFoundError: If the user or the device document does not exist.
    """
    db = get_db()
    _, user_doc = _require_document(db, COLLECTION, user_doc_id, "User")
    device_ref, device_doc = _require_document(
        db, _DEVICES_COLLECTION, device_doc_id, "Device"
    )

    # The field may be missing or explicitly null; treat both as empty.
    user_list = device_doc.to_dict().get("user_list") or []
    user_path = f"users/{user_doc_id}"

    remaining = [entry for entry in user_list if _entry_path(entry) != user_path]
    if len(remaining) != len(user_list):
        # Only write when something was actually removed.
        device_ref.update({"user_list": remaining})

    # The user document itself is not modified, so the snapshot is current.
    return _doc_to_user(user_doc)


def get_user_devices(user_doc_id: str) -> list[dict]:
    """Get all devices assigned to a user.

    Streams every document in the devices collection and keeps those whose
    ``user_list`` contains the user (as a string path or DocumentReference).

    Returns:
        One dict per matching device with the keys ``id``, ``device_name``,
        ``device_id``, ``device_location`` and ``is_Online``.

    Raises:
        NotFoundError: If no user document has this ID.
    """
    db = get_db()
    _require_document(db, COLLECTION, user_doc_id, "User")
    user_path = f"users/{user_doc_id}"

    devices = []
    for doc in db.collection(_DEVICES_COLLECTION).stream():
        data = doc.to_dict()
        # The field may be missing or explicitly null; treat both as empty.
        user_list = data.get("user_list") or []
        if any(_entry_path(entry) == user_path for entry in user_list):
            devices.append({
                "id": doc.id,
                "device_name": data.get("device_name", ""),
                "device_id": data.get("device_id", ""),
                "device_location": data.get("device_location", ""),
                "is_Online": data.get("is_Online", False),
            })
    return devices


def upload_photo(user_doc_id: str, file_bytes: bytes, filename: str, content_type: str) -> str:
    """Upload a profile photo to Firebase Storage and update the user's photo_url.

    The object is stored at ``users/<user_doc_id>/uploads/profile.<ext>``,
    made public, and its public URL is written to the user's ``photo_url``.

    Args:
        user_doc_id: Firestore document ID of the user.
        file_bytes: Raw image content.
        filename: Original filename; only its extension is used, and only
            when it is ASCII alphanumeric (otherwise ``jpg``).
        content_type: Content type stored with the uploaded object.

    Returns:
        The public URL of the uploaded photo.

    Raises:
        RuntimeError: If the storage bucket is not available.
        NotFoundError: If no user document has this ID.
    """
    db = get_db()
    bucket = get_bucket()
    if not bucket:
        raise RuntimeError("Firebase Storage not initialized")

    doc_ref, _ = _require_document(db, COLLECTION, user_doc_id, "User")

    ext = _photo_extension(filename)
    storage_path = f"users/{user_doc_id}/uploads/profile.{ext}"

    blob = bucket.blob(storage_path)
    blob.upload_from_string(file_bytes, content_type=content_type)
    blob.make_public()

    photo_url = blob.public_url
    doc_ref.update({"photo_url": photo_url})
    return photo_url