Files
bellsystems-cp/backend/devices/service.py

303 lines
10 KiB
Python

import secrets
import string
from datetime import datetime
from google.cloud.firestore_v1 import GeoPoint, DocumentReference
from shared.firebase import get_db
from shared.exceptions import NotFoundError
from devices.models import DeviceCreate, DeviceUpdate, DeviceInDB
# Firestore collection that stores the device documents.
COLLECTION = "devices"

# Serial numbers look like BS-XXXX-XXXX: two segments of SN_SEGMENT_LEN
# characters, each drawn from uppercase ASCII letters and digits.
SN_CHARS = string.ascii_uppercase + string.digits
SN_SEGMENT_LEN = 4

# Names of the clock/silence/backlight fields that are stored as Firestore
# Timestamps (i.e. written as datetime objects rather than strings).
_TIMESTAMP_FIELD_NAMES = {
    "daySilenceFrom",
    "daySilenceTo",
    "nightSilenceFrom",
    "nightSilenceTo",
    "backlightTurnOnTime",
    "backlightTurnOffTime",
}
def _restore_timestamps(d: dict) -> dict:
    """Return a copy of ``d`` with known timestamp fields parsed into datetimes.

    Nested dicts are processed recursively. A value is parsed only when it is
    a string stored under one of the keys in ``_TIMESTAMP_FIELD_NAMES``; a
    trailing ``Z`` is rewritten to ``+00:00`` before parsing. Strings that are
    not valid ISO 8601 are kept unchanged, as is every other value.

    Firestore stores Python datetime objects as native Timestamps, which
    Flutter reads as DateTime. Plain strings would break the Flutter app.
    """
    restored = {}
    for key, value in d.items():
        if isinstance(value, dict):
            restored[key] = _restore_timestamps(value)
            continue
        if not (isinstance(value, str) and key in _TIMESTAMP_FIELD_NAMES):
            restored[key] = value
            continue
        try:
            restored[key] = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            # Not parseable: pass the original string through untouched.
            restored[key] = value
    return restored
def _generate_serial_number() -> str:
    """Return a random serial number candidate in the format BS-XXXX-XXXX.

    Each segment is SN_SEGMENT_LEN characters picked from SN_CHARS with the
    ``secrets`` module. No uniqueness check is performed here.
    """

    def _segment() -> str:
        return "".join(secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN))

    return f"BS-{_segment()}-{_segment()}"
def _ensure_unique_serial(db) -> str:
    """Generate a serial number that no existing device document uses.

    Each candidate is checked with an equality query on ``device_id`` limited
    to a single result, so only the candidate is looked up instead of
    streaming every document in the collection.

    ``db`` is the Firestore client. Returns the first candidate with no
    matching document. Raises RuntimeError if 100 consecutive candidates are
    all taken.

    NOTE(review): the check is not atomic with the write the caller performs
    afterwards, so two concurrent creations could still pick the same value.
    """
    devices = db.collection(COLLECTION)
    for _ in range(100):  # safety limit
        candidate = _generate_serial_number()
        taken = list(
            devices.where("device_id", "==", candidate).limit(1).stream()
        )
        if not taken:
            return candidate
    raise RuntimeError("Could not generate a unique serial number after 100 attempts")
def _convert_firestore_value(val):
    """Turn a Firestore-specific value into a plain string.

    - datetime (including Firestore's DatetimeWithNanoseconds subclass) is
      formatted as ``"%d %B %Y at %H:%M:%S UTC%z"``.
    - GeoPoint becomes ``"<latitude>° N, <longitude>° E"``; the N/E suffixes
      are fixed regardless of the sign of the coordinates.
    - DocumentReference becomes its document path (e.g. "users/abc123").

    Any other value is returned unchanged.
    """
    converted = val
    if isinstance(val, datetime):
        converted = val.strftime("%d %B %Y at %H:%M:%S UTC%z")
    elif isinstance(val, GeoPoint):
        lat, lng = val.latitude, val.longitude
        converted = f"{lat}° N, {lng}° E"
    elif isinstance(val, DocumentReference):
        converted = val.path
    return converted
