Files
bellsystems-cp/backend/devices/service.py

265 lines
8.7 KiB
Python

import secrets
import string
from datetime import datetime
from google.cloud.firestore_v1 import GeoPoint, DocumentReference
from shared.firebase import get_db
from shared.exceptions import NotFoundError
from devices.models import DeviceCreate, DeviceUpdate, DeviceInDB
from mqtt.mosquitto import register_device_password
# Name of the Firestore collection queried by every function in this module.
COLLECTION = "devices"

# Serial numbers have the shape BS-XXXX-XXXX, where every X is one character
# from this alphabet (uppercase ASCII letters followed by the digits 0-9).
SN_CHARS = f"{string.ascii_uppercase}{string.digits}"
# Number of characters in each of the two X-segments of a serial number.
SN_SEGMENT_LEN = 4
def _generate_serial_number() -> str:
    """Return a random serial-number candidate shaped like ``BS-XXXX-XXXX``.

    Every ``X`` is drawn independently from ``SN_CHARS`` via the ``secrets``
    module, ``SN_SEGMENT_LEN`` characters per segment. The candidate is NOT
    checked against existing devices here.
    """

    def _segment() -> str:
        # One block of SN_SEGMENT_LEN randomly chosen characters.
        picks = [secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN)]
        return "".join(picks)

    return "-".join(("BS", _segment(), _segment()))
def _ensure_unique_serial(db) -> str:
    """Return a freshly generated serial number that no device document uses.

    Each candidate is checked with an equality query on ``device_id`` that is
    limited to a single result, so the cost of the check does not grow with
    the number of stored devices (the collection is never streamed in full).

    Args:
        db: Firestore client used to query ``COLLECTION``.

    Returns:
        A serial number for which the lookup returned no document.

    Raises:
        RuntimeError: If 100 consecutive candidates were all already taken.

    Note:
        The existence check and the caller's later write are separate
        operations, so this does not by itself rule out two concurrent
        callers receiving the same candidate.
    """
    devices = db.collection(COLLECTION)
    for _ in range(100):  # safety limit
        candidate = _generate_serial_number()
        taken = devices.where("device_id", "==", candidate).limit(1).stream()
        # stream() yields matching snapshots lazily; no first item == unused.
        if next(iter(taken), None) is None:
            return candidate
    raise RuntimeError("Could not generate a unique serial number after 100 attempts")
def _convert_firestore_value(val):
    """Turn a Firestore-native value into a plain string.

    * ``datetime`` instances (Firestore's ``DatetimeWithNanoseconds`` is a
      ``datetime`` subclass) are rendered as
      ``"DD Month YYYY at HH:MM:SS UTC<offset>"``; the offset part is empty
      for naive datetimes.
    * ``GeoPoint`` becomes ``"<latitude>° N, <longitude>° E"``. The raw signed
      numbers are interpolated, so negative coordinates keep their minus sign
      and are still labelled N / E.
    * ``DocumentReference`` becomes its document path (e.g. ``"users/abc123"``).

    Anything else is returned unchanged.
    """
    if isinstance(val, datetime):
        return format(val, "%d %B %Y at %H:%M:%S UTC%z")
    if isinstance(val, GeoPoint):
        lat, lon = val.latitude, val.longitude
        return f"{lat}° N, {lon}° E"
    return val.path if isinstance(val, DocumentReference) else val
def _sanitize_dict(d: dict) -> dict:
    """Return a copy of *d* with Firestore-native values converted to strings.

    Nested dicts are processed recursively. For list values, dict items are
    processed recursively and every other item goes through
    ``_convert_firestore_value``. The input dict is not modified.
    """

    def _clean(value):
        # Dicts recurse; every other value goes through the scalar converter.
        if isinstance(value, dict):
            return _sanitize_dict(value)
        return _convert_firestore_value(value)

    cleaned = {}
    for key, value in d.items():
        if isinstance(value, list):
            cleaned[key] = [_clean(item) for item in value]
        else:
            cleaned[key] = _clean(value)
    return cleaned
