Files

85 lines
2.9 KiB
Python

"""
SSE Event Bus — in-memory broadcaster for Server-Sent Events.
All routers import `broadcast_sync()` to push events from sync routes.
The SSE endpoint imports `subscribe()` / `unsubscribe()` to manage per-client queues.
Event shape (JSON-serialisable dict):
{ "type": "<event_type>", "data": { ... } }
Supported event types:
order_updated — order created / item added / transferred / merged
order_paid — items paid on an order
order_closed — order closed or cancelled
table_list_changed — table added/removed
table_flags_changed — flags set/cleared on a table
message_sent — new staff message (targeted or broadcast)
shift_changed — shift started / ended by manager
business_day_changed — business day opened / closed
"""
import asyncio
import json
from typing import Dict, Set
# Event loop captured once at startup by init_loop(), which is called from the
# lifespan handler. Sync route threads use it to schedule coroutines safely.
# It stays None until init_loop() has run.
_main_loop: asyncio.AbstractEventLoop | None = None
# Registry of open connections: user ID (the key passed to subscribe()) →
# set of asyncio.Queue, one queue per SSE connection for that user.
_queues: Dict[int, Set[asyncio.Queue]] = {}
def init_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Remember *loop* as the event loop that cross-thread broadcasts target.

    Intended to be called once from async startup code (the FastAPI
    lifespan), so that calls made later from worker threads have a loop to
    schedule onto. Calling it again simply replaces the stored loop.
    """
    global _main_loop
    _main_loop = loop
async def subscribe(user_id: int) -> asyncio.Queue:
    """Open a new event queue for *user_id* and register it for delivery.

    Every call creates a separate bounded queue (at most 256 pending
    events), so one user may hold several queues at once — one per open
    SSE connection.

    Returns:
        The newly registered queue. Pass the same object back to
        ``unsubscribe()`` when the connection ends.
    """
    client_queue: asyncio.Queue = asyncio.Queue(maxsize=256)
    # setdefault creates the per-user set on that user's first subscription.
    _queues.setdefault(user_id, set()).add(client_queue)
    return client_queue
async def unsubscribe(user_id: int, q: asyncio.Queue) -> None:
    """Remove queue *q* from *user_id*'s registered connections.

    Unknown users and queues that were never registered are ignored. When
    the user's last queue is removed, the user's entry is dropped from the
    registry as well.
    """
    connections = _queues.get(user_id)
    if connections is None:
        return

    connections.discard(q)
    if not connections:
        # No connection left for this user — do not keep an empty set around.
        del _queues[user_id]
def broadcast_sync(event_type: str, data: dict, *, user_ids: list[int] | None = None) -> None:
    """
    Fire-and-forget broadcast from a synchronous FastAPI route (thread-pool worker).

    Uses call_soon_threadsafe so the coroutine runs on the main event loop, not
    on the calling thread. The call returns immediately; delivery happens later
    on the loop.

    Args:
        event_type: Event name forwarded to ``broadcast()``.
        data: Event payload forwarded to ``broadcast()``.
        user_ids: ``None`` targets every connected user; a list targets only
            those user IDs.

    The event is silently dropped when no loop has been registered via
    ``init_loop()`` yet, or when the registered loop has already been closed.
    """
    # Read the global once so the None check and the calls below use the same loop.
    loop = _main_loop
    if loop is None:
        return

    coro = broadcast(event_type, data, user_ids=user_ids)
    try:
        loop.call_soon_threadsafe(loop.create_task, coro)
    except RuntimeError:
        # call_soon_threadsafe raises RuntimeError once the loop is closed
        # (e.g. a request finishing during shutdown). A best-effort
        # notification must not fail the calling route, and the coroutine has
        # to be closed explicitly or Python warns that it was never awaited.
        coro.close()
async def broadcast(event_type: str, data: dict, *, user_ids: list[int] | None = None) -> None:
    """
    Push an event to connected clients.

    The event is serialised once as ``{"type": event_type, "data": data}`` and
    the same JSON string is enqueued on every targeted connection's queue.

    Args:
        event_type: Event name stored under the ``"type"`` key.
        data: JSON-serialisable payload stored under the ``"data"`` key.
        user_ids: ``None`` → broadcast to ALL connected users.
            ``[...]`` → send only to those specific user IDs; IDs without an
            open connection are skipped, so an empty list sends to nobody.

    Raises:
        TypeError, ValueError: Propagated from ``json.dumps`` when ``data``
            cannot be serialised.
    """
    payload = json.dumps({"type": event_type, "data": data})

    if user_ids is None:
        target_sets = list(_queues.values())
    else:
        # Look the requested IDs up directly instead of testing every connected
        # user against the list. set() also collapses duplicate IDs so each
        # connection still receives the event only once.
        target_sets = [_queues[uid] for uid in set(user_ids) if uid in _queues]

    for connections in target_sets:
        # Iterate over a copy so the registry's own set is never the object
        # being walked.
        for q in list(connections):
            try:
                q.put_nowait(payload)
            except asyncio.QueueFull:
                pass  # slow client — drop rather than block