303 lines
10 KiB
Python
303 lines
10 KiB
Python
import secrets
|
|
import string
|
|
|
|
from datetime import datetime
|
|
from google.cloud.firestore_v1 import GeoPoint, DocumentReference
|
|
|
|
from shared.firebase import get_db
|
|
from shared.exceptions import NotFoundError
|
|
from devices.models import DeviceCreate, DeviceUpdate, DeviceInDB
|
|
|
|
# Name of the Firestore collection that holds the device documents.
COLLECTION = "devices"

# Serial number format: BS-XXXX-XXXX (uppercase alphanumeric)
SN_CHARS = string.ascii_uppercase + string.digits
SN_SEGMENT_LEN = 4

# Clock/silence/backlight fields stored as Firestore Timestamps (written as datetime)
_TIMESTAMP_FIELD_NAMES = {
    "daySilenceFrom",
    "daySilenceTo",
    "nightSilenceFrom",
    "nightSilenceTo",
    "backlightTurnOnTime",
    "backlightTurnOffTime",
}
|
|
|
|
|
|
def _restore_timestamps(d: dict) -> dict:
|
|
"""Recursively convert ISO 8601 strings for known timestamp fields to datetime objects.
|
|
|
|
Firestore stores Python datetime objects as native Timestamps, which Flutter
|
|
reads as DateTime. Plain strings would break the Flutter app.
|
|
"""
|
|
result = {}
|
|
for k, v in d.items():
|
|
if isinstance(v, dict):
|
|
result[k] = _restore_timestamps(v)
|
|
elif isinstance(v, str) and k in _TIMESTAMP_FIELD_NAMES:
|
|
try:
|
|
result[k] = datetime.fromisoformat(v.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
result[k] = v
|
|
else:
|
|
result[k] = v
|
|
return result
|
|
|
|
|
|
def _generate_serial_number() -> str:
    """Build one random serial number candidate of the form BS-XXXX-XXXX.

    Each segment is ``SN_SEGMENT_LEN`` characters drawn from ``SN_CHARS`` with
    the ``secrets`` module. Uniqueness is not checked here.
    """
    segments = [
        "".join(secrets.choice(SN_CHARS) for _ in range(SN_SEGMENT_LEN))
        for _ in range(2)
    ]
    return "BS-" + "-".join(segments)
|
|
|
|
|
|
def _ensure_unique_serial(db) -> str:
    """Return a freshly generated serial number that no stored device uses.

    Args:
        db: Firestore client used to query the devices collection.

    Returns:
        A serial number for which no device document with an equal
        ``device_id`` was found.

    Raises:
        RuntimeError: If 100 consecutive candidates were all already taken.
    """
    devices = db.collection(COLLECTION)

    for _ in range(100):  # safety limit
        sn = _generate_serial_number()
        # Look up just this candidate instead of streaming every device
        # document on each call; one matching document is enough to reject it.
        matches = devices.where("device_id", "==", sn).limit(1).stream()
        if next(iter(matches), None) is None:
            return sn

    raise RuntimeError("Could not generate a unique serial number after 100 attempts")
|
|
|
|
|
|
def _convert_firestore_value(val):
|
|
"""Convert Firestore-specific types (Timestamp, GeoPoint, DocumentReference) to strings."""
|
|
if isinstance(val, datetime):
|
|
# Firestore DatetimeWithNanoseconds is a datetime subclass
|
|
return val.strftime("%d %B %Y at %H:%M:%S UTC%z")
|
|
if isinstance(val, GeoPoint):
|
|
return f"{val.latitude}° N, {val.longitude}° E"
|
|
if isinstance(val, DocumentReference):
|
|
# Store the document path (e.g. "users/abc123")
|
|
return val.path
|
|
return val
|
|
|
|
|
|
def _sanitize_dict(d: dict) -> dict:
|
|
"""Recursively convert Firestore-native types in a dict to plain strings."""
|
|
result = {}
|
|
for k, v in d.items():
|
|
if isinstance(v, dict):
|
|
result[k] = _sanitize_dict(v)
|
|
elif isinstance(v, list):
|
|
result[k] = [
|
|
_sanitize_dict(item) if isinstance(item, dict)
|
|
else _convert_firestore_value(item)
|
|
for item in v
|
|
]
|
|
else:
|
|
result[k] = _convert_firestore_value(v)
|
|
return result
|
|
|
|
|
|
def _doc_to_device(doc) -> DeviceInDB:
    """Build a DeviceInDB from a Firestore document snapshot.

    The snapshot's fields are passed through ``_sanitize_dict`` and the
    snapshot's document ID is supplied as the model's ``id``.
    """
    fields = _sanitize_dict(doc.to_dict())
    return DeviceInDB(**fields, id=doc.id)
|
|
|
|
|
|
def list_devices(
    search: str | None = None,
    online_only: bool | None = None,
    subscription_tier: str | None = None,
) -> list[DeviceInDB]:
    """List devices with optional filters.

    Args:
        search: Case-insensitive substring matched against the device name,
            location and serial number (``device_id``); a device is kept if
            any of the three contains it. Ignored when empty or None.
        online_only: When not None, keep only devices whose ``is_Online``
            equals this value.
        subscription_tier: When given, applied as a Firestore equality filter
            on ``device_subscription.subscrTier``.

    Returns:
        The matching devices, in the order Firestore streamed them.
    """
    query = get_db().collection(COLLECTION)
    if subscription_tier:
        query = query.where("device_subscription.subscrTier", "==", subscription_tier)

    # Lower-case the search term once instead of once per document.
    needle = search.lower() if search else None

    results = []
    for doc in query.stream():
        device = _doc_to_device(doc)

        # Client-side filters
        if online_only is not None and device.is_Online != online_only:
            continue

        if needle is not None:
            candidates = (device.device_name, device.device_location, device.device_id)
            if not any(needle in (text or "").lower() for text in candidates):
                continue

        results.append(device)

    return results
|
|
|
|
|
|
def get_device(device_doc_id: str) -> DeviceInDB:
    """Fetch one device by its Firestore document ID.

    Raises:
        NotFoundError: If no document with that ID exists in the devices
            collection.
    """
    snapshot = get_db().collection(COLLECTION).document(device_doc_id).get()
    if snapshot.exists:
        return _doc_to_device(snapshot)
    raise NotFoundError("Device")
|
|
|
|
|
|
def create_device(data: DeviceCreate) -> DeviceInDB:
    """Create a new device document in Firestore with auto-generated serial number.

    Args:
        data: Fields for the new device; dumped with ``model_dump()``.

    Returns:
        The created device, built from the dumped fields plus the generated
        ``device_id`` and the new document's ID.

    Raises:
        RuntimeError: If no unused serial number could be generated.
    """
    db = get_db()

    # Generate unique serial number.
    # NOTE(review): the uniqueness check and the write below are separate
    # operations, so two concurrent creates could in principle pick the same
    # serial number.
    serial_number = _ensure_unique_serial(db)

    doc_data = data.model_dump()
    doc_data["device_id"] = serial_number

    # Write timestamp fields as datetimes, exactly as update_device does, so
    # they are stored as Firestore Timestamps rather than plain strings.
    _, doc_ref = db.collection(COLLECTION).add(_restore_timestamps(doc_data))

    # The response is built from the dumped values, not the converted ones.
    return DeviceInDB(id=doc_ref.id, **doc_data)
|
|
|
|
|
|
def _deep_merge(base: dict, overrides: dict) -> dict:
|
|
"""Recursively merge overrides into base, preserving unmentioned nested keys."""
|
|
result = dict(base)
|
|
for k, v in overrides.items():
|
|
if isinstance(v, dict) and isinstance(result.get(k), dict):
|
|
result[k] = _deep_merge(result[k], v)
|
|
else:
|
|
result[k] = v
|
|
return result
|
|
|
|
|
|
def update_device(device_doc_id: str, data: DeviceUpdate) -> DeviceInDB:
    """Update an existing device document. Only provided fields are updated.

    Args:
        device_doc_id: Firestore document ID of the device.
        data: Partial update; fields whose value is None are ignored.

    Returns:
        The device as stored after the update. If ``data`` carries no
        non-None field, nothing is written and the current device is returned.

    Raises:
        NotFoundError: If the device document does not exist.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(device_doc_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Device")

    update_data = data.model_dump(exclude_none=True)
    if not update_data:
        # Nothing to change. The Firestore client rejects an empty update
        # with ValueError, so answer with the document as it stands.
        return _doc_to_device(doc)

    # Deep-merge nested structs so unmentioned sub-fields are preserved
    existing = doc.to_dict()
    for key in ("device_attributes", "device_subscription", "device_stats"):
        if key in update_data and isinstance(existing.get(key), dict):
            update_data[key] = _deep_merge(existing[key], update_data[key])

    doc_ref.update(_restore_timestamps(update_data))

    # Re-read so the response reflects what is actually stored.
    return _doc_to_device(doc_ref.get())
|
|
|
|
|
|
def _user_record(snapshot, role: str) -> dict:
    """Build one user entry; ``snapshot`` is None when no user document was found."""
    if snapshot is None:
        return {
            "user_id": "",
            "display_name": "",
            "email": "",
            "role": role,
            "photo_url": "",
        }
    profile = snapshot.to_dict()
    return {
        "user_id": snapshot.id,
        "display_name": profile.get("display_name", ""),
        "email": profile.get("email", ""),
        "role": role,
        "photo_url": profile.get("photo_url", ""),
    }


def _users_from_subcollection(db, sub_docs) -> list[dict]:
    """Turn device_users sub-collection documents into user entries."""
    users = []
    for sub_doc in sub_docs:
        sub_data = sub_doc.to_dict()
        role = sub_data.get("role", "")
        user_ref = sub_data.get("user_reference")

        if isinstance(user_ref, DocumentReference):
            try:
                snapshot = user_ref.get()
                record = _user_record(snapshot if snapshot.exists else None, role)
            except Exception as e:
                # Best effort: an unreadable reference drops this entry only.
                print(f"[devices] Error resolving user reference: {e}")
                continue
        elif isinstance(user_ref, str):
            # String path like "users/abc123"
            try:
                snapshot = db.document(user_ref).get()
                record = _user_record(snapshot if snapshot.exists else None, role)
            except Exception as e:
                print(f"[devices] Error resolving user path: {e}")
                continue
        else:
            # No usable reference: keep the entry so its role is still reported.
            record = _user_record(None, role)

        users.append(record)
    return users


def _users_from_user_list(db, user_list) -> list[dict]:
    """Turn the device's user_list entries into user entries (role is always empty)."""
    users = []
    for entry in user_list:
        try:
            if isinstance(entry, DocumentReference):
                snapshot = entry.get()
            elif isinstance(entry, str) and entry.strip():
                # Could be a path like "users/abc" or a raw doc ID
                if "/" in entry:
                    snapshot = db.document(entry).get()
                else:
                    snapshot = db.collection("users").document(entry).get()
            else:
                continue

            if snapshot.exists:
                users.append(_user_record(snapshot, ""))
        except Exception as e:
            print(f"[devices] Error resolving user_list entry: {e}")
    return users


def get_device_users(device_doc_id: str) -> list[dict]:
    """Get users assigned to a device from the device_users sub-collection.

    Falls back to the user_list field on the device document if the
    sub-collection is empty.

    Args:
        device_doc_id: Firestore document ID of the device.

    Returns:
        A list of dicts with the keys ``user_id``, ``display_name``,
        ``email``, ``role`` and ``photo_url``. Entries whose user lookup
        raises are skipped after printing the error.

    Raises:
        NotFoundError: If the device document does not exist.
    """
    db = get_db()
    doc_ref = db.collection(COLLECTION).document(device_doc_id)
    doc = doc_ref.get()
    if not doc.exists:
        raise NotFoundError("Device")

    # Try sub-collection first
    sub_docs = list(doc_ref.collection("device_users").stream())
    if sub_docs:
        return _users_from_subcollection(db, sub_docs)

    # Fallback to user_list field; it may be missing or stored as null.
    user_list = doc.to_dict().get("user_list") or []
    return _users_from_user_list(db, user_list)
|
|
|
|
|
|
def delete_device(device_doc_id: str) -> None:
    """Remove a device document from Firestore.

    Only the device document itself is deleted by this function.
    NOTE(review): Firestore does not remove sub-collections along with their
    parent document - confirm whether ``device_users`` entries need cleanup.

    Raises:
        NotFoundError: If the device document does not exist.
    """
    target = get_db().collection(COLLECTION).document(device_doc_id)
    if not target.get().exists:
        raise NotFoundError("Device")
    target.delete()
|