Includes all work to date: - local_backend: FastAPI backend with products, orders, tables, shifts, cloud sync - manager_dashboard: React manager UI with product/category management, reports, settings - waiter_pwa: React PWA for waiter devices - Category reparent endpoint and UI - Waiter domain: local_ip sent on heartbeat, waiter_domain persisted from cloud response - QR code modal in AppInfoTab for waiter domain - Product form: number input spinners removed, category pre-selected on new product - Category row: count badge moved to far right Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
900 lines
33 KiB
Python
900 lines
33 KiB
Python
"""
|
||
ESC/POS printer service — Jolimark TP850UE confirmed configuration.
|
||
|
||
Key findings from printer testing:
|
||
- Code page n=29 (CP737) is the only working Greek code page on this model.
|
||
- All Greek text MUST be sent as raw CP737 bytes via p._raw() — never p.text().
|
||
- Set the code page immediately after connecting, before any output.
|
||
- 80mm paper = 48 chars wide at standard font. Double-height keeps 48-char width.
|
||
"""
|
||
import json
|
||
import logging
|
||
import socket
|
||
import datetime
|
||
from typing import Tuple, List
|
||
|
||
from escpos.printer import Network
|
||
from sqlalchemy.orm import Session
|
||
|
||
from database import SessionLocal
|
||
from models.order import Order, OrderItem, PrintLog
|
||
from models.printer import Printer
|
||
from models.product import Product
|
||
from models.settings import PosSettings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
LINE_WIDTH = 48
|
||
PRINTER_TIMEOUT = 5
|
||
|
||
|
||
# ── Low-level helpers ────────────────────────────────────────────────────────
|
||
|
||
def _get_printer(ip: str, port: int) -> Network:
|
||
p = Network(ip, port, timeout=PRINTER_TIMEOUT)
|
||
p._raw(b'\x1b\x40') # ESC @ — reset printer
|
||
p._raw(b'\x1b\x74\x1d') # ESC t 29 — select CP737 (Greek) — confirmed n=29
|
||
return p
|
||
|
||
|
||
def _gr(text: str) -> bytes:
|
||
"""Encode text to CP737 bytes. Replaces unknown chars instead of crashing."""
|
||
return text.encode('cp737', errors='replace')
|
||
|
||
|
||
def _raw_text(p: Network, text: str):
|
||
"""Send text as raw CP737 bytes — the ONLY safe way to print Greek."""
|
||
p._raw(_gr(text))
|
||
|
||
|
||
_DIVIDER_CHARS = {
|
||
"dash": "-",
|
||
"equals": "=",
|
||
"star": "*",
|
||
"empty": "",
|
||
}
|
||
|
||
_PRINT_SETTING_KEYS = [
|
||
"print.ticket_mode",
|
||
"print.divider_style",
|
||
"print.font_order_number",
|
||
"print.font_meta",
|
||
"print.font_item_name",
|
||
"print.font_quick",
|
||
"print.font_pref",
|
||
"print.font_extra",
|
||
"print.font_ingredient",
|
||
"print.font_item_note",
|
||
"print.font_order_note",
|
||
]
|
||
|
||
_PRINT_SETTING_DEFAULTS = {
|
||
"print.ticket_mode": "detailed",
|
||
"print.divider_style": "dash",
|
||
"print.font_order_number": "48:1:0",
|
||
"print.font_meta": "0:0:0",
|
||
"print.font_item_name": "16:1:0",
|
||
"print.font_quick": "0:0:0",
|
||
"print.font_pref": "0:0:0",
|
||
"print.font_extra": "0:0:0",
|
||
"print.font_ingredient": "0:0:0",
|
||
"print.font_item_note": "0:0:0",
|
||
"print.font_order_note": "0:1:0",
|
||
}
|
||
|
||
# SIZE byte values (ESC ! base, no bold bit):
|
||
# 0 = normal
|
||
# 16 = double-height (bit4)
|
||
# 32 = double-width (bit5)
|
||
# 48 = double-height + double-width (bits 4+5)
|
||
# Bold applied via ESC E, caps applied in software before encoding.
|
||
|
||
def _decode_font(value: str) -> tuple[int, bool, bool]:
|
||
"""Parse 'SIZE:BOLD:CAPS' string → (esc_bang_byte, bold_flag, caps_flag)."""
|
||
try:
|
||
parts = str(value).split(":")
|
||
size = int(parts[0])
|
||
bold = len(parts) > 1 and parts[1] == "1"
|
||
caps = len(parts) > 2 and parts[2] == "1"
|
||
return size, bold, caps
|
||
except (ValueError, AttributeError):
|
||
return 0, False, False
|
||
|
||
|
||
def _load_print_settings(db: Session) -> dict:
|
||
rows = db.query(PosSettings).filter(
|
||
PosSettings.key.in_(_PRINT_SETTING_KEYS)
|
||
).all()
|
||
settings = dict(_PRINT_SETTING_DEFAULTS)
|
||
for row in rows:
|
||
settings[row.key] = row.value
|
||
return settings
|
||
|
||
|
||
def _divider(p: Network, style: str = "dash"):
|
||
char = _DIVIDER_CHARS.get(style, "-")
|
||
p._raw(b'\x1b\x61\x00')
|
||
if char:
|
||
p._raw(_gr(char * LINE_WIDTH + "\n"))
|
||
else:
|
||
p._raw(b'\n')
|
||
|
||
|
||
def _item_line(name: str, qty: int, line_width: int = LINE_WIDTH) -> str:
|
||
"""Build a dot-leader line ending with 'xN'.
|
||
line_width must reflect the effective width at the chosen font size
|
||
(double-width fonts halve the available char count to 24)."""
|
||
suffix = f"x{qty}"
|
||
available = line_width - len(name) - len(suffix)
|
||
if available < 2:
|
||
# Name alone is too long — put qty on same line with a single space
|
||
return f"{name} {suffix}"
|
||
dots = (". " * ((available // 2) + 1))[:available]
|
||
return f"{name}{dots}{suffix}"
|
||
|
||
|
||
def _apply_font(p: Network, size: int, bold: bool):
|
||
p._raw(bytes([0x1b, 0x21, size]))
|
||
p._raw(b'\x1b\x45\x01' if bold else b'\x1b\x45\x00')
|
||
|
||
|
||
def _reset_font(p: Network):
|
||
p._raw(b'\x1b\x21\x00')
|
||
p._raw(b'\x1b\x45\x00')
|
||
|
||
|
||
def _print_line(p: Network, text: str, size: int, bold: bool, caps: bool,
|
||
align: bytes = b'\x1b\x61\x00'):
|
||
"""Apply font, optionally capitalize, print text + newline, reset font."""
|
||
p._raw(align)
|
||
_apply_font(p, size, bold)
|
||
out = text.upper() if caps else text
|
||
_raw_text(p, out + "\n")
|
||
_reset_font(p)
|
||
|
||
|
||
def _greek_date(dt: datetime.datetime) -> str:
|
||
"""Return date/time string in Greek format: HH:MM DD-MM-YYYY"""
|
||
return dt.strftime("%H:%M %d-%m-%Y")
|
||
|
||
|
||
def check_printer(ip: str, port: int) -> bool:
|
||
"""Quick TCP connect check — no data sent."""
|
||
try:
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
s.settimeout(2)
|
||
s.connect((ip, port))
|
||
s.close()
|
||
return True
|
||
except OSError:
|
||
return False
|
||
|
||
|
||
def is_spoof_mode() -> bool:
|
||
"""Stateless check — opens its own DB session. For use outside route_and_print."""
|
||
db = SessionLocal()
|
||
try:
|
||
return _is_spoof_mode(db)
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def send_test_print(ip: str, port: int, name: str) -> Tuple[bool, str]:
|
||
if is_spoof_mode():
|
||
logger.info("Spoof printing ON — dropping test print for %s", name)
|
||
return True, ""
|
||
try:
|
||
p = _get_printer(ip, port)
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, f"TEST — {name}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
_raw_text(p, f"{now}\n")
|
||
p._raw(b'\n\n\n')
|
||
p.cut()
|
||
p.close()
|
||
return True, ""
|
||
except Exception as e:
|
||
logger.error("Test print failed for %s:%s — %s", ip, port, e)
|
||
return False, str(e)
|
||
|
||
|
||
def send_test_order_print(ip: str, port: int, db: Session) -> Tuple[bool, str]:
|
||
"""Print a fake order using the current font/layout settings — for settings preview."""
|
||
if _is_spoof_mode(db):
|
||
logger.info("Spoof printing ON — dropping test order print")
|
||
return True, ""
|
||
|
||
# ── Fake data structures (no DB writes) ──────────────────────────────────
|
||
class _Table:
|
||
label = "O2"
|
||
number = 2
|
||
|
||
class _User:
|
||
nickname = "bonamin"
|
||
username = "bonamin"
|
||
|
||
class _Order:
|
||
id = 99
|
||
table = _Table()
|
||
opener = _User()
|
||
table_id = 2
|
||
opened_by = 1
|
||
notes = "Χωρις καψαλισμα παρακαλω"
|
||
|
||
class _Item:
|
||
def __init__(self, product_id, quantity, selected_options, removed_ingredients, notes):
|
||
self.product_id = product_id
|
||
self.quantity = quantity
|
||
self.selected_options = selected_options
|
||
self.removed_ingredients = removed_ingredients
|
||
self.notes = notes
|
||
|
||
import json as _json
|
||
|
||
items = [
|
||
# Item 1: Freddo Espresso — quick options + preference + note
|
||
_Item(
|
||
product_id=1001,
|
||
quantity=2,
|
||
selected_options=_json.dumps([
|
||
{"name": "Διπλος", "price_delta": 0.5, "type": "quick"},
|
||
{"name": "Εξτρα ζαχαρη", "price_delta": 0.0, "type": "quick"},
|
||
{"name": "Παγωμενος", "price_delta": 0.0, "type": "quick"},
|
||
{"name": "Γαλα", "price_delta": 0.0, "type": "pref"},
|
||
{"name": "Βρωμης", "price_delta": 0.3, "type": "pref_sub"},
|
||
]),
|
||
removed_ingredients=None,
|
||
notes="Πολυ κρυο παρακαλω",
|
||
),
|
||
# Item 2: Club Sandwich — extra with sub + removed ingredients
|
||
_Item(
|
||
product_id=1002,
|
||
quantity=1,
|
||
selected_options=_json.dumps([
|
||
{"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"},
|
||
{"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"},
|
||
{"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"},
|
||
{"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"},
|
||
{"name": "Ψωμι", "price_delta": 0.0, "type": "pref"},
|
||
{"name": "Σικαλεως", "price_delta": 0.0, "type": "pref_sub"},
|
||
]),
|
||
removed_ingredients=_json.dumps(["Ντοματα", "Μουσταρδα"]),
|
||
notes=None,
|
||
),
|
||
# Item 3: Margherita — quick + extra + removed
|
||
_Item(
|
||
product_id=1003,
|
||
quantity=3,
|
||
selected_options=_json.dumps([
|
||
{"name": "Well Done", "price_delta": 0.0, "type": "quick"},
|
||
{"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
|
||
{"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
|
||
{"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
|
||
]),
|
||
removed_ingredients=_json.dumps(["Ελιες", "Κρεμμυδι"]),
|
||
notes=None,
|
||
),
|
||
]
|
||
|
||
# Patch product lookup so _print_kitchen_ticket gets real names
|
||
_FAKE_NAMES = {1001: "Freddo Espresso", 1002: "Club Sandwich", 1003: "Margherita Pizza"}
|
||
|
||
# Monkey-patch db.query for Product only inside this call
|
||
_orig_query = db.query
|
||
|
||
class _FakeQuery:
|
||
def __init__(self, model):
|
||
self._model = model
|
||
self._filter_id = None
|
||
def filter(self, *args):
|
||
# extract id from the filter expression value
|
||
for arg in args:
|
||
try:
|
||
self._filter_id = arg.right.value
|
||
except Exception:
|
||
pass
|
||
return self
|
||
def first(self):
|
||
if self._model.__name__ == "Product" and self._filter_id in _FAKE_NAMES:
|
||
class _P:
|
||
name = _FAKE_NAMES[self._filter_id]
|
||
return _P()
|
||
return _orig_query(self._model).filter(self._model.id == self._filter_id).first()
|
||
|
||
class _PatchedDB:
|
||
def query(self, model):
|
||
from models.product import Product as _Product
|
||
if model is _Product:
|
||
return _FakeQuery(model)
|
||
return _orig_query(model)
|
||
# delegate everything else to real db
|
||
def __getattr__(self, name):
|
||
return getattr(db, name)
|
||
|
||
try:
|
||
p = _get_printer(ip, port)
|
||
_print_kitchen_ticket(p, _Order(), items, _PatchedDB())
|
||
p.close()
|
||
return True, ""
|
||
except Exception as e:
|
||
logger.error("Test order print failed for %s:%s — %s", ip, port, e)
|
||
return False, str(e)
|
||
|
||
|
||
# ── Receipt formatting ───────────────────────────────────────────────────────
|
||
|
||
def _parse_options(item: OrderItem) -> dict:
|
||
"""
|
||
Parse selected_options JSON into grouped dict:
|
||
{ 'quick': [(name, qty)], 'pref': [(name, sub|None)],
|
||
'extra': [(name, sub|None, qty)], 'unknown': [name] }
|
||
Falls back gracefully when type tags are absent (old data).
|
||
"""
|
||
result = {"quick": [], "pref": [], "extra": [], "unknown": []}
|
||
if not item.selected_options:
|
||
return result
|
||
|
||
try:
|
||
raw = json.loads(item.selected_options)
|
||
except (json.JSONDecodeError, TypeError):
|
||
return result
|
||
|
||
if not isinstance(raw, list):
|
||
return result
|
||
|
||
i = 0
|
||
while i < len(raw):
|
||
entry = raw[i]
|
||
if not isinstance(entry, dict):
|
||
i += 1
|
||
continue
|
||
name = entry.get("name") or ""
|
||
etype = entry.get("type")
|
||
|
||
# Peek at next entry to collect sub-choice
|
||
sub = None
|
||
if i + 1 < len(raw):
|
||
nxt = raw[i + 1]
|
||
if isinstance(nxt, dict) and nxt.get("type") in ("pref_sub", "extra_sub"):
|
||
sub = nxt.get("name") or ""
|
||
i += 1 # consume sub
|
||
|
||
if etype == "quick":
|
||
# Collapse repeated quick entries into a single (name, qty) tuple
|
||
existing = next((q for q in result["quick"] if q[0] == name), None)
|
||
if existing:
|
||
result["quick"][result["quick"].index(existing)] = (name, existing[1] + 1)
|
||
else:
|
||
result["quick"].append((name, 1))
|
||
elif etype == "pref":
|
||
result["pref"].append((name, sub))
|
||
elif etype == "extra":
|
||
# Collapse repeated extra entries (same name+sub) → (name, sub, qty)
|
||
existing = next((e for e in result["extra"] if e[0] == name and e[1] == sub), None)
|
||
if existing:
|
||
result["extra"][result["extra"].index(existing)] = (name, sub, existing[2] + 1)
|
||
else:
|
||
result["extra"].append((name, sub, 1))
|
||
else:
|
||
# Legacy data without type tag — treat as unknown, display plainly
|
||
if name:
|
||
result["unknown"].append(name + (f" · {sub}" if sub else ""))
|
||
|
||
i += 1
|
||
|
||
return result
|
||
|
||
|
||
def _print_kitchen_ticket(p: Network, order: Order, items: List[OrderItem], db: Session):
|
||
cfg = _load_print_settings(db)
|
||
mode = cfg.get("print.ticket_mode", "detailed")
|
||
div = cfg.get("print.divider_style", "dash")
|
||
compact = (mode == "compact")
|
||
|
||
sz_ord, b_ord, c_ord = _decode_font(cfg["print.font_order_number"])
|
||
sz_meta, b_meta, c_meta = _decode_font(cfg["print.font_meta"])
|
||
sz_item, b_item, c_item = _decode_font(cfg["print.font_item_name"])
|
||
sz_qk, b_qk, c_qk = _decode_font(cfg["print.font_quick"])
|
||
sz_pr, b_pr, c_pr = _decode_font(cfg["print.font_pref"])
|
||
sz_ex, b_ex, c_ex = _decode_font(cfg["print.font_extra"])
|
||
sz_ing, b_ing, c_ing = _decode_font(cfg["print.font_ingredient"])
|
||
sz_note, b_note, c_note = _decode_font(cfg["print.font_item_note"])
|
||
sz_onote,b_onote,c_onote= _decode_font(cfg["print.font_order_note"])
|
||
|
||
# Resolve display names
|
||
table_name = order.table.label or str(order.table.number) if order.table else str(order.table_id)
|
||
waiter_nick = (order.opener.nickname or order.opener.username) if order.opener else str(order.opened_by)
|
||
now_str = _greek_date(datetime.datetime.now())
|
||
|
||
# ── COMPACT header — single line ────────────────────────────────────────
|
||
if compact:
|
||
p._raw(b'\x1b\x61\x00')
|
||
_apply_font(p, sz_ord, b_ord)
|
||
header = f"Παρ. #{order.id} | Τρ. {table_name} | {now_str} | {waiter_nick}"
|
||
_raw_text(p, (header.upper() if c_ord else header) + "\n")
|
||
_reset_font(p)
|
||
_divider(p, div)
|
||
|
||
# ── DETAILED header ──────────────────────────────────────────────────────
|
||
else:
|
||
_print_line(p, f"Παραγγελια #{order.id}", sz_ord, b_ord, c_ord,
|
||
align=b'\x1b\x61\x01')
|
||
_divider(p, div)
|
||
p._raw(b'\x1b\x61\x00')
|
||
_apply_font(p, sz_meta, b_meta)
|
||
_raw_text(p, ("ΤΡΑΠΕΖΙ:" if c_meta else "Τραπεζι:") + f" Τραπεζι {table_name}\n")
|
||
_raw_text(p, ("ΗΜΕΡΟΜΗΝΙΑ:" if c_meta else "Ημερομηνια:") + f" {now_str}\n")
|
||
_raw_text(p, ("ΣΕΡΒΙΤΟΡΟΣ:" if c_meta else "Σερβιτορος:") + f" {waiter_nick}\n")
|
||
_reset_font(p)
|
||
_divider(p, div)
|
||
|
||
# ── Items ────────────────────────────────────────────────────────────────
|
||
# Double-width fonts halve the effective character width
|
||
item_line_width = LINE_WIDTH // 2 if sz_item in (32, 48) else LINE_WIDTH
|
||
|
||
for item in items:
|
||
product = db.query(Product).filter(Product.id == item.product_id).first()
|
||
raw_name = product.name if product else f"Product #{item.product_id}"
|
||
item_name = raw_name.upper() if c_item else raw_name
|
||
|
||
p._raw(b'\x1b\x61\x00')
|
||
_apply_font(p, sz_item, b_item)
|
||
_raw_text(p, _item_line(item_name, item.quantity, item_line_width) + "\n")
|
||
_reset_font(p)
|
||
|
||
opts = _parse_options(item)
|
||
|
||
# Quick options (* marker)
|
||
if opts["quick"]:
|
||
if compact:
|
||
parts = []
|
||
for name, qty in opts["quick"]:
|
||
n = name.upper() if c_qk else name
|
||
parts.append(f"{n} x{qty}" if qty > 1 else n)
|
||
_apply_font(p, sz_qk, b_qk)
|
||
_raw_text(p, "* " + " | ".join(parts) + "\n")
|
||
_reset_font(p)
|
||
else:
|
||
for name, qty in opts["quick"]:
|
||
n = name.upper() if c_qk else name
|
||
line = f"* {n} x{qty}" if qty > 1 else f"* {n}"
|
||
_apply_font(p, sz_qk, b_qk)
|
||
_raw_text(p, line + "\n")
|
||
_reset_font(p)
|
||
|
||
# Preferences (> marker)
|
||
if opts["pref"]:
|
||
if compact:
|
||
parts = []
|
||
for name, sub in opts["pref"]:
|
||
n = name.upper() if c_pr else name
|
||
s = (sub.upper() if c_pr else sub) if sub else None
|
||
parts.append(f"{n} · {s}" if s else n)
|
||
_apply_font(p, sz_pr, b_pr)
|
||
_raw_text(p, "> " + " | ".join(parts) + "\n")
|
||
_reset_font(p)
|
||
else:
|
||
for name, sub in opts["pref"]:
|
||
n = name.upper() if c_pr else name
|
||
s = (sub.upper() if c_pr else sub) if sub else None
|
||
line = f"> {n} · {s}" if s else f"> {n}"
|
||
_apply_font(p, sz_pr, b_pr)
|
||
_raw_text(p, line + "\n")
|
||
_reset_font(p)
|
||
|
||
# Extras (+ marker)
|
||
if opts["extra"]:
|
||
if compact:
|
||
parts = []
|
||
for name, sub, qty in opts["extra"]:
|
||
n = name.upper() if c_ex else name
|
||
s = (sub.upper() if c_ex else sub) if sub else None
|
||
part = f"{n} · {s}" if s else n
|
||
if qty > 1:
|
||
part += f" · x{qty}"
|
||
parts.append(part)
|
||
_apply_font(p, sz_ex, b_ex)
|
||
_raw_text(p, "+ " + " | ".join(parts) + "\n")
|
||
_reset_font(p)
|
||
else:
|
||
for name, sub, qty in opts["extra"]:
|
||
n = name.upper() if c_ex else name
|
||
s = (sub.upper() if c_ex else sub) if sub else None
|
||
line = f"+ {n}"
|
||
if s:
|
||
line += f" · {s}"
|
||
if qty > 1:
|
||
line += f" · x{qty}"
|
||
_apply_font(p, sz_ex, b_ex)
|
||
_raw_text(p, line + "\n")
|
||
_reset_font(p)
|
||
|
||
# Legacy untagged options
|
||
for entry in opts["unknown"]:
|
||
_apply_font(p, sz_ex, b_ex)
|
||
_raw_text(p, f"+ {entry}\n")
|
||
_reset_font(p)
|
||
|
||
# Removed ingredients (- marker)
|
||
if item.removed_ingredients:
|
||
try:
|
||
removed = json.loads(item.removed_ingredients)
|
||
if removed:
|
||
names = [n.upper() if c_ing else n for n in removed]
|
||
joined = " · ".join(names)
|
||
_apply_font(p, sz_ing, b_ing)
|
||
_raw_text(p, f"- ΧΩΡΙΣ: {joined}\n")
|
||
_reset_font(p)
|
||
except (json.JSONDecodeError, TypeError):
|
||
pass
|
||
|
||
# Per-item note
|
||
if item.notes:
|
||
note_text = item.notes.upper() if c_note else item.notes
|
||
_apply_font(p, sz_note, b_note)
|
||
if compact:
|
||
_raw_text(p, f"! {note_text}\n")
|
||
else:
|
||
_raw_text(p, f"\n(!) {note_text}\n\n")
|
||
_reset_font(p)
|
||
|
||
# Blank line between items in detailed mode
|
||
if not compact:
|
||
p._raw(b'\n')
|
||
|
||
_divider(p, div)
|
||
|
||
# Order-level notes
|
||
if order.notes:
|
||
note_text = order.notes.upper() if c_onote else order.notes
|
||
_apply_font(p, sz_onote, b_onote)
|
||
_raw_text(p, f"Σημ: {note_text}\n")
|
||
_reset_font(p)
|
||
if not compact:
|
||
_divider(p, div)
|
||
|
||
# Footer (detailed only)
|
||
if not compact:
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, "Τελος Παραγγελιας\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
|
||
p._raw(b'\n\n\n')
|
||
p.cut()
|
||
|
||
|
||
# ── On-demand report / receipt prints ────────────────────────────────────────
|
||
|
||
def print_waiter_report(ip: str, port: int, report: dict, mode: str):
|
||
"""Print a waiter shift/period report. mode='simple'|'extensive'."""
|
||
if is_spoof_mode():
|
||
logger.info("Spoof printing ON — dropping waiter report print")
|
||
return
|
||
try:
|
||
p = _get_printer(ip, port)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, "ΑΝΑΦΟΡΑ ΣΕΡΒΙΤΟΡΟΥ\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x00')
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Σερβιτορος: {report['waiter_name']}\n")
|
||
_raw_text(p, f"Απο: {report['from_dt']}\n")
|
||
_raw_text(p, f"Εως: {report['to_dt']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Παραγγελιες: {report['orders']}\n")
|
||
_raw_text(p, f"Αντικειμενα: {report['items']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, f"ΣΥΝΟΛΟ: {report['total']:.2f}e\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
|
||
if mode == "extensive" and report.get("order_data"):
|
||
_divider(p)
|
||
p._raw(b'\x1b\x61\x00')
|
||
_raw_text(p, "ΑΝΑΛΥΤΙΚΑ\n")
|
||
_divider(p)
|
||
for od in report["order_data"]:
|
||
# Build right-aligned total: "HH:MM - HH:MM - TABLE . . . 9.99e"
|
||
time_open = od.get("time_open", "")
|
||
time_close = od.get("time_close", "")
|
||
table = od["table"]
|
||
value = f"{od['total']:.2f}e"
|
||
times_part = f"{time_open} - {time_close}" if time_close else time_open
|
||
prefix = f"{times_part} - {table}"
|
||
gap = LINE_WIDTH - len(prefix) - len(value)
|
||
if gap < 3:
|
||
line = f"{prefix} {value}"
|
||
else:
|
||
dots = (". " * ((gap // 2) + 1))[:gap]
|
||
line = f"{prefix}{dots}{value}"
|
||
_raw_text(p, line + "\n")
|
||
|
||
p._raw(b'\n\n\n')
|
||
p.cut()
|
||
p.close()
|
||
except Exception as e:
|
||
logger.error("print_waiter_report failed for %s:%s — %s", ip, port, e)
|
||
|
||
|
||
def print_printer_report(ip: str, port: int, report: dict, mode: str):
|
||
"""Print a per-printer totals report. mode='simple'|'extensive'."""
|
||
if is_spoof_mode():
|
||
logger.info("Spoof printing ON — dropping printer report print")
|
||
return
|
||
try:
|
||
p = _get_printer(ip, port)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, "ΑΝΑΦΟΡΑ ΕΚΤΥΠΩΤΗ\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x00')
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Εκτυπωτης: {report['printer_name']}\n")
|
||
_raw_text(p, f"Απο: {report['from_dt']}\n")
|
||
_raw_text(p, f"Εως: {report['to_dt']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Εργασιες εκτ.: {report['print_jobs']}\n")
|
||
_raw_text(p, f"Παραγγελιες: {report['orders']}\n")
|
||
_raw_text(p, f"Αντικειμενα: {report['items']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, f"ΣΥΝΟΛΟ: {report['total']:.2f}e\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
|
||
if mode == "extensive" and report.get("order_data"):
|
||
_divider(p)
|
||
p._raw(b'\x1b\x61\x00')
|
||
_raw_text(p, "ΑΝΑΛΥΤΙΚΑ\n")
|
||
_divider(p)
|
||
for od in report["order_data"]:
|
||
# Header line: "HH:MM - TABLE . . . . 9.99e"
|
||
prefix = f"{od['time']} - {od['table']}"
|
||
value = f"{od['total']:.2f}e"
|
||
gap = LINE_WIDTH - len(prefix) - len(value)
|
||
if gap < 3:
|
||
header_line = f"{prefix} {value}"
|
||
else:
|
||
dots = (". " * ((gap // 2) + 1))[:gap]
|
||
header_line = f"{prefix}{dots}{value}"
|
||
p._raw(b'\x1b\x45\x01')
|
||
_raw_text(p, header_line + "\n")
|
||
p._raw(b'\x1b\x45\x00')
|
||
# Indented items
|
||
for item in od.get("items", []):
|
||
_raw_text(p, f" {item['quantity']} x {item['name']}\n")
|
||
|
||
p._raw(b'\n\n\n')
|
||
p.cut()
|
||
p.close()
|
||
except Exception as e:
|
||
logger.error("print_printer_report failed for %s:%s — %s", ip, port, e)
|
||
|
||
|
||
def print_order_receipt(ip: str, port: int, receipt: dict):
|
||
"""Print a manager-triggered order receipt."""
|
||
if is_spoof_mode():
|
||
logger.info("Spoof printing ON — dropping order receipt print")
|
||
return
|
||
try:
|
||
p = _get_printer(ip, port)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, f"ΠΑΡΑΓΓΕΛΙΑ #{receipt['order_id']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x00')
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Τραπεζι: {receipt['table_name']}\n")
|
||
_raw_text(p, f"Σερβιτορος: {receipt['waiter_name']}\n")
|
||
_raw_text(p, f"Ανοιχτηκε: {receipt['opened_at']}\n")
|
||
if receipt.get("closed_at"):
|
||
_raw_text(p, f"Εκλεισε: {receipt['closed_at']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
for item in receipt.get("items", []):
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, _item_line(item["name"], item["quantity"]) + "\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n")
|
||
|
||
_divider(p)
|
||
|
||
if receipt.get("notes"):
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Σημ: {receipt['notes']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, f"ΣΥΝΟΛΟ: {receipt['total']:.2f}e\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
|
||
p._raw(b'\n\n\n')
|
||
p.cut()
|
||
p.close()
|
||
except Exception as e:
|
||
logger.error("print_order_receipt failed for %s:%s — %s", ip, port, e)
|
||
|
||
|
||
def print_order_synopsis(ip: str, port: int, synopsis: dict):
|
||
"""Print a waiter-triggered order synopsis (not a kitchen ticket)."""
|
||
if is_spoof_mode():
|
||
logger.info("Spoof printing ON — dropping order synopsis print")
|
||
return
|
||
try:
|
||
p = _get_printer(ip, port)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, "ΣΥΝΟΨΗ ΠΑΡΑΓΓΕΛΙΑΣ\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x00')
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Τραπεζι: {synopsis['table_name']}\n")
|
||
_raw_text(p, f"Σερβιτορος: {synopsis['waiter_name']}\n")
|
||
_raw_text(p, f"Ωρα: {synopsis['opened_at']}\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
_divider(p)
|
||
|
||
paid_items = [i for i in synopsis.get("items", []) if i["status"] == "paid"]
|
||
active_items = [i for i in synopsis.get("items", []) if i["status"] == "active"]
|
||
|
||
if active_items:
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, "ΕΚΚΡΕΜΗ:\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
for item in active_items:
|
||
_raw_text(p, f" {_item_line(item['name'], item['quantity'])}\n")
|
||
_raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n")
|
||
_divider(p)
|
||
|
||
if paid_items:
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, "ΠΛΗΡΩΜΕΝΑ:\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
for item in paid_items:
|
||
_raw_text(p, f" {_item_line(item['name'], item['quantity'])}\n")
|
||
_raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n")
|
||
_divider(p)
|
||
|
||
p._raw(b'\x1b\x61\x01')
|
||
p._raw(b'\x1b\x21\x30')
|
||
_raw_text(p, f"ΣΥΝΟΛΟ: {synopsis['total']:.2f}e\n")
|
||
if synopsis.get('paid_total', 0) > 0:
|
||
p._raw(b'\x1b\x21\x10')
|
||
_raw_text(p, f"Πληρωμενο: {synopsis['paid_total']:.2f}e\n")
|
||
_raw_text(p, f"Εκκρεμει: {synopsis['remaining']:.2f}e\n")
|
||
p._raw(b'\x1b\x21\x00')
|
||
|
||
p._raw(b'\n\n\n')
|
||
p.cut()
|
||
p.close()
|
||
except Exception as e:
|
||
logger.error("print_order_synopsis failed for %s:%s — %s", ip, port, e)
|
||
|
||
|
||
# ── Routing logic ────────────────────────────────────────────────────────────
|
||
|
||
def route_and_print(order_id: int, item_ids: List[int]):
|
||
"""
|
||
Background task: group items by printer zone, send to each printer.
|
||
Printer failures are logged but never raise — order is already saved.
|
||
"""
|
||
db: Session = SessionLocal()
|
||
try:
|
||
_do_route_and_print(order_id, item_ids, db)
|
||
except Exception as e:
|
||
logger.exception("Unexpected error in route_and_print for order %s: %s", order_id, e)
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def route_and_print_sync(order_id: int, item_ids: List[int], db: Session) -> List[dict]:
|
||
"""
|
||
Synchronous variant used when the caller needs print results.
|
||
Returns a list of per-printer result dicts:
|
||
{ printer_name, success, error }
|
||
"""
|
||
return _do_route_and_print(order_id, item_ids, db)
|
||
|
||
|
||
def _is_spoof_mode(db: Session) -> bool:
|
||
row = db.query(PosSettings).filter(PosSettings.key == "dev.spoof_printing").first()
|
||
return row is not None and row.value == "true"
|
||
|
||
|
||
def _do_route_and_print(order_id: int, item_ids: List[int], db: Session) -> List[dict]:
|
||
if _is_spoof_mode(db):
|
||
logger.info("Spoof printing ON — dropping print job for order %s", order_id)
|
||
for item_id in item_ids:
|
||
item = db.query(OrderItem).filter(OrderItem.id == item_id).first()
|
||
if item:
|
||
item.printed = True
|
||
db.commit()
|
||
return [{"printer_name": "spoof", "success": True, "error": None}]
|
||
|
||
results = []
|
||
|
||
order = db.query(Order).filter(Order.id == order_id).first()
|
||
if not order:
|
||
logger.error("route_and_print: order %s not found", order_id)
|
||
return results
|
||
|
||
items = db.query(OrderItem).filter(OrderItem.id.in_(item_ids)).all()
|
||
|
||
# Group items by printer zone
|
||
zone_map: dict[int, List[OrderItem]] = {}
|
||
unzoned: List[OrderItem] = []
|
||
for item in items:
|
||
product = db.query(Product).filter(Product.id == item.product_id).first()
|
||
if product and product.printer_zone_id:
|
||
zone_map.setdefault(product.printer_zone_id, []).append(item)
|
||
else:
|
||
unzoned.append(item)
|
||
|
||
if unzoned:
|
||
logger.warning("order %s has %d item(s) with no printer zone — skipped", order_id, len(unzoned))
|
||
|
||
for printer_id, zone_items in zone_map.items():
|
||
printer = db.query(Printer).filter(Printer.id == printer_id, Printer.is_active == True).first()
|
||
if not printer:
|
||
logger.warning("Printer %s not found or inactive", printer_id)
|
||
results.append({"printer_name": f"#{printer_id}", "success": False, "error": "Printer not found or inactive"})
|
||
continue
|
||
|
||
success = False
|
||
error_msg = None
|
||
try:
|
||
p = _get_printer(printer.ip_address, printer.port)
|
||
_print_kitchen_ticket(p, order, zone_items, db)
|
||
p.close()
|
||
success = True
|
||
for item in zone_items:
|
||
item.printed = True
|
||
db.commit()
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
logger.error("Print failed for printer %s (%s:%s): %s", printer.name, printer.ip_address, printer.port, e)
|
||
|
||
log = PrintLog(
|
||
order_id=order_id,
|
||
printer_id=printer_id,
|
||
item_ids=json.dumps([i.id for i in zone_items]),
|
||
success=success,
|
||
error_message=error_msg,
|
||
)
|
||
db.add(log)
|
||
db.commit()
|
||
|
||
results.append({"printer_name": printer.name, "success": success, "error": error_msg})
|
||
|
||
return results
|