def _sanitize_dict(d: dict) -> dict:
    """Return a copy of ``d`` with Firestore-native values converted to strings.

    Dict values are sanitized recursively. For list values, each item is
    sanitized if it is a dict and otherwise passed through
    ``_convert_firestore_value``; lists nested directly inside lists are not
    descended into. Every other value goes through
    ``_convert_firestore_value``.
    """

    def _clean(value):
        # Dicts recurse; everything else is treated as a leaf.
        if isinstance(value, dict):
            return _sanitize_dict(value)
        return _convert_firestore_value(value)

    cleaned = {}
    for key, value in d.items():
        if isinstance(value, list):
            cleaned[key] = [_clean(item) for item in value]
        else:
            cleaned[key] = _clean(value)
    return cleaned
def _doc_to_device(doc) -> DeviceInDB:
    """Build a DeviceInDB model from a Firestore document snapshot.

    The snapshot's fields are passed through ``_sanitize_dict`` first, and the
    snapshot's document ID is used as the model's ``id``.
    """
    raw_fields = doc.to_dict()
    clean_fields = _sanitize_dict(raw_fields)
    return DeviceInDB(id=doc.id, **clean_fields)
def list_devices(
    search: str | None = None,
    online_only: bool | None = None,
    subscription_tier: str | None = None,
) -> list[DeviceInDB]:
    """List devices with optional filters.

    ``subscription_tier`` is applied in the Firestore query, on
    ``device_subscription.subscrTier``. The other two filters run in Python
    on the streamed documents:

    - ``online_only``: when not None, keeps only devices whose ``is_Online``
      equals the given value (so ``False`` returns only offline devices).
    - ``search``: case-insensitive substring match against the device name,
      location, or serial number (``device_id``).
    """
    db = get_db()
    query = db.collection(COLLECTION)
    if subscription_tier:
        query = query.where("device_subscription.subscrTier", "==", subscription_tier)

    # Lower-case the search term once rather than once per document.
    needle = search.lower() if search else None

    results = []
    for doc in query.stream():
        device = _doc_to_device(doc)
        if online_only is not None and device.is_Online != online_only:
            continue
        if needle is not None:
            haystacks = (device.device_name, device.device_location, device.device_id)
            # Missing/None fields are treated as empty strings.
            if not any(needle in (text or "").lower() for text in haystacks):
                continue
        results.append(device)
    return results
def get_device(device_doc_id: str) -> DeviceInDB:
    """Fetch one device by its Firestore document ID.

    Raises NotFoundError when no document with that ID exists.
    """
    snapshot = get_db().collection(COLLECTION).document(device_doc_id).get()
    if snapshot.exists:
        return _doc_to_device(snapshot)
    raise NotFoundError("Device")
def create_device(data: DeviceCreate) -> DeviceInDB:
    """Create a new device document in Firestore with an auto-generated serial number.

    The serial number is stored in the ``device_id`` field. Known timestamp
    fields are converted to datetime objects before writing, the same way
    update_device does, so they are stored as Firestore Timestamps rather
    than plain strings.

    Returns a DeviceInDB built from the submitted data plus the new document
    ID and serial number.
    """
    db = get_db()
    serial_number = _ensure_unique_serial(db)

    doc_data = data.model_dump()
    doc_data["device_id"] = serial_number

    # _restore_timestamps returns a converted copy; doc_data itself is left
    # as dumped so the returned model is built from the same values as before.
    _, doc_ref = db.collection(COLLECTION).add(_restore_timestamps(doc_data))
    return DeviceInDB(id=doc_ref.id, **doc_data)
def _deep_merge(base: dict, overrides: dict) -> dict:
    """Return ``base`` with ``overrides`` merged in, recursing into nested dicts.

    When a key holds a dict on both sides, the two are merged recursively so
    nested keys absent from ``overrides`` survive. In every other case the
    override value replaces the base value. ``base`` itself is not modified.
    """
    merged = dict(base)
    for key, override in overrides.items():
        current = merged.get(key)
        both_dicts = isinstance(override, dict) and isinstance(current, dict)
        merged[key] = _deep_merge(current, override) if both_dicts else override
    return merged