def _doc_to_device(doc) -> DeviceInDB:
    """Build a ``DeviceInDB`` from a Firestore document snapshot.

    The snapshot's fields are passed through ``_sanitize_dict`` first and then
    supplied as keyword arguments, with the snapshot's ID as ``id``.
    """
    fields = _sanitize_dict(doc.to_dict())
    device = DeviceInDB(id=doc.id, **fields)
    return device
def list_devices(
    search: str | None = None,
    online_only: bool | None = None,
    subscription_tier: str | None = None,
) -> list[DeviceInDB]:
    """Return the devices that match every supplied filter.

    Args:
        search: Case-insensitive substring matched against the device name,
            location and serial number (``device_id``); a device is kept if
            any of the three contains it. Ignored when empty or ``None``.
        online_only: When not ``None``, keep only devices whose ``is_Online``
            equals this value.
        subscription_tier: When truthy, applied in the Firestore query as an
            equality filter on ``device_subscription.subscrTier``.

    Returns:
        The matching devices in the order Firestore streamed them.
    """
    query = get_db().collection(COLLECTION)
    if subscription_tier:
        query = query.where("device_subscription.subscrTier", "==", subscription_tier)

    # Only the tier filter runs server-side; the rest is applied per document.
    needle = search.lower() if search else None
    matched = []
    for snapshot in query.stream():
        device = _doc_to_device(snapshot)
        if online_only is not None and device.is_Online != online_only:
            continue
        if needle is not None:
            haystacks = (device.device_name, device.device_location, device.device_id)
            hits = [needle in (text or "").lower() for text in haystacks]
            if not any(hits):
                continue
        matched.append(device)
    return matched
def get_device(device_doc_id: str) -> DeviceInDB:
    """Fetch one device by its Firestore document ID.

    Raises:
        NotFoundError: If no document with that ID exists in ``COLLECTION``.
    """
    snapshot = get_db().collection(COLLECTION).document(device_doc_id).get()
    if snapshot.exists:
        return _doc_to_device(snapshot)
    raise NotFoundError("Device")
def create_device(data: DeviceCreate) -> DeviceInDB:
    """Create a device document with an auto-generated serial number.

    Steps, in order: pick an unused serial number, register a random MQTT
    password for it via ``register_device_password``, then add the document
    (the dumped ``data`` plus ``device_id``) to ``COLLECTION``.

    Returns:
        A ``DeviceInDB`` built from the written payload and the new document
        ID (the document is not read back).
    """
    db = get_db()
    serial_number = _ensure_unique_serial(db)

    # NOTE(review): the generated password is only handed to
    # register_device_password; it is neither written to the document nor
    # returned from this function - confirm the device receives it elsewhere.
    register_device_password(serial_number, secrets.token_urlsafe(24))

    payload = {**data.model_dump(), "device_id": serial_number}
    _, created_ref = db.collection(COLLECTION).add(payload)
    return DeviceInDB(id=created_ref.id, **payload)
def update_device(device_doc_id: str, data: DeviceUpdate) -> DeviceInDB:
    """Update an existing device document with the fields set on *data*.

    Fields that are ``None`` on *data* are left out of the update. For the
    nested structs ``device_attributes``, ``device_subscription`` and
    ``device_stats`` the incoming values are merged over the stored map
    instead of replacing it, so sub-fields not mentioned in the update survive.

    Args:
        device_doc_id: Firestore document ID of the device.
        data: Partial update; only non-``None`` fields are applied.

    Returns:
        The device as stored after the update (re-read from Firestore), or the
        current device unchanged when *data* carries no fields to apply.

    Raises:
        NotFoundError: If the document does not exist.
    """
    doc_ref = get_db().collection(COLLECTION).document(device_doc_id)
    snapshot = doc_ref.get()
    if not snapshot.exists:
        raise NotFoundError("Device")

    update_data = data.model_dump(exclude_none=True)
    if not update_data:
        # Nothing to write: the Firestore client refuses an empty update
        # payload, so hand back the document exactly as it currently is.
        return _doc_to_device(snapshot)

    existing = snapshot.to_dict()
    nested_keys = (
        "device_attributes", "device_subscription", "device_stats",
    )
    for key in nested_keys:
        current = existing.get(key)
        incoming = update_data.get(key)
        # Merge only when both sides are maps; a stored null / non-map value
        # cannot be unpacked and is simply replaced by the incoming struct.
        if isinstance(current, dict) and isinstance(incoming, dict):
            update_data[key] = {**current, **incoming}

    doc_ref.update(update_data)
    return _doc_to_device(doc_ref.get())
