import secrets
import string
from datetime import datetime, timezone

from google.cloud.firestore_v1 import GeoPoint

from shared.firebase import get_db
from shared.exceptions import NotFoundError
from devices.models import DeviceCreate, DeviceUpdate, DeviceInDB
from mqtt.mosquitto import register_device_password

COLLECTION = "devices"

# Serial number format: BS-XXXX-XXXX (uppercase alphanumeric)
SN_CHARS = string.ascii_uppercase + string.digits
SN_SEGMENT_LEN = 4

# Safety limit on how many random serial numbers are tried before giving up.
_MAX_SERIAL_ATTEMPTS = 100


def _generate_serial_number() -> str:
    """Return a random serial number in the format BS-XXXX-XXXX.

    Uniqueness is NOT checked here; see ``_ensure_unique_serial``.
    """
    segments = (
        "".join(secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN))
        for _ in range(2)
    )
    return "BS-" + "-".join(segments)


def _ensure_unique_serial(db) -> str:
    """Generate a serial number that no document in the collection uses yet.

    Each candidate is checked with a single ``limit(1)`` equality query on
    ``device_id`` instead of streaming the whole collection, so the cost does
    not grow with the number of devices.

    Args:
        db: Firestore client as returned by ``get_db()``.

    Returns:
        A serial number that was not present as ``device_id`` at query time.

    Raises:
        RuntimeError: If no free serial number was found within the retry limit.
    """
    devices = db.collection(COLLECTION)
    for _ in range(_MAX_SERIAL_ATTEMPTS):
        candidate = _generate_serial_number()
        clashes = devices.where("device_id", "==", candidate).limit(1).stream()
        if next(iter(clashes), None) is None:
            return candidate
    raise RuntimeError(
        f"Could not generate a unique serial number after {_MAX_SERIAL_ATTEMPTS} attempts"
    )


def _convert_firestore_value(val):
    """Convert Firestore-specific types (Timestamp, GeoPoint) to display strings.

    Values of any other type are returned unchanged.
    """
    if hasattr(val, "seconds") and hasattr(val, "nanos"):
        # Timestamp-like object -> "01 January 2024 at 12:00:00 UTC".
        # Sub-second precision is not displayed, so build the datetime from the
        # whole seconds only; adding nanos as a float could round up into the
        # next second.
        # NOTE(review): datetime objects have neither attribute and pass through
        # unchanged - confirm that is what the client returns for timestamps.
        dt = datetime.fromtimestamp(val.seconds, tz=timezone.utc)
        return dt.strftime("%d %B %Y at %H:%M:%S UTC")
    if isinstance(val, GeoPoint):
        # Derive the hemisphere from the sign so that southern / western
        # coordinates are not rendered as e.g. "-33.8° N".
        lat_hemisphere = "N" if val.latitude >= 0 else "S"
        lon_hemisphere = "E" if val.longitude >= 0 else "W"
        return (
            f"{abs(val.latitude)}° {lat_hemisphere}, "
            f"{abs(val.longitude)}° {lon_hemisphere}"
        )
    return val


def _sanitize_dict(d: dict) -> dict:
    """Recursively convert Firestore-native types in a dict to plain strings.

    Nested dicts are sanitized recursively; list items are sanitized one level
    deep (dict items recursively, other items via ``_convert_firestore_value``).
    """
    result = {}
    for key, value in d.items():
        if isinstance(value, dict):
            result[key] = _sanitize_dict(value)
        elif isinstance(value, list):
            result[key] = [
                _sanitize_dict(item)
                if isinstance(item, dict)
                else _convert_firestore_value(item)
                for item in value
            ]
        else:
            result[key] = _convert_firestore_value(value)
    return result


def _doc_to_device(doc) -> DeviceInDB:
    """Convert a Firestore document snapshot to a DeviceInDB model."""
    data = _sanitize_dict(doc.to_dict())
    return DeviceInDB(id=doc.id, **data)


