import secrets import string from datetime import datetime from google.cloud.firestore_v1 import GeoPoint, DocumentReference from shared.firebase import get_db from shared.exceptions import NotFoundError from devices.models import DeviceCreate, DeviceUpdate, DeviceInDB from mqtt.mosquitto import register_device_password COLLECTION = "devices" # Serial number format: BS-XXXX-XXXX (uppercase alphanumeric) SN_CHARS = string.ascii_uppercase + string.digits SN_SEGMENT_LEN = 4 def _generate_serial_number() -> str: """Generate a unique serial number in the format BS-XXXX-XXXX.""" seg1 = "".join(secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN)) seg2 = "".join(secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN)) return f"BS-{seg1}-{seg2}" def _ensure_unique_serial(db) -> str: """Generate a serial number and verify it doesn't already exist in Firestore.""" existing_sns = set() for doc in db.collection(COLLECTION).select(["device_id"]).stream(): data = doc.to_dict() if data.get("device_id"): existing_sns.add(data["device_id"]) for _ in range(100): # safety limit sn = _generate_serial_number() if sn not in existing_sns: return sn raise RuntimeError("Could not generate a unique serial number after 100 attempts") def _convert_firestore_value(val): """Convert Firestore-specific types (Timestamp, GeoPoint, DocumentReference) to strings.""" if isinstance(val, datetime): # Firestore DatetimeWithNanoseconds is a datetime subclass return val.strftime("%d %B %Y at %H:%M:%S UTC%z") if isinstance(val, GeoPoint): return f"{val.latitude}° N, {val.longitude}° E" if isinstance(val, DocumentReference): # Store the document path (e.g. "users/abc123") return val.path return val def _sanitize_dict(d: dict) -> dict: """Recursively convert Firestore-native types in a dict to plain strings.""" result = {} for k, v in d.items(): if isinstance(v, dict): result[k] = _sanitize_dict(v) elif isinstance(v, list): result[k] = [ _sanitize_dict(item) if isinstance(item, dict) else _convert_firestore_value(item) for item in v ] else: result[k] = _convert_firestore_value(v) return result def _doc_to_device(doc) -> DeviceInDB: """Convert a Firestore document snapshot to a DeviceInDB model.""" data = _sanitize_dict(doc.to_dict()) return DeviceInDB(id=doc.id, **data) def list_devices( search: str | None = None, online_only: bool | None = None, subscription_tier: str | None = None, ) -> list[DeviceInDB]: """List devices with optional filters.""" db = get_db() ref = db.collection(COLLECTION) query = ref if subscription_tier: query = query.where("device_subscription.subscrTier", "==", subscription_tier) docs = query.stream() results = [] for doc in docs: device = _doc_to_device(doc) # Client-side filters if online_only is not None and device.is_Online != online_only: continue if search: search_lower = search.lower() name_match = search_lower in (device.device_name or "").lower() location_match = search_lower in (device.device_location or "").lower() sn_match = search_lower in (device.device_id or "").lower() if not (name_match or location_match or sn_match): continue results.append(device) return results def get_device(device_doc_id: str) -> DeviceInDB: """Get a single device by Firestore document ID.""" db = get_db() doc = db.collection(COLLECTION).document(device_doc_id).get() if not doc.exists: raise NotFoundError("Device") return _doc_to_device(doc) def create_device(data: DeviceCreate) -> DeviceInDB: """Create a new device document in Firestore with auto-generated serial number.""" db = get_db() # Generate unique serial number serial_number = _ensure_unique_serial(db) # Generate MQTT password and register with Mosquitto mqtt_password = secrets.token_urlsafe(24) register_device_password(serial_number, mqtt_password) doc_data = data.model_dump() doc_data["device_id"] = serial_number _, doc_ref = db.collection(COLLECTION).add(doc_data) return DeviceInDB(id=doc_ref.id, **doc_data) def update_device(device_doc_id: str, data: DeviceUpdate) -> DeviceInDB: """Update an existing device document. Only provided fields are updated.""" db = get_db() doc_ref = db.collection(COLLECTION).document(device_doc_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Device") update_data = data.model_dump(exclude_none=True) # For nested structs, merge with existing data rather than replacing existing = doc.to_dict() nested_keys = ( "device_attributes", "device_subscription", "device_stats", ) for key in nested_keys: if key in update_data and key in existing: merged = {**existing[key], **update_data[key]} update_data[key] = merged doc_ref.update(update_data) updated_doc = doc_ref.get() return _doc_to_device(updated_doc) def get_device_users(device_doc_id: str) -> list[dict]: """Get users assigned to a device from the device_users sub-collection. Falls back to the user_list field on the device document if the sub-collection is empty. """ db = get_db() doc_ref = db.collection(COLLECTION).document(device_doc_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Device") # Try sub-collection first sub_docs = list(doc_ref.collection("device_users").stream()) users = [] if sub_docs: for sub_doc in sub_docs: sub_data = sub_doc.to_dict() role = sub_data.get("role", "") user_ref = sub_data.get("user_reference") # Resolve user reference user_id = "" display_name = "" email = "" photo_url = "" if isinstance(user_ref, DocumentReference): try: user_doc = user_ref.get() if user_doc.exists: user_data = user_doc.to_dict() user_id = user_doc.id display_name = user_data.get("display_name", "") email = user_data.get("email", "") photo_url = user_data.get("photo_url", "") except Exception as e: print(f"[devices] Error resolving user reference: {e}") continue elif isinstance(user_ref, str): # String path like "users/abc123" try: ref_doc = db.document(user_ref).get() if ref_doc.exists: user_data = ref_doc.to_dict() user_id = ref_doc.id display_name = user_data.get("display_name", "") email = user_data.get("email", "") photo_url = user_data.get("photo_url", "") except Exception as e: print(f"[devices] Error resolving user path: {e}") continue users.append({ "user_id": user_id, "display_name": display_name, "email": email, "role": role, "photo_url": photo_url, }) else: # Fallback to user_list field device_data = doc.to_dict() user_list = device_data.get("user_list", []) for entry in user_list: try: if isinstance(entry, DocumentReference): user_doc = entry.get() elif isinstance(entry, str) and entry.strip(): # Could be a path like "users/abc" or a raw doc ID if "/" in entry: user_doc = db.document(entry).get() else: user_doc = db.collection("users").document(entry).get() else: continue if user_doc.exists: user_data = user_doc.to_dict() users.append({ "user_id": user_doc.id, "display_name": user_data.get("display_name", ""), "email": user_data.get("email", ""), "role": "", "photo_url": user_data.get("photo_url", ""), }) except Exception as e: print(f"[devices] Error resolving user_list entry: {e}") return users def delete_device(device_doc_id: str) -> None: """Delete a device document from Firestore.""" db = get_db() doc_ref = db.collection(COLLECTION).document(device_doc_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Device") doc_ref.delete()