Waiter PWA fixes, and extra feautures. Also added Emergency Mode, search etc

This commit is contained in:
2026-05-02 21:08:53 +03:00
parent 8e27b7666e
commit c9ad78ec71
50 changed files with 4441 additions and 643 deletions

View File

@@ -26,6 +26,7 @@ from routers import shifts as shifts_router
from routers import settings as settings_router
from routers import flags as flags_router
from routers import messages as messages_router
from routers import sse as sse_router
def _run_migrations():
@@ -111,10 +112,13 @@ def _run_migrations():
name VARCHAR NOT NULL,
emoji VARCHAR,
color VARCHAR DEFAULT '#6b7280',
text_color VARCHAR DEFAULT NULL,
sort_order INTEGER NOT NULL DEFAULT 0,
is_active INTEGER NOT NULL DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)""",
# Migration: add text_color if upgrading from older schema
"ALTER TABLE table_flag_defs ADD COLUMN text_color VARCHAR DEFAULT NULL",
"""CREATE TABLE IF NOT EXISTS table_flag_assignments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_id INTEGER NOT NULL REFERENCES tables(id),
@@ -181,6 +185,21 @@ def _run_migrations():
"ALTER TABLE printers ADD COLUMN protocol VARCHAR NOT NULL DEFAULT 'escpos_tcp'",
# Compact (half-width) display flag for quick options
"ALTER TABLE product_quick_options ADD COLUMN is_compact INTEGER NOT NULL DEFAULT 0",
# Print layout + per-type font settings
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.ticket_mode', 'detailed', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_order_number', '48:1:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_meta', '0:0:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_item_name', '16:1:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_quick', '0:0:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_pref', '0:0:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_extra', '0:0:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_ingredient', '0:0:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_item_note', '0:0:0', CURRENT_TIMESTAMP)",
"INSERT OR IGNORE INTO pos_settings (key, value, updated_at) VALUES ('print.font_order_note', '0:1:0', CURRENT_TIMESTAMP)",
# Offline/emergency payment tracking
"ALTER TABLE order_audit_log ADD COLUMN offline_uuid VARCHAR",
"ALTER TABLE order_audit_log ADD COLUMN offline_at VARCHAR",
"ALTER TABLE order_audit_log ADD COLUMN is_duplicate INTEGER NOT NULL DEFAULT 0",
]
for sql in migrations:
try:
@@ -193,6 +212,9 @@ def _run_migrations():
@asynccontextmanager
async def lifespan(app: FastAPI):
import asyncio
from services.sse_bus import init_loop
init_loop(asyncio.get_running_loop())
Base.metadata.create_all(bind=engine)
_run_migrations()
sync_task = await start_cloud_sync()
@@ -232,3 +254,4 @@ app.include_router(shifts_router.router, prefix="/api/shifts", tag
app.include_router(settings_router.router, prefix="/api/settings", tags=["settings"])
app.include_router(flags_router.router, prefix="/api/flags", tags=["flags"])
app.include_router(messages_router.router, prefix="/api/messages", tags=["messages"])
app.include_router(sse_router.router, prefix="/api/sse", tags=["sse"])

View File

@@ -15,7 +15,8 @@ class TableFlagDef(Base):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
emoji = Column(String, nullable=True)
color = Column(String, nullable=True, default="#6b7280") # hex
color = Column(String, nullable=True, default="#6b7280") # hex background
text_color = Column(String, nullable=True, default=None) # hex text; None = white
sort_order = Column(Integer, default=0, nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
created_at = Column(DateTime(timezone=True), default=_utcnow)

View File

@@ -93,13 +93,17 @@ class OrderAuditLog(Base):
id = Column(Integer, primary_key=True, index=True)
order_id = Column(Integer, ForeignKey("orders.id"), nullable=False)
event_type = Column(String, nullable=False)
# ORDER_OPENED | ITEMS_ADDED | PAYMENT | ORDER_CLOSED | ORDER_CANCELLED | ITEM_CANCELLED
# ORDER_OPENED | ITEMS_ADDED | PAYMENT | PAYMENT_OFFLINE | ORDER_CLOSED | ORDER_CANCELLED | ITEM_CANCELLED
waiter_id = Column(Integer, ForeignKey("users.id"), nullable=True)
item_ids = Column(Text, nullable=True) # JSON list of OrderItem ids (for ITEMS_ADDED, PAYMENT, ITEM_CANCELLED)
item_ids = Column(Text, nullable=True) # JSON list of OrderItem ids
amount = Column(Float, nullable=True) # total value for PAYMENT events
payment_method = Column(String, nullable=True)
note = Column(Text, nullable=True)
created_at = Column(DateTime(timezone=True), default=_utcnow)
# Emergency offline payment fields
offline_uuid = Column(String, nullable=True) # client-generated UUID for dedup
offline_at = Column(String, nullable=True) # ISO timestamp from client
is_duplicate = Column(Integer, nullable=False, default=0) # 1 = duplicate payment flagged
order = relationship("Order", back_populates="audit_logs")
waiter = relationship("User")

View File

@@ -0,0 +1,137 @@
"""
Font size comparison test — Jolimark TP850UE
Usage: python print_size_test.py [IP] [PORT]
Default: 10.98.20.25:9100
Prints a single page showing all available size options side by side,
to help decide which sizes to expose in the settings UI.
Hardware facts:
ESC ! (0x1B 0x21 n):
0x10 = double-height only (tall + narrow — breaks aspect ratio)
0x20 = double-width only (short + wide — breaks aspect ratio)
0x30 = double-height + double-width (2x in both axes — correct aspect ratio)
There is NO 1.5x in ESC/POS hardware.
GS ! (0x1D 0x21 n) can go 3x, 4x … 8x but they are extremely large.
"""
import sys
PRINTER_IP = sys.argv[1] if len(sys.argv) > 1 else "10.98.20.25"
PRINTER_PORT = int(sys.argv[2]) if len(sys.argv) > 2 else 9100
try:
from escpos.printer import Network
except ImportError:
print("escpos not installed. Run: pip install python-escpos")
sys.exit(1)
def gr(text):
return text.encode('cp737', errors='replace')
def raw(p, b):
p._raw(b)
def section(p, title):
raw(p, b'\x1b\x21\x00')
raw(p, b'\x1b\x45\x00')
raw(p, b'\x1b\x61\x01')
p._raw(gr(f"--- {title} ---\n"))
raw(p, b'\x1b\x61\x00')
def print_sample(p, esc_bang, gs_size, label_en, label_gr):
"""Print one size sample with label."""
# Label at normal size
raw(p, b'\x1b\x21\x00')
raw(p, b'\x1b\x45\x00')
p._raw(gr(f"{label_en}:\n"))
# Apply size via ESC ! and/or GS !
if gs_size is not None:
raw(p, bytes([0x1d, 0x21, gs_size]))
raw(p, bytes([0x1b, 0x21, esc_bang]))
p._raw(gr(f"Club Sandwich. x1\n"))
p._raw(gr(f"* Χωρις αλατι\n"))
p._raw(gr(f"+ Extra Bacon x2\n"))
# Reset
raw(p, b'\x1d\x21\x00')
raw(p, b'\x1b\x21\x00')
raw(p, b'\n')
def divider(p):
raw(p, b'\x1b\x21\x00')
p._raw(gr("-" * 48 + "\n"))
print(f"Connecting to {PRINTER_IP}:{PRINTER_PORT}...")
p = Network(PRINTER_IP, PRINTER_PORT, timeout=10)
raw(p, b'\x1b\x40') # ESC @ reset
raw(p, b'\x1b\x74\x1d') # CP737 Greek
raw(p, b'\x1b\x61\x01')
raw(p, b'\x1b\x21\x30')
raw(p, b'\x1b\x45\x01')
p._raw(gr("SIZE COMPARISON TEST\n"))
raw(p, b'\x1b\x21\x00')
raw(p, b'\x1b\x45\x00')
raw(p, b'\x1b\x61\x00')
p._raw(gr("Which sizes look good for ticket printing?\n\n"))
# ── Section 1: The two aspect-ratio-correct options ───────────────────────
section(p, "CORRECT ASPECT RATIO")
p._raw(gr("\n"))
print_sample(p,
esc_bang=0x00, gs_size=None,
label_en="[1] SMALL (1x1 — normal)",
label_gr="")
print_sample(p,
esc_bang=0x30, gs_size=None,
label_en="[2] LARGE (2x2 — double height+width)",
label_gr="")
# ── Section 2: The broken single-axis options (for comparison) ────────────
divider(p)
section(p, "BROKEN ASPECT RATIO (for comparison)")
p._raw(gr("These scale only ONE axis — shown so\nyou can confirm they look wrong.\n\n"))
print_sample(p,
esc_bang=0x10, gs_size=None,
label_en="[3] Tall only (2x height, 1x width)",
label_gr="")
print_sample(p,
esc_bang=0x20, gs_size=None,
label_en="[4] Wide only (1x height, 2x width)",
label_gr="")
# ── Section 3: GS ! options — 3x and beyond ──────────────────────────────
divider(p)
section(p, "GS! LARGER SIZES (3x3, 4x4)")
p._raw(gr("These are technically available but\nvery large. Shown for completeness.\n\n"))
print_sample(p,
esc_bang=0x00, gs_size=0x22,
label_en="[5] GS! 3x3",
label_gr="")
print_sample(p,
esc_bang=0x00, gs_size=0x33,
label_en="[6] GS! 4x4",
label_gr="")
# ── Conclusion ────────────────────────────────────────────────────────────
divider(p)
raw(p, b'\x1b\x61\x01')
raw(p, b'\x1b\x21\x00')
p._raw(gr("CONCLUSION:\n"))
p._raw(gr("[1] Small = use for modifiers/notes\n"))
p._raw(gr("[2] Large = use for item names/headers\n"))
p._raw(gr("No true 1.5x exists in hardware.\n"))
p._raw(gr("GS! 3x3/4x4 available if desired.\n"))
raw(p, b'\n\n\n')
p.cut()
p.close()
print("Done.")

View File

@@ -7,6 +7,7 @@ from models.flag import TableFlagDef, TableFlagAssignment
from schemas.flag import FlagDefCreate, FlagDefUpdate, FlagDefOut, FlagAssignmentOut, SetTableFlagsRequest
from routers.deps import get_current_user, require_manager
from models.user import User
from services.sse_bus import broadcast_sync
router = APIRouter()
@@ -124,9 +125,11 @@ def set_table_flags(
))
db.commit()
return db.query(TableFlagAssignment).filter(
result = db.query(TableFlagAssignment).filter(
TableFlagAssignment.table_id == table_id
).all()
broadcast_sync("table_flags_changed", {"table_id": table_id, "flag_ids": body.flag_ids})
return result
@router.delete("/table/{table_id}/all", status_code=status.HTTP_204_NO_CONTENT)
@@ -139,3 +142,4 @@ def clear_table_flags(
TableFlagAssignment.table_id == table_id
).delete(synchronize_session=False)
db.commit()
broadcast_sync("table_flags_changed", {"table_id": table_id, "flag_ids": []})

View File

@@ -11,6 +11,7 @@ from schemas.message import (
QuickTemplateCreate, QuickTemplateUpdate, QuickTemplateOut,
)
from routers.deps import get_current_user, require_manager
from services.sse_bus import broadcast_sync
router = APIRouter()
@@ -113,7 +114,22 @@ def send_message(
db.add(msg)
db.commit()
msg = _load_msg(db, msg.id)
return _message_out(msg)
out = _message_out(msg)
# Broadcast to targeted users (empty list = all connected users)
target_ids = body.target_waiter_ids if body.target_waiter_ids else None
broadcast_sync(
"message_sent",
{
"id": out.id,
"sender_id": out.sender_id,
"sender_name": out.sender_name,
"body": out.body,
"table_ids": out.table_ids,
"created_at": out.created_at.isoformat() if out.created_at else None,
},
user_ids=target_ids,
)
return out
@router.get("/unread", response_model=List[StaffMessageOut])

View File

@@ -9,7 +9,7 @@ from models.order import Order, OrderItem, OrderWaiter, OrderAuditLog
from models.user import User, WaiterZone
from models.table import Table
from models.product import Product
from schemas.order import OrderCreate, OrderOut, OrderItemOut, AddItemsRequest, AddItemsResponse, PayItemsRequest, AssignWaiterRequest, OrderWaiterOut
from schemas.order import OrderCreate, OrderOut, OrderItemOut, AddItemsRequest, AddItemsResponse, PayItemsRequest, OfflinePaymentRequest, AssignWaiterRequest, OrderWaiterOut
from pydantic import BaseModel
class PrintOrderRequest(BaseModel):
@@ -33,6 +33,7 @@ class MoveItemsRequest(BaseModel):
from routers.deps import get_current_user, require_manager
from services.printer_service import route_and_print, route_and_print_sync, print_order_receipt, print_order_synopsis
from services.sse_bus import broadcast_sync
router = APIRouter()
@@ -159,6 +160,7 @@ def open_order(body: OrderCreate, db: Session = Depends(get_db), user: User = De
_audit(db, order.id, "ORDER_OPENED", waiter_id=user.id)
db.commit()
db.refresh(order)
broadcast_sync("order_updated", {"order_id": order.id, "table_id": order.table_id, "status": order.status, "action": "opened"})
return order
@@ -209,7 +211,7 @@ def add_items(
db.refresh(order)
print_results = route_and_print_sync(order_id, new_item_ids, db)
broadcast_sync("order_updated", {"order_id": order.id, "table_id": order.table_id, "status": order.status, "action": "items_added", "item_ids": new_item_ids})
return {"order": order, "print_results": print_results}
@@ -295,6 +297,7 @@ def pay_items(order_id: int, body: PayItemsRequest, db: Session = Depends(get_db
_audit(db, order_id, "PAYMENT", waiter_id=user.id, item_ids=paid_ids,
amount=total_paid, payment_method=body.payment_method)
db.commit()
broadcast_sync("order_paid", {"order_id": order_id, "table_id": order.table_id, "status": order.status, "paid_item_ids": paid_ids, "amount": total_paid, "payment_method": body.payment_method})
return {"status": order.status, "paid_item_ids": paid_ids}
@@ -312,9 +315,105 @@ def close_order(order_id: int, db: Session = Depends(get_db), user: User = Depen
order.closed_by = user.id
_audit(db, order_id, "ORDER_CLOSED", waiter_id=user.id)
db.commit()
broadcast_sync("order_closed", {"order_id": order_id, "table_id": order.table_id})
return {"status": "closed"}
@router.post("/{order_id}/pay-offline")
def pay_items_offline(
order_id: int,
body: OfflinePaymentRequest,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""
Sync an emergency payment that was taken while the server was offline.
The UUID prevents double-processing. If a payment with the same UUID already
exists on this order, the duplicate is logged in red (is_duplicate=1) rather
than silently dropped — so managers can reconcile.
"""
order = db.query(Order).filter(Order.id == order_id).first()
if not order:
raise HTTPException(status_code=404, detail="Order not found")
if not _can_access_order(order, user, db):
raise HTTPException(status_code=403, detail="Access denied")
# Check for duplicate UUID on this order
existing_uuid = db.query(OrderAuditLog).filter(
OrderAuditLog.order_id == order_id,
OrderAuditLog.offline_uuid == body.uuid,
).first()
is_duplicate = existing_uuid is not None
from models.shift import WaiterShift
items = db.query(OrderItem).filter(
OrderItem.id.in_(body.item_ids),
OrderItem.order_id == order_id,
OrderItem.status == "active",
).all()
# Reject empty payments — client had no offline snapshot for this table
if not items and not is_duplicate:
raise HTTPException(status_code=400, detail="No active items found — payment rejected")
# Use the client-recorded offline timestamp as paid_at so audit reflects real payment time
try:
paid_at = datetime.fromisoformat(body.offline_at.replace("Z", "+00:00")) if body.offline_at else datetime.now(timezone.utc)
except (ValueError, AttributeError):
paid_at = datetime.now(timezone.utc)
active_shift = db.query(WaiterShift).filter(
WaiterShift.waiter_id == user.id,
WaiterShift.ended_at == None,
).first()
total_paid = 0.0
paid_ids = []
if not is_duplicate:
for item in items:
item.status = "paid"
item.paid_by = user.id
item.paid_at = paid_at
item.payment_method = body.payment_method
item.paid_in_shift_id = active_shift.id if active_shift else None
total_paid += item.unit_price * item.quantity
paid_ids.append(item.id)
db.flush()
active_remaining = db.query(OrderItem).filter(
OrderItem.order_id == order_id, OrderItem.status == "active"
).count()
order.status = "paid" if active_remaining == 0 else "partially_paid"
else:
# Duplicate — compute total for audit record without changing item state
total_paid = sum(i.unit_price * i.quantity for i in items)
paid_ids = [i.id for i in items]
# Always write audit log — duplicate flag makes it visible in red in manager dashboard
db.add(OrderAuditLog(
order_id=order_id,
event_type="PAYMENT_OFFLINE",
waiter_id=user.id,
item_ids=json.dumps(paid_ids),
amount=total_paid,
payment_method=body.payment_method,
note=f"Emergency offline payment (uuid={body.uuid}){' — DUPLICATE' if is_duplicate else ''}",
offline_uuid=body.uuid,
offline_at=body.offline_at,
is_duplicate=1 if is_duplicate else 0,
))
db.commit()
if not is_duplicate:
broadcast_sync("order_paid", {"order_id": order_id, "table_id": order.table_id, "status": order.status, "paid_item_ids": paid_ids, "amount": total_paid, "payment_method": body.payment_method})
return {
"status": order.status if not is_duplicate else "duplicate",
"paid_item_ids": paid_ids,
"is_duplicate": is_duplicate,
}
@router.delete("/{order_id}", status_code=status.HTTP_204_NO_CONTENT)
def cancel_order(order_id: int, db: Session = Depends(get_db), user: User = Depends(require_manager)):
order = db.query(Order).filter(Order.id == order_id).first()
@@ -325,6 +424,7 @@ def cancel_order(order_id: int, db: Session = Depends(get_db), user: User = Depe
order.closed_by = user.id
_audit(db, order_id, "ORDER_CANCELLED", waiter_id=user.id)
db.commit()
broadcast_sync("order_closed", {"order_id": order_id, "table_id": order.table_id})
@router.put("/{order_id}/assign-waiter")
@@ -444,6 +544,7 @@ def transfer_order(
note=f"Transferred from table {old_table_id} to table {body.target_table_id}")
db.commit()
db.refresh(order)
broadcast_sync("order_updated", {"order_id": order.id, "table_id": order.table_id, "old_table_id": old_table_id, "status": order.status, "action": "transferred"})
return order
@@ -517,6 +618,8 @@ def merge_order(
db.commit()
db.refresh(target)
broadcast_sync("order_updated", {"order_id": target.id, "table_id": target.table_id, "status": target.status, "action": "merged"})
broadcast_sync("order_closed", {"order_id": source.id, "table_id": source.table_id})
return target

View File

@@ -17,13 +17,19 @@ VALID_SETTINGS = {
"system.timezone": "IANA timezone name used by the backend container (e.g. Europe/Athens). Requires container restart to take effect.",
"ui.table_colours": "JSON blob of table card colour scheme (light + dark modes) for the Waiter PWA.",
"dev.spoof_printing": "When enabled, all print jobs are silently dropped. Devices behave as if printing succeeded.",
# Print font settings — values are "SIZE:BOLD" where SIZE is ESC ! base byte (0/16/32/48) and BOLD is 0 or 1
"print.font_item_name": "Font for item name lines: SIZE:BOLD (e.g. '16:0')",
"print.font_options": "Font for option/modifier lines: SIZE:BOLD",
"print.font_table": "Font for table/waiter header lines: SIZE:BOLD",
"print.font_order_number": "Font for order number header: SIZE:BOLD",
"print.font_header": "Font for top header block: SIZE:BOLD",
# Print layout
"print.ticket_mode": "Kitchen ticket layout mode: 'detailed' or 'compact'",
"print.divider_style": "Divider character used between sections: dash, equals, star, or empty",
# Print font settings — values are "SIZE:BOLD:CAPS" where SIZE is ESC ! base byte (0/16/32/48), BOLD 0|1, CAPS 0|1
"print.font_order_number": "Font for order number header: SIZE:BOLD:CAPS",
"print.font_meta": "Font for table/waiter/time header block: SIZE:BOLD:CAPS",
"print.font_item_name": "Font for item name lines: SIZE:BOLD:CAPS",
"print.font_quick": "Font for quick option lines (* marker): SIZE:BOLD:CAPS",
"print.font_pref": "Font for preference choice lines (> marker): SIZE:BOLD:CAPS",
"print.font_extra": "Font for extra/option lines (+ marker): SIZE:BOLD:CAPS",
"print.font_ingredient": "Font for removed ingredient lines (- marker): SIZE:BOLD:CAPS",
"print.font_item_note": "Font for per-item note lines: SIZE:BOLD:CAPS",
"print.font_order_note": "Font for order-level notes: SIZE:BOLD:CAPS",
}
DEFAULTS = {
@@ -33,12 +39,17 @@ DEFAULTS = {
"system.timezone": "Europe/Athens",
"ui.table_colours": "",
"dev.spoof_printing": "false",
"print.font_item_name": "16:0", # double-height, no bold
"print.font_options": "0:0", # normal
"print.font_table": "16:0", # double-height
"print.font_order_number": "48:1", # double-height + double-width + bold
"print.font_header": "48:1", # double-height + double-width + bold
"print.ticket_mode": "detailed",
"print.divider_style": "dash",
"print.font_order_number": "48:1:0",
"print.font_meta": "0:0:0",
"print.font_item_name": "16:1:0",
"print.font_quick": "0:0:0",
"print.font_pref": "0:0:0",
"print.font_extra": "0:0:0",
"print.font_ingredient": "0:0:0",
"print.font_item_note": "0:0:0",
"print.font_order_note": "0:1:0",
}

View File

@@ -0,0 +1,60 @@
"""
SSE stream endpoint — one long-lived GET per connected phone.
Authentication: token passed as query param ?token=<jwt>
(EventSource API in browsers cannot set custom headers, so query param is the standard pattern.)
The client receives a stream of JSON lines:
data: {"type": "...", "data": {...}}\n\n
A keepalive comment (": ping") is sent every 25 seconds to prevent proxy timeouts.
"""
import asyncio
from fastapi import APIRouter, Query
from fastapi.responses import StreamingResponse
from routers.deps import decode_token
from services.sse_bus import subscribe, unsubscribe
router = APIRouter()
KEEPALIVE_INTERVAL = 25 # seconds
async def _event_stream(user_id: int):
q = await subscribe(user_id)
try:
while True:
try:
payload = await asyncio.wait_for(q.get(), timeout=KEEPALIVE_INTERVAL)
yield f"data: {payload}\n\n"
except asyncio.TimeoutError:
# keepalive — prevents nginx/proxies from closing idle connections
yield ": ping\n\n"
except asyncio.CancelledError:
pass
finally:
await unsubscribe(user_id, q)
@router.get("/stream")
async def sse_stream(token: str = Query(...)):
"""
Open an SSE stream for the authenticated user.
The phone connects once on login and stays connected.
On reconnect (after network drop) it does a full GET first, then reconnects here.
"""
# decode_token raises HTTPException on invalid/expired — no manual check needed
payload = decode_token(token)
user_id: int = int(payload["sub"])
return StreamingResponse(
_event_stream(user_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # disable nginx buffering
"Connection": "keep-alive",
},
)

View File

@@ -61,6 +61,15 @@ def test_printer(printer_id: int, db: Session = Depends(get_db), user: User = De
return {"success": success, "error": error}
@router.post("/printers/test-order")
def test_order_print(printer_id: int, db: Session = Depends(get_db), user: User = Depends(require_manager)):
printer = db.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
raise HTTPException(status_code=404, detail="Printer not found")
success, error = printer_service.send_test_order_print(printer.ip_address, printer.port, db)
return {"success": success, "error": error}
@router.put("/printers/{printer_id}", response_model=PrinterOut)
def update_printer(printer_id: int, body: PrinterUpdate, db: Session = Depends(get_db), user: User = Depends(require_manager)):
printer = db.query(Printer).filter(Printer.id == printer_id).first()

View File

@@ -12,6 +12,7 @@ from schemas.table import (
TableBatchCreate,
)
from routers.deps import get_current_user, require_manager
from services.sse_bus import broadcast_sync
router = APIRouter()
@@ -105,6 +106,7 @@ def create_table(body: TableCreate, db: Session = Depends(get_db), user: User =
db.add(table)
db.commit()
db.refresh(table)
broadcast_sync("table_list_changed", {"action": "created", "table_id": table.id})
return table

View File

@@ -7,6 +7,7 @@ class FlagDefCreate(BaseModel):
name: str
emoji: Optional[str] = None
color: Optional[str] = "#6b7280"
text_color: Optional[str] = None
sort_order: Optional[int] = 0
@@ -14,6 +15,7 @@ class FlagDefUpdate(BaseModel):
name: Optional[str] = None
emoji: Optional[str] = None
color: Optional[str] = None
text_color: Optional[str] = None
sort_order: Optional[int] = None
is_active: Optional[bool] = None
@@ -23,6 +25,7 @@ class FlagDefOut(BaseModel):
name: str
emoji: Optional[str] = None
color: Optional[str] = None
text_color: Optional[str] = None
sort_order: int
is_active: bool

View File

@@ -9,6 +9,9 @@ class SelectedOptionInput(BaseModel):
name: Optional[str] = None
price_delta: Optional[float] = None
extra_cost: Optional[float] = None
# type tags: "quick" | "pref" | "pref_sub" | "extra" | "extra_sub"
# Omitted by old clients — print code falls back gracefully.
type: Optional[str] = None
class OrderItemInput(BaseModel):
@@ -73,6 +76,13 @@ class PayItemsRequest(BaseModel):
payment_method: Optional[str] = None # 'cash' | 'card' | 'other' — optional for now
class OfflinePaymentRequest(BaseModel):
uuid: str # client-generated UUID, used for duplicate detection
item_ids: List[int]
payment_method: Optional[str] = None
offline_at: Optional[str] = None # ISO timestamp of when payment was taken offline
class AssignWaiterRequest(BaseModel):
waiter_id: int
@@ -93,6 +103,8 @@ class AuditLogOut(BaseModel):
payment_method: Optional[str] = None
note: Optional[str] = None
created_at: UTCDatetime
offline_at: Optional[str] = None
is_duplicate: int = 0
model_config = {"from_attributes": True}

View File

@@ -54,13 +54,32 @@ _DIVIDER_CHARS = {
"empty": "",
}
_PRINT_FONT_DEFAULTS = {
"print.font_item_name": "16:0",
"print.font_options": "0:0",
"print.font_table": "16:0",
"print.font_order_number": "48:1",
"print.font_header": "48:1",
_PRINT_SETTING_KEYS = [
"print.ticket_mode",
"print.divider_style",
"print.font_order_number",
"print.font_meta",
"print.font_item_name",
"print.font_quick",
"print.font_pref",
"print.font_extra",
"print.font_ingredient",
"print.font_item_note",
"print.font_order_note",
]
_PRINT_SETTING_DEFAULTS = {
"print.ticket_mode": "detailed",
"print.divider_style": "dash",
"print.font_order_number": "48:1:0",
"print.font_meta": "0:0:0",
"print.font_item_name": "16:1:0",
"print.font_quick": "0:0:0",
"print.font_pref": "0:0:0",
"print.font_extra": "0:0:0",
"print.font_ingredient": "0:0:0",
"print.font_item_note": "0:0:0",
"print.font_order_note": "0:1:0",
}
# SIZE byte values (ESC ! base, no bold bit):
@@ -68,27 +87,28 @@ _PRINT_FONT_DEFAULTS = {
# 16 = double-height (bit4)
# 32 = double-width (bit5)
# 48 = double-height + double-width (bits 4+5)
# Bold is applied separately via ESC E.
# Bold applied via ESC E, caps applied in software before encoding.
def _decode_font(value: str) -> tuple[int, bool]:
"""Parse 'SIZE:BOLD' string → (esc_bang_byte, bold_flag)."""
def _decode_font(value: str) -> tuple[int, bool, bool]:
"""Parse 'SIZE:BOLD:CAPS' string → (esc_bang_byte, bold_flag, caps_flag)."""
try:
parts = str(value).split(":")
size = int(parts[0])
bold = len(parts) > 1 and parts[1] == "1"
return size, bold
caps = len(parts) > 2 and parts[2] == "1"
return size, bold, caps
except (ValueError, AttributeError):
return 0, False
return 0, False, False
def _load_print_fonts(db: Session) -> dict:
def _load_print_settings(db: Session) -> dict:
rows = db.query(PosSettings).filter(
PosSettings.key.in_(_PRINT_FONT_DEFAULTS.keys())
PosSettings.key.in_(_PRINT_SETTING_KEYS)
).all()
fonts = dict(_PRINT_FONT_DEFAULTS)
settings = dict(_PRINT_SETTING_DEFAULTS)
for row in rows:
fonts[row.key] = row.value
return fonts
settings[row.key] = row.value
return settings
def _divider(p: Network, style: str = "dash"):
@@ -100,14 +120,42 @@ def _divider(p: Network, style: str = "dash"):
p._raw(b'\n')
def _item_line(name: str, qty: int) -> str:
"""Build a dot-leader line: 'Club Sandwich . . . . 1' at 48 chars."""
qty_str = str(qty)
gap = LINE_WIDTH - len(name) - len(qty_str)
if gap < 3:
return f"{name} {qty_str}"
dots = (". " * ((gap // 2) + 1))[:gap]
return f"{name}{dots}{qty_str}"
def _item_line(name: str, qty: int, line_width: int = LINE_WIDTH) -> str:
"""Build a dot-leader line ending with 'xN'.
line_width must reflect the effective width at the chosen font size
(double-width fonts halve the available char count to 24)."""
suffix = f"x{qty}"
available = line_width - len(name) - len(suffix)
if available < 2:
# Name alone is too long — put qty on same line with a single space
return f"{name} {suffix}"
dots = (". " * ((available // 2) + 1))[:available]
return f"{name}{dots}{suffix}"
def _apply_font(p: Network, size: int, bold: bool):
p._raw(bytes([0x1b, 0x21, size]))
p._raw(b'\x1b\x45\x01' if bold else b'\x1b\x45\x00')
def _reset_font(p: Network):
p._raw(b'\x1b\x21\x00')
p._raw(b'\x1b\x45\x00')
def _print_line(p: Network, text: str, size: int, bold: bool, caps: bool,
align: bytes = b'\x1b\x61\x00'):
"""Apply font, optionally capitalize, print text + newline, reset font."""
p._raw(align)
_apply_font(p, size, bold)
out = text.upper() if caps else text
_raw_text(p, out + "\n")
_reset_font(p)
def _greek_date(dt: datetime.datetime) -> str:
"""Return date/time string in Greek format: HH:MM DD-MM-YYYY"""
return dt.strftime("%H:%M %d-%m-%Y")
def check_printer(ip: str, port: int) -> bool:
@@ -152,88 +200,368 @@ def send_test_print(ip: str, port: int, name: str) -> Tuple[bool, str]:
return False, str(e)
def send_test_order_print(ip: str, port: int, db: Session) -> Tuple[bool, str]:
"""Print a fake order using the current font/layout settings — for settings preview."""
if _is_spoof_mode(db):
logger.info("Spoof printing ON — dropping test order print")
return True, ""
# ── Fake data structures (no DB writes) ──────────────────────────────────
class _Table:
label = "O2"
number = 2
class _User:
nickname = "bonamin"
username = "bonamin"
class _Order:
id = 99
table = _Table()
opener = _User()
table_id = 2
opened_by = 1
notes = "Χωρις καψαλισμα παρακαλω"
class _Item:
def __init__(self, product_id, quantity, selected_options, removed_ingredients, notes):
self.product_id = product_id
self.quantity = quantity
self.selected_options = selected_options
self.removed_ingredients = removed_ingredients
self.notes = notes
import json as _json
items = [
# Item 1: Freddo Espresso — quick options + preference + note
_Item(
product_id=1001,
quantity=2,
selected_options=_json.dumps([
{"name": "Διπλος", "price_delta": 0.5, "type": "quick"},
{"name": "Εξτρα ζαχαρη", "price_delta": 0.0, "type": "quick"},
{"name": "Παγωμενος", "price_delta": 0.0, "type": "quick"},
{"name": "Γαλα", "price_delta": 0.0, "type": "pref"},
{"name": "Βρωμης", "price_delta": 0.3, "type": "pref_sub"},
]),
removed_ingredients=None,
notes="Πολυ κρυο παρακαλω",
),
# Item 2: Club Sandwich — extra with sub + removed ingredients
_Item(
product_id=1002,
quantity=1,
selected_options=_json.dumps([
{"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"},
{"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"},
{"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"},
{"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"},
{"name": "Ψωμι", "price_delta": 0.0, "type": "pref"},
{"name": "Σικαλεως", "price_delta": 0.0, "type": "pref_sub"},
]),
removed_ingredients=_json.dumps(["Ντοματα", "Μουσταρδα"]),
notes=None,
),
# Item 3: Margherita — quick + extra + removed
_Item(
product_id=1003,
quantity=3,
selected_options=_json.dumps([
{"name": "Well Done", "price_delta": 0.0, "type": "quick"},
{"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
{"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
{"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
]),
removed_ingredients=_json.dumps(["Ελιες", "Κρεμμυδι"]),
notes=None,
),
]
# Patch product lookup so _print_kitchen_ticket gets real names
_FAKE_NAMES = {1001: "Freddo Espresso", 1002: "Club Sandwich", 1003: "Margherita Pizza"}
# Monkey-patch db.query for Product only inside this call
_orig_query = db.query
class _FakeQuery:
def __init__(self, model):
self._model = model
self._filter_id = None
def filter(self, *args):
# extract id from the filter expression value
for arg in args:
try:
self._filter_id = arg.right.value
except Exception:
pass
return self
def first(self):
if self._model.__name__ == "Product" and self._filter_id in _FAKE_NAMES:
class _P:
name = _FAKE_NAMES[self._filter_id]
return _P()
return _orig_query(self._model).filter(self._model.id == self._filter_id).first()
class _PatchedDB:
def query(self, model):
from models.product import Product as _Product
if model is _Product:
return _FakeQuery(model)
return _orig_query(model)
# delegate everything else to real db
def __getattr__(self, name):
return getattr(db, name)
try:
p = _get_printer(ip, port)
_print_kitchen_ticket(p, _Order(), items, _PatchedDB())
p.close()
return True, ""
except Exception as e:
logger.error("Test order print failed for %s:%s%s", ip, port, e)
return False, str(e)
# ── Receipt formatting ───────────────────────────────────────────────────────
def _font(p: Network, byte_val: int, bold: bool = False):
p._raw(bytes([0x1b, 0x21, byte_val]))
p._raw(b'\x1b\x45\x01' if bold else b'\x1b\x45\x00')
def _parse_options(item: OrderItem) -> dict:
"""
Parse selected_options JSON into grouped dict:
{ 'quick': [(name, qty)], 'pref': [(name, sub|None)],
'extra': [(name, sub|None, qty)], 'unknown': [name] }
Falls back gracefully when type tags are absent (old data).
"""
result = {"quick": [], "pref": [], "extra": [], "unknown": []}
if not item.selected_options:
return result
try:
raw = json.loads(item.selected_options)
except (json.JSONDecodeError, TypeError):
return result
if not isinstance(raw, list):
return result
i = 0
while i < len(raw):
entry = raw[i]
if not isinstance(entry, dict):
i += 1
continue
name = entry.get("name") or ""
etype = entry.get("type")
# Peek at next entry to collect sub-choice
sub = None
if i + 1 < len(raw):
nxt = raw[i + 1]
if isinstance(nxt, dict) and nxt.get("type") in ("pref_sub", "extra_sub"):
sub = nxt.get("name") or ""
i += 1 # consume sub
if etype == "quick":
# Collapse repeated quick entries into a single (name, qty) tuple
existing = next((q for q in result["quick"] if q[0] == name), None)
if existing:
result["quick"][result["quick"].index(existing)] = (name, existing[1] + 1)
else:
result["quick"].append((name, 1))
elif etype == "pref":
result["pref"].append((name, sub))
elif etype == "extra":
# Collapse repeated extra entries (same name+sub) → (name, sub, qty)
existing = next((e for e in result["extra"] if e[0] == name and e[1] == sub), None)
if existing:
result["extra"][result["extra"].index(existing)] = (name, sub, existing[2] + 1)
else:
result["extra"].append((name, sub, 1))
else:
# Legacy data without type tag — treat as unknown, display plainly
if name:
result["unknown"].append(name + (f" · {sub}" if sub else ""))
i += 1
return result
def _print_kitchen_ticket(p: Network, order: Order, items: List[OrderItem], db: Session):
fonts = _load_print_fonts(db)
div = fonts["print.divider_style"]
cfg = _load_print_settings(db)
mode = cfg.get("print.ticket_mode", "detailed")
div = cfg.get("print.divider_style", "dash")
compact = (mode == "compact")
sz_order, bold_order = _decode_font(fonts["print.font_order_number"])
sz_table, bold_table = _decode_font(fonts["print.font_table"])
sz_item, bold_item = _decode_font(fonts["print.font_item_name"])
sz_opt, bold_opt = _decode_font(fonts["print.font_options"])
sz_ord, b_ord, c_ord = _decode_font(cfg["print.font_order_number"])
sz_meta, b_meta, c_meta = _decode_font(cfg["print.font_meta"])
sz_item, b_item, c_item = _decode_font(cfg["print.font_item_name"])
sz_qk, b_qk, c_qk = _decode_font(cfg["print.font_quick"])
sz_pr, b_pr, c_pr = _decode_font(cfg["print.font_pref"])
sz_ex, b_ex, c_ex = _decode_font(cfg["print.font_extra"])
sz_ing, b_ing, c_ing = _decode_font(cfg["print.font_ingredient"])
sz_note, b_note, c_note = _decode_font(cfg["print.font_item_note"])
sz_onote,b_onote,c_onote= _decode_font(cfg["print.font_order_note"])
# Header — order number
p._raw(b'\x1b\x61\x01')
_font(p, sz_order, bold_order)
_raw_text(p, f"Παραγγελια #{order.id}\n")
p._raw(b'\x1b\x21\x00')
p._raw(b'\x1b\x45\x00')
_divider(p, div)
# Resolve display names
table_name = order.table.label or str(order.table.number) if order.table else str(order.table_id)
waiter_nick = (order.opener.nickname or order.opener.username) if order.opener else str(order.opened_by)
now_str = _greek_date(datetime.datetime.now())
# Meta — table / waiter / time
p._raw(b'\x1b\x61\x00')
_font(p, sz_table, bold_table)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
_raw_text(p, f"Date: {now}\n")
_raw_text(p, f"Table: {order.table_id}\n")
_raw_text(p, f"Waiter: {order.opened_by}\n")
p._raw(b'\x1b\x21\x00')
p._raw(b'\x1b\x45\x00')
_divider(p, div)
# ── COMPACT header — single line ────────────────────────────────────────
if compact:
p._raw(b'\x1b\x61\x00')
_apply_font(p, sz_ord, b_ord)
header = f"Παρ. #{order.id} | Τρ. {table_name} | {now_str} | {waiter_nick}"
_raw_text(p, (header.upper() if c_ord else header) + "\n")
_reset_font(p)
_divider(p, div)
# ── DETAILED header ──────────────────────────────────────────────────────
else:
_print_line(p, f"Παραγγελια #{order.id}", sz_ord, b_ord, c_ord,
align=b'\x1b\x61\x01')
_divider(p, div)
p._raw(b'\x1b\x61\x00')
_apply_font(p, sz_meta, b_meta)
_raw_text(p, ("ΤΡΑΠΕΖΙ:" if c_meta else "Τραπεζι:") + f" Τραπεζι {table_name}\n")
_raw_text(p, ("ΗΜΕΡΟΜΗΝΙΑ:" if c_meta else "Ημερομηνια:") + f" {now_str}\n")
_raw_text(p, ("ΣΕΡΒΙΤΟΡΟΣ:" if c_meta else "Σερβιτορος:") + f" {waiter_nick}\n")
_reset_font(p)
_divider(p, div)
# ── Items ────────────────────────────────────────────────────────────────
# Double-width fonts halve the effective character width
item_line_width = LINE_WIDTH // 2 if sz_item in (32, 48) else LINE_WIDTH
# Items
for item in items:
product = db.query(Product).filter(Product.id == item.product_id).first()
name = product.name if product else f"Product #{item.product_id}"
raw_name = product.name if product else f"Product #{item.product_id}"
item_name = raw_name.upper() if c_item else raw_name
_font(p, sz_item, bold_item)
_raw_text(p, _item_line(name, item.quantity) + "\n")
p._raw(b'\x1b\x21\x00')
p._raw(b'\x1b\x45\x00')
p._raw(b'\x1b\x61\x00')
_apply_font(p, sz_item, b_item)
_raw_text(p, _item_line(item_name, item.quantity, item_line_width) + "\n")
_reset_font(p)
_font(p, sz_opt, bold_opt)
opts = _parse_options(item)
# Quick options (* marker)
if opts["quick"]:
if compact:
parts = []
for name, qty in opts["quick"]:
n = name.upper() if c_qk else name
parts.append(f"{n} x{qty}" if qty > 1 else n)
_apply_font(p, sz_qk, b_qk)
_raw_text(p, "* " + " | ".join(parts) + "\n")
_reset_font(p)
else:
for name, qty in opts["quick"]:
n = name.upper() if c_qk else name
line = f"* {n} x{qty}" if qty > 1 else f"* {n}"
_apply_font(p, sz_qk, b_qk)
_raw_text(p, line + "\n")
_reset_font(p)
# Preferences (> marker)
if opts["pref"]:
if compact:
parts = []
for name, sub in opts["pref"]:
n = name.upper() if c_pr else name
s = (sub.upper() if c_pr else sub) if sub else None
parts.append(f"{n} · {s}" if s else n)
_apply_font(p, sz_pr, b_pr)
_raw_text(p, "> " + " | ".join(parts) + "\n")
_reset_font(p)
else:
for name, sub in opts["pref"]:
n = name.upper() if c_pr else name
s = (sub.upper() if c_pr else sub) if sub else None
line = f"> {n} · {s}" if s else f"> {n}"
_apply_font(p, sz_pr, b_pr)
_raw_text(p, line + "\n")
_reset_font(p)
# Extras (+ marker)
if opts["extra"]:
if compact:
parts = []
for name, sub, qty in opts["extra"]:
n = name.upper() if c_ex else name
s = (sub.upper() if c_ex else sub) if sub else None
part = f"{n} · {s}" if s else n
if qty > 1:
part += f" · x{qty}"
parts.append(part)
_apply_font(p, sz_ex, b_ex)
_raw_text(p, "+ " + " | ".join(parts) + "\n")
_reset_font(p)
else:
for name, sub, qty in opts["extra"]:
n = name.upper() if c_ex else name
s = (sub.upper() if c_ex else sub) if sub else None
line = f"+ {n}"
if s:
line += f" · {s}"
if qty > 1:
line += f" · x{qty}"
_apply_font(p, sz_ex, b_ex)
_raw_text(p, line + "\n")
_reset_font(p)
# Legacy untagged options
for entry in opts["unknown"]:
_apply_font(p, sz_ex, b_ex)
_raw_text(p, f"+ {entry}\n")
_reset_font(p)
# Removed ingredients (- marker)
if item.removed_ingredients:
try:
removed_ids = json.loads(item.removed_ingredients)
if removed_ids:
_raw_text(p, f" - χωρις: {', '.join(str(i) for i in removed_ids)}\n")
except (json.JSONDecodeError, TypeError):
pass
if item.selected_options:
try:
option_ids = json.loads(item.selected_options)
if option_ids:
_raw_text(p, f" + επιλογες: {', '.join(str(i) for i in option_ids)}\n")
removed = json.loads(item.removed_ingredients)
if removed:
names = [n.upper() if c_ing else n for n in removed]
joined = " · ".join(names)
_apply_font(p, sz_ing, b_ing)
_raw_text(p, f"- ΧΩΡΙΣ: {joined}\n")
_reset_font(p)
except (json.JSONDecodeError, TypeError):
pass
# Per-item note
if item.notes:
_raw_text(p, f" (i) {item.notes}\n")
note_text = item.notes.upper() if c_note else item.notes
_apply_font(p, sz_note, b_note)
if compact:
_raw_text(p, f"! {note_text}\n")
else:
_raw_text(p, f"\n(!) {note_text}\n\n")
_reset_font(p)
p._raw(b'\x1b\x21\x00')
p._raw(b'\x1b\x45\x00')
# Blank line between items in detailed mode
if not compact:
p._raw(b'\n')
_divider(p, div)
# Order-level notes
if order.notes:
p._raw(b'\x1b\x21\x30')
_raw_text(p, "Σημειωσεις:\n")
p._raw(b'\x1b\x21\x10')
_raw_text(p, f"{order.notes}\n")
p._raw(b'\x1b\x21\x00')
_divider(p)
note_text = order.notes.upper() if c_onote else order.notes
_apply_font(p, sz_onote, b_onote)
_raw_text(p, f"Σημ: {note_text}\n")
_reset_font(p)
if not compact:
_divider(p, div)
# Footer (detailed only)
if not compact:
p._raw(b'\x1b\x61\x01')
p._raw(b'\x1b\x21\x30')
_raw_text(p, "Τελος Παραγγελιας\n")
p._raw(b'\x1b\x21\x00')
p._raw(b'\x1b\x61\x01')
p._raw(b'\x1b\x21\x30')
_raw_text(p, "Τελος Παραγγελιας\n")
p._raw(b'\x1b\x21\x00')
p._raw(b'\n\n\n')
p.cut()

View File

@@ -0,0 +1,84 @@
"""
SSE Event Bus — in-memory broadcaster for Server-Sent Events.
All routers import `broadcast_sync()` to push events from sync routes.
The SSE endpoint imports `subscribe()` / `unsubscribe()` to manage per-client queues.
Event shape (JSON-serialisable dict):
{ "type": "<event_type>", "data": { ... } }
Supported event types:
order_updated — order created / item added / transferred / merged
order_paid — items paid on an order
order_closed — order closed or cancelled
table_list_changed — table added/removed
table_flags_changed — flags set/cleared on a table
message_sent — new staff message (targeted or broadcast)
shift_changed — shift started / ended by manager
business_day_changed — business day opened / closed
"""
import asyncio
import json
from typing import Dict, Set
# Captured once at startup by init_loop() called from lifespan.
# Sync route threads use this to schedule coroutines safely.
_main_loop: asyncio.AbstractEventLoop | None = None
# waiter_id → set of asyncio.Queue (one per SSE connection for that user)
_queues: Dict[int, Set[asyncio.Queue]] = {}
def init_loop(loop: asyncio.AbstractEventLoop) -> None:
"""Call once from the FastAPI lifespan (async context) to capture the event loop."""
global _main_loop
_main_loop = loop
async def subscribe(user_id: int) -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=256)
if user_id not in _queues:
_queues[user_id] = set()
_queues[user_id].add(q)
return q
async def unsubscribe(user_id: int, q: asyncio.Queue) -> None:
if user_id in _queues:
_queues[user_id].discard(q)
if not _queues[user_id]:
del _queues[user_id]
def broadcast_sync(event_type: str, data: dict, *, user_ids: list[int] | None = None) -> None:
"""
Fire-and-forget broadcast from a synchronous FastAPI route (thread-pool worker).
Uses call_soon_threadsafe so the coroutine runs on the main event loop, not the thread.
"""
if _main_loop is None:
return
_main_loop.call_soon_threadsafe(
_main_loop.create_task,
broadcast(event_type, data, user_ids=user_ids),
)
async def broadcast(event_type: str, data: dict, *, user_ids: list[int] | None = None) -> None:
"""
Push an event to connected clients.
user_ids=None → broadcast to ALL connected users
user_ids=[...] → send only to those specific user IDs
"""
payload = json.dumps({"type": event_type, "data": data})
targets = (
{uid: qs for uid, qs in _queues.items() if uid in user_ids}
if user_ids is not None
else dict(_queues)
)
for qs in targets.values():
for q in list(qs):
try:
q.put_nowait(payload)
except asyncio.QueueFull:
pass # slow client — drop rather than block