def list_devices(
    search: str | None = None,
    online_only: bool | None = None,
    subscription_tier: str | None = None,
) -> list[DeviceInDB]:
    """List devices with optional filters.

    Args:
        search: Case-insensitive substring matched against the device name,
            location and serial number (``device_id``). Empty or ``None``
            disables the filter.
        online_only: When not ``None``, keep only devices whose ``is_Online``
            equals this value.
        subscription_tier: When set, applied server-side as an equality filter
            on ``device_subscription.subscrTier``.

    Returns:
        The matching devices, in the order Firestore streamed them.
    """
    db = get_db()
    query = db.collection(COLLECTION)
    if subscription_tier:
        query = query.where("device_subscription.subscrTier", "==", subscription_tier)

    # Lower-case the needle once instead of once per document.
    needle = search.lower() if search else None

    results = []
    for doc in query.stream():
        device = _doc_to_device(doc)

        # Client-side filters
        if online_only is not None and device.is_Online != online_only:
            continue
        if needle is not None:
            haystacks = (
                device.device_name,
                device.device_location,
                device.device_id,
            )
            if not any(needle in (text or "").lower() for text in haystacks):
                continue

        results.append(device)
    return results


def get_device(device_doc_id: str) -> DeviceInDB:
    """Get a single device by Firestore document ID.

    Raises:
        NotFoundError: If no document with that ID exists.
    """
    db = get_db()
    doc = db.collection(COLLECTION).document(device_doc_id).get()
    if not doc.exists:
        raise NotFoundError("Device")
    return _doc_to_device(doc)


def create_device(data: DeviceCreate) -> DeviceInDB:
    """Create a new device document in Firestore with auto-generated serial number.

    The serial number is stored in the ``device_id`` field. A random MQTT
    password is generated and registered for that serial number before the
    document is written.
    """
    db = get_db()

    # Generate unique serial number
    serial_number = _ensure_unique_serial(db)

    # Generate MQTT password and register with Mosquitto
    mqtt_password = secrets.token_urlsafe(24)
    register_device_password(serial_number, mqtt_password)

    doc_data = data.model_dump()
    doc_data["device_id"] = serial_number

    # add() returns (update_time, document_reference); only the reference is needed.
    _, doc_ref = db.collection(COLLECTION).add(doc_data)
    return DeviceInDB(id=doc_ref.id, **doc_data)


def update_device(device_doc_id: str, data: DeviceUpdate) -> DeviceInDB:
    """Update an existing device document. Only provided fields are updated.

    Fields that are ``None`` in ``data`` are ignored. For the nested map fields
    the provided sub-fields are merged into the stored map rather than
    replacing it. If nothing is provided, the document is returned unchanged.

    Raises:
        NotFoundError: If no document with that ID exists.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(device_doc_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Device")

    update_data = data.model_dump(exclude_none=True)
    if not update_data:
        # Nothing to change: skip the write (an empty update is rejected by the
        # Firestore client) and return the current state.
        return _doc_to_device(doc)

    # For nested structs, merge with existing data rather than replacing
    existing = doc.to_dict() or {}
    nested_keys = (
        "device_attributes",
        "device_subscription",
        "device_stats",
    )
    for key in nested_keys:
        current = existing.get(key)
        incoming = update_data.get(key)
        # Merge only when both sides really are maps; a stored null (or any
        # non-map value) is simply overwritten by the incoming map.
        if isinstance(current, dict) and isinstance(incoming, dict):
            update_data[key] = {**current, **incoming}

    doc_ref.update(update_data)
    return _doc_to_device(doc_ref.get())


def delete_device(device_doc_id: str) -> None:
    """Delete a device document from Firestore.

    Raises:
        NotFoundError: If no document with that ID exists.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(device_doc_id)
    if not doc_ref.get().exists:
        raise NotFoundError("Device")
    doc_ref.delete()