import secrets import string from datetime import datetime from google.cloud.firestore_v1 import GeoPoint, DocumentReference from shared.firebase import get_db from shared.exceptions import NotFoundError from devices.models import DeviceCreate, DeviceUpdate, DeviceInDB from mqtt.mosquitto import register_device_password COLLECTION = "devices" # Serial number format: BS-XXXX-XXXX (uppercase alphanumeric) SN_CHARS = string.ascii_uppercase + string.digits SN_SEGMENT_LEN = 4 # Clock/silence/backlight fields stored as Firestore Timestamps (written as datetime) _TIMESTAMP_FIELD_NAMES = { "daySilenceFrom", "daySilenceTo", "nightSilenceFrom", "nightSilenceTo", "backlightTurnOnTime", "backlightTurnOffTime", } def _restore_timestamps(d: dict) -> dict: """Recursively convert ISO 8601 strings for known timestamp fields to datetime objects. Firestore stores Python datetime objects as native Timestamps, which Flutter reads as DateTime. Plain strings would break the Flutter app. """ result = {} for k, v in d.items(): if isinstance(v, dict): result[k] = _restore_timestamps(v) elif isinstance(v, str) and k in _TIMESTAMP_FIELD_NAMES: try: result[k] = datetime.fromisoformat(v.replace("Z", "+00:00")) except ValueError: result[k] = v else: result[k] = v return result def _generate_serial_number() -> str: """Generate a unique serial number in the format BS-XXXX-XXXX.""" seg1 = "".join(secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN)) seg2 = "".join(secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN)) return f"BS-{seg1}-{seg2}" def _ensure_unique_serial(db) -> str: """Generate a serial number and verify it doesn't already exist in Firestore.""" existing_sns = set() for doc in db.collection(COLLECTION).select(["device_id"]).stream(): data = doc.to_dict() if data.get("device_id"): existing_sns.add(data["device_id"]) for _ in range(100): # safety limit sn = _generate_serial_number() if sn not in existing_sns: return sn raise RuntimeError("Could not generate a unique serial number after 100 attempts") def _convert_firestore_value(val): """Convert Firestore-specific types (Timestamp, GeoPoint, DocumentReference) to strings.""" if isinstance(val, datetime): # Firestore DatetimeWithNanoseconds is a datetime subclass return val.strftime("%d %B %Y at %H:%M:%S UTC%z") if isinstance(val, GeoPoint): return f"{val.latitude}° N, {val.longitude}° E" if isinstance(val, DocumentReference): # Store the document path (e.g. "users/abc123") return val.path return val def _sanitize_dict(d: dict) -> dict: """Recursively convert Firestore-native types in a dict to plain strings.""" result = {} for k, v in d.items(): if isinstance(v, dict): result[k] = _sanitize_dict(v) elif isinstance(v, list): result[k] = [ _sanitize_dict(item) if isinstance(item, dict) else _convert_firestore_value(item) for item in v ] else: result[k] = _convert_firestore_value(v) return result def _doc_to_device(doc) -> DeviceInDB: """Convert a Firestore document snapshot to a DeviceInDB model.""" data = _sanitize_dict(doc.to_dict()) return DeviceInDB(id=doc.id, **data) def list_devices( search: str | None = None, online_only: bool | None = None, subscription_tier: str | None = None, ) -> list[DeviceInDB]: """List devices with optional filters.""" db = get_db() ref = db.collection(COLLECTION) query = ref if subscription_tier: query = query.where("device_subscription.subscrTier", "==", subscription_tier) docs = query.stream() results = [] for doc in docs: device = _doc_to_device(doc) # Client-side filters if online_only is not None and device.is_Online != online_only: continue if search: search_lower = search.lower() name_match = search_lower in (device.device_name or "").lower() location_match = search_lower in (device.device_location or "").lower() sn_match = search_lower in (device.device_id or "").lower() if not (name_match or location_match or sn_match): continue results.append(device) return results def get_device(device_doc_id: str) -> DeviceInDB: """Get a single device by Firestore document ID.""" db = get_db() doc = db.collection(COLLECTION).document(device_doc_id).get() if not doc.exists: raise NotFoundError("Device") return _doc_to_device(doc) def create_device(data: DeviceCreate) -> DeviceInDB: """Create a new device document in Firestore with auto-generated serial number.""" db = get_db() # Generate unique serial number serial_number = _ensure_unique_serial(db) # Generate MQTT password and register with Mosquitto mqtt_password = secrets.token_urlsafe(24) register_device_password(serial_number, mqtt_password) doc_data = data.model_dump() doc_data["device_id"] = serial_number _, doc_ref = db.collection(COLLECTION).add(doc_data) return DeviceInDB(id=doc_ref.id, **doc_data) def _deep_merge(base: dict, overrides: dict) -> dict: """Recursively merge overrides into base, preserving unmentioned nested keys.""" result = dict(base) for k, v in overrides.items(): if isinstance(v, dict) and isinstance(result.get(k), dict): result[k] = _deep_merge(result[k], v) else: result[k] = v return result def update_device(device_doc_id: str, data: DeviceUpdate) -> DeviceInDB: """Update an existing device document. Only provided fields are updated.""" db = get_db() doc_ref = db.collection(COLLECTION).document(device_doc_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Device") update_data = data.model_dump(exclude_none=True) # Deep-merge nested structs so unmentioned sub-fields are preserved existing = doc.to_dict() nested_keys = ( "device_attributes", "device_subscription", "device_stats", ) for key in nested_keys: if key in update_data and isinstance(existing.get(key), dict): update_data[key] = _deep_merge(existing[key], update_data[key]) update_data = _restore_timestamps(update_data) doc_ref.update(update_data) updated_doc = doc_ref.get() return _doc_to_device(updated_doc) def get_device_users(device_doc_id: str) -> list[dict]: """Get users assigned to a device from the device_users sub-collection. Falls back to the user_list field on the device document if the sub-collection is empty. """ db = get_db() doc_ref = db.collection(COLLECTION).document(device_doc_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Device") # Try sub-collection first sub_docs = list(doc_ref.collection("device_users").stream()) users = [] if sub_docs: for sub_doc in sub_docs: sub_data = sub_doc.to_dict() role = sub_data.get("role", "") user_ref = sub_data.get("user_reference") # Resolve user reference user_id = "" display_name = "" email = "" photo_url = "" if isinstance(user_ref, DocumentReference): try: user_doc = user_ref.get() if user_doc.exists: user_data = user_doc.to_dict() user_id = user_doc.id display_name = user_data.get("display_name", "") email = user_data.get("email", "") photo_url = user_data.get("photo_url", "") except Exception as e: print(f"[devices] Error resolving user reference: {e}") continue elif isinstance(user_ref, str): # String path like "users/abc123" try: ref_doc = db.document(user_ref).get() if ref_doc.exists: user_data = ref_doc.to_dict() user_id = ref_doc.id display_name = user_data.get("display_name", "") email = user_data.get("email", "") photo_url = user_data.get("photo_url", "") except Exception as e: print(f"[devices] Error resolving user path: {e}") continue users.append({ "user_id": user_id, "display_name": display_name, "email": email, "role": role, "photo_url": photo_url, }) else: # Fallback to user_list field device_data = doc.to_dict() user_list = device_data.get("user_list", []) for entry in user_list: try: if isinstance(entry, DocumentReference): user_doc = entry.get() elif isinstance(entry, str) and entry.strip(): # Could be a path like "users/abc" or a raw doc ID if "/" in entry: user_doc = db.document(entry).get() else: user_doc = db.collection("users").document(entry).get() else: continue if user_doc.exists: user_data = user_doc.to_dict() users.append({ "user_id": user_doc.id, "display_name": user_data.get("display_name", ""), "email": user_data.get("email", ""), "role": "", "photo_url": user_data.get("photo_url", ""), }) except Exception as e: print(f"[devices] Error resolving user_list entry: {e}") return users def delete_device(device_doc_id: str) -> None: """Delete a device document from Firestore.""" db = get_db() doc_ref = db.collection(COLLECTION).document(device_doc_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Device") doc_ref.delete()