def update_device(device_doc_id: str, data: DeviceUpdate) -> DeviceInDB:
    """Update an existing device document. Only provided fields are updated.

    Fields that are None in ``data`` are ignored. For the nested structs
    ``device_attributes``, ``device_subscription`` and ``device_stats``, the
    submitted values are deep-merged into what is already stored, so
    sub-fields not mentioned in the request are preserved. Known timestamp
    fields are converted to datetime objects before writing.

    If the request contains no fields to change, nothing is written and the
    current document is returned.

    Raises NotFoundError when no document with that ID exists.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(device_doc_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Device")

    update_data = data.model_dump(exclude_none=True)
    if not update_data:
        # The Firestore client rejects an empty update; there is nothing to
        # write, so report the document as it currently is.
        return _doc_to_device(doc)

    # Deep-merge nested structs so unmentioned sub-fields are preserved
    existing = doc.to_dict()
    for key in ("device_attributes", "device_subscription", "device_stats"):
        if key in update_data and isinstance(existing.get(key), dict):
            update_data[key] = _deep_merge(existing[key], update_data[key])

    doc_ref.update(_restore_timestamps(update_data))

    # Re-read so the response reflects what is actually stored.
    return _doc_to_device(doc_ref.get())
def _blank_user_entry(role: str) -> dict:
    """Return a user entry with empty user details and the given role."""
    return {
        "user_id": "",
        "display_name": "",
        "email": "",
        "role": role,
        "photo_url": "",
    }


def _user_entry_from_doc(user_doc, role: str) -> dict:
    """Build a user entry from an existing user document snapshot."""
    user_data = user_doc.to_dict()
    return {
        "user_id": user_doc.id,
        "display_name": user_data.get("display_name", ""),
        "email": user_data.get("email", ""),
        "role": role,
        "photo_url": user_data.get("photo_url", ""),
    }


def get_device_users(device_doc_id: str) -> list[dict]:
    """Get users assigned to a device from the device_users sub-collection.

    Falls back to the user_list field on the device document if the
    sub-collection is empty.

    Each returned dict has the keys ``user_id``, ``display_name``, ``email``,
    ``role`` and ``photo_url``.

    Sub-collection entries: ``user_reference`` may be a DocumentReference or
    a string document path. An entry whose reference is missing, of another
    type, or points at a non-existent user is still returned, with empty user
    details and its role. An entry whose lookup raises is logged and skipped.

    Fallback entries: each ``user_list`` item may be a DocumentReference, a
    document path containing "/", or a bare ID looked up in the ``users``
    collection. Items that are blank, of another type, or that do not resolve
    to an existing user are skipped; ``role`` is always empty. A missing or
    null ``user_list`` yields an empty result.

    Raises NotFoundError when the device document does not exist.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(device_doc_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Device")

    users = []

    # Try sub-collection first
    sub_docs = list(doc_ref.collection("device_users").stream())
    if sub_docs:
        for sub_doc in sub_docs:
            sub_data = sub_doc.to_dict()
            role = sub_data.get("role", "")
            user_ref = sub_data.get("user_reference")

            # Default used when the reference cannot be matched to a user.
            user_entry = _blank_user_entry(role)
            if isinstance(user_ref, DocumentReference):
                try:
                    user_doc = user_ref.get()
                    if user_doc.exists:
                        user_entry = _user_entry_from_doc(user_doc, role)
                except Exception as e:
                    # Best-effort: one bad reference must not fail the listing.
                    print(f"[devices] Error resolving user reference: {e}")
                    continue
            elif isinstance(user_ref, str):
                # String path like "users/abc123"
                try:
                    user_doc = db.document(user_ref).get()
                    if user_doc.exists:
                        user_entry = _user_entry_from_doc(user_doc, role)
                except Exception as e:
                    print(f"[devices] Error resolving user path: {e}")
                    continue
            users.append(user_entry)
        return users

    # Fallback to user_list field; "or []" also covers a field stored as null.
    user_list = doc.to_dict().get("user_list") or []
    for entry in user_list:
        try:
            if isinstance(entry, DocumentReference):
                user_doc = entry.get()
            elif isinstance(entry, str) and entry.strip():
                # Could be a path like "users/abc" or a raw doc ID
                if "/" in entry:
                    user_doc = db.document(entry).get()
                else:
                    user_doc = db.collection("users").document(entry).get()
            else:
                continue
            if user_doc.exists:
                users.append(_user_entry_from_doc(user_doc, ""))
        except Exception as e:
            print(f"[devices] Error resolving user_list entry: {e}")
    return users
def delete_device(device_doc_id: str) -> None:
    """Delete a device document from Firestore.

    Raises NotFoundError when no document with that ID exists.

    NOTE(review): only the device document itself is deleted here; this
    function does not touch the ``device_users`` sub-collection that
    get_device_users reads. Confirm whether those documents should be
    removed as well.
    """
    doc_ref = get_db().collection(COLLECTION).document(device_doc_id)
    if not doc_ref.get().exists:
        raise NotFoundError("Device")
    doc_ref.delete()