Includes all work to date: - local_backend: FastAPI backend with products, orders, tables, shifts, cloud sync - manager_dashboard: React manager UI with product/category management, reports, settings - waiter_pwa: React PWA for waiter devices - Category reparent endpoint and UI - Waiter domain: local_ip sent on heartbeat, waiter_domain persisted from cloud response - QR code modal in AppInfoTab for waiter domain - Product form: number input spinners removed, category pre-selected on new product - Category row: count badge moved to far right Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
85 lines
2.9 KiB
Python
85 lines
2.9 KiB
Python
"""
|
|
SSE Event Bus — in-memory broadcaster for Server-Sent Events.
|
|
|
|
All routers import `broadcast_sync()` to push events from sync routes.
|
|
The SSE endpoint imports `subscribe()` / `unsubscribe()` to manage per-client queues.
|
|
|
|
Event shape (JSON-serialisable dict):
|
|
{ "type": "<event_type>", "data": { ... } }
|
|
|
|
Supported event types:
|
|
order_updated — order created / item added / transferred / merged
|
|
order_paid — items paid on an order
|
|
order_closed — order closed or cancelled
|
|
table_list_changed — table added/removed
|
|
table_flags_changed — flags set/cleared on a table
|
|
message_sent — new staff message (targeted or broadcast)
|
|
shift_changed — shift started / ended by manager
|
|
business_day_changed — business day opened / closed
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from typing import Dict, Set
|
|
|
|
# Captured once at startup by init_loop() called from lifespan.
|
|
# Sync route threads use this to schedule coroutines safely.
|
|
_main_loop: asyncio.AbstractEventLoop | None = None
|
|
|
|
# waiter_id → set of asyncio.Queue (one per SSE connection for that user)
|
|
_queues: Dict[int, Set[asyncio.Queue]] = {}
|
|
|
|
|
|
def init_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Remember *loop* as the event loop used for cross-thread broadcasts.

    Stores the loop in the module-level ``_main_loop``. Meant to be called
    once from the FastAPI lifespan (async context), before any sync route
    tries to broadcast.
    """
    global _main_loop
    _main_loop = loop
|
|
|
|
|
|
async def subscribe(user_id: int) -> asyncio.Queue:
    """Register a new SSE connection for *user_id* and return its queue.

    Every call creates a fresh queue bounded to 256 pending events and adds
    it to the user's set in ``_queues``, creating that set on first use.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=256)
    _queues.setdefault(user_id, set()).add(queue)
    return queue
|
|
|
|
|
|
async def unsubscribe(user_id: int, q: asyncio.Queue) -> None:
    """Remove queue *q* from the connections registered for *user_id*.

    Unknown users and queues that are not registered are ignored. When the
    user's last queue is removed, the user's entry is deleted from
    ``_queues`` as well.
    """
    user_queues = _queues.get(user_id)
    if user_queues is None:
        return
    user_queues.discard(q)
    if not user_queues:
        # No connections left for this user — drop the empty set.
        del _queues[user_id]
|
|
|
|
|
|
def broadcast_sync(event_type: str, data: dict, *, user_ids: list[int] | None = None) -> None:
|
|
"""
|
|
Fire-and-forget broadcast from a synchronous FastAPI route (thread-pool worker).
|
|
Uses call_soon_threadsafe so the coroutine runs on the main event loop, not the thread.
|
|
"""
|
|
if _main_loop is None:
|
|
return
|
|
_main_loop.call_soon_threadsafe(
|
|
_main_loop.create_task,
|
|
broadcast(event_type, data, user_ids=user_ids),
|
|
)
|
|
|
|
|
|
async def broadcast(event_type: str, data: dict, *, user_ids: list[int] | None = None) -> None:
|
|
"""
|
|
Push an event to connected clients.
|
|
user_ids=None → broadcast to ALL connected users
|
|
user_ids=[...] → send only to those specific user IDs
|
|
"""
|
|
payload = json.dumps({"type": event_type, "data": data})
|
|
targets = (
|
|
{uid: qs for uid, qs in _queues.items() if uid in user_ids}
|
|
if user_ids is not None
|
|
else dict(_queues)
|
|
)
|
|
for qs in targets.values():
|
|
for q in list(qs):
|
|
try:
|
|
q.put_nowait(payload)
|
|
except asyncio.QueueFull:
|
|
pass # slow client — drop rather than block
|