def _user_snapshot_to_entry(user_doc, role: str = "") -> dict:
    """Build the returned user dict from an existing user document snapshot."""
    user_data = user_doc.to_dict()
    return {
        "user_id": user_doc.id,
        "display_name": user_data.get("display_name", ""),
        "email": user_data.get("email", ""),
        "role": role,
    }


def _users_from_subcollection(db, sub_docs) -> list[dict]:
    """Resolve each ``device_users`` sub-document to a user dict.

    ``user_reference`` may be a ``DocumentReference`` or a string path such as
    ``"users/abc123"``. An entry whose lookup raises is logged and skipped; an
    entry whose reference is missing, of another type, or points at a
    non-existent document is still returned with blank user fields.
    """
    users = []
    for sub_doc in sub_docs:
        sub_data = sub_doc.to_dict()
        role = sub_data.get("role", "")
        user_ref = sub_data.get("user_reference")

        entry = {"user_id": "", "display_name": "", "email": "", "role": role}
        if isinstance(user_ref, (DocumentReference, str)):
            is_reference = isinstance(user_ref, DocumentReference)
            try:
                target = user_ref if is_reference else db.document(user_ref)
                user_doc = target.get()
                if user_doc.exists:
                    entry = _user_snapshot_to_entry(user_doc, role)
            except Exception as e:
                # Best effort: one unresolvable user must not fail the listing.
                if is_reference:
                    print(f"[devices] Error resolving user reference: {e}")
                else:
                    print(f"[devices] Error resolving user path: {e}")
                continue
        users.append(entry)
    return users


def _users_from_user_list(db, user_list) -> list[dict]:
    """Resolve the legacy ``user_list`` field of a device to user dicts.

    Entries may be ``DocumentReference`` objects, string paths containing a
    ``/``, or bare document IDs looked up in the ``users`` collection. Blank
    strings, other types, missing documents and failing lookups are skipped.
    A ``None`` or missing list yields an empty result.
    """
    users = []
    # `or []` also covers a field that is present but stored as null.
    for entry in user_list or []:
        try:
            if isinstance(entry, DocumentReference):
                user_doc = entry.get()
            elif isinstance(entry, str) and entry.strip():
                # Could be a path like "users/abc" or a raw doc ID
                if "/" in entry:
                    user_doc = db.document(entry).get()
                else:
                    user_doc = db.collection("users").document(entry).get()
            else:
                continue
            if user_doc.exists:
                users.append(_user_snapshot_to_entry(user_doc))
        except Exception as e:
            print(f"[devices] Error resolving user_list entry: {e}")
    return users


def get_device_users(device_doc_id: str) -> list[dict]:
    """Get the users assigned to a device.

    The ``device_users`` sub-collection is consulted first. Only when it
    contains no documents at all does the function fall back to the
    ``user_list`` field on the device document (those entries get an empty
    ``role``).

    Args:
        device_doc_id: Firestore document ID of the device.

    Returns:
        A list of dicts with the keys ``user_id``, ``display_name``, ``email``
        and ``role``.

    Raises:
        NotFoundError: If the device document does not exist.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(device_doc_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Device")

    sub_docs = list(doc_ref.collection("device_users").stream())
    if sub_docs:
        return _users_from_subcollection(db, sub_docs)
    return _users_from_user_list(db, doc.to_dict().get("user_list"))
def delete_device(device_doc_id: str) -> None:
    """Delete a device document from Firestore.

    Raises:
        NotFoundError: If no document with that ID exists in ``COLLECTION``.
    """
    target = get_db().collection(COLLECTION).document(device_doc_id)
    if not target.get().exists:
        raise NotFoundError("Device")
    # NOTE(review): only the document itself is deleted here; this function
    # does not touch the device_users sub-collection or any MQTT credentials -
    # confirm whether that cleanup happens elsewhere.
    target.delete()