""" SSE stream endpoint — one long-lived GET per connected phone. Authentication: token passed as query param ?token= (EventSource API in browsers cannot set custom headers, so query param is the standard pattern.) The client receives a stream of JSON lines: data: {"type": "...", "data": {...}}\n\n A keepalive comment (": ping") is sent every 25 seconds to prevent proxy timeouts. """ import asyncio from fastapi import APIRouter, Query from fastapi.responses import StreamingResponse from routers.deps import decode_token from services.sse_bus import subscribe, unsubscribe router = APIRouter() KEEPALIVE_INTERVAL = 25 # seconds async def _event_stream(user_id: int): q = await subscribe(user_id) try: while True: try: payload = await asyncio.wait_for(q.get(), timeout=KEEPALIVE_INTERVAL) yield f"data: {payload}\n\n" except asyncio.TimeoutError: # keepalive — prevents nginx/proxies from closing idle connections yield ": ping\n\n" except asyncio.CancelledError: pass finally: await unsubscribe(user_id, q) @router.get("/stream") async def sse_stream(token: str = Query(...)): """ Open an SSE stream for the authenticated user. The phone connects once on login and stays connected. On reconnect (after network drop) it does a full GET first, then reconnects here. """ # decode_token raises HTTPException on invalid/expired — no manual check needed payload = decode_token(token) user_id: int = int(payload["sub"]) return StreamingResponse( _event_stream(user_id), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", # disable nginx buffering "Connection": "keep-alive", }, )