Files
simple-pos-system/local_backend/services/printer_service.py

900 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ESC/POS printer service — Jolimark TP850UE confirmed configuration.
Key findings from printer testing:
- Code page n=29 (CP737) is the only working Greek code page on this model.
- All Greek text MUST be sent as raw CP737 bytes via p._raw() — never p.text().
- Set the code page immediately after connecting, before any output.
- 80mm paper = 48 chars wide at standard font. Double-height keeps 48-char width.
"""
import json
import logging
import socket
import datetime
from typing import Tuple, List
from escpos.printer import Network
from sqlalchemy.orm import Session
from database import SessionLocal
from models.order import Order, OrderItem, PrintLog
from models.printer import Printer
from models.product import Product
from models.settings import PosSettings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Characters per line at the standard font (80mm paper, per the module docstring).
LINE_WIDTH = 48
# Passed as `timeout` when constructing the escpos Network printer
# (presumably seconds — confirm against the python-escpos version in use).
PRINTER_TIMEOUT = 5
# ── Low-level helpers ────────────────────────────────────────────────────────
def _get_printer(ip: str, port: int) -> Network:
    """Create the escpos Network printer and put it into a known state.

    Sends ESC @ (reset) followed by ESC t 29 (select CP737, the Greek code
    page) before anything else is printed.

    If either initialisation command fails, the printer object is closed
    before the exception is re-raised so the caller is not left with a
    leaked, half-initialised connection.

    Raises:
        Whatever the escpos Network class raises on connect/write failure.
    """
    p = Network(ip, port, timeout=PRINTER_TIMEOUT)
    try:
        p._raw(b'\x1b\x40')      # ESC @ — reset printer
        p._raw(b'\x1b\x74\x1d')  # ESC t 29 — select CP737 (Greek) — confirmed n=29
    except Exception:
        # Best-effort cleanup; the original error is the one worth reporting.
        try:
            p.close()
        except Exception as close_err:
            logger.debug("Closing printer %s:%s after failed init also failed: %s",
                         ip, port, close_err)
        raise
    return p
def _gr(text: str) -> bytes:
    """Return *text* encoded as CP737 bytes.

    Characters that CP737 cannot represent are substituted by the codec
    ('replace' error handler) rather than raising.
    """
    encoded = text.encode("cp737", "replace")
    return encoded
def _raw_text(p: Network, text: str):
    """Write *text* to the printer as raw CP737 bytes.

    This bypasses the library's text path; per the module notes it is the
    only safe way to print Greek on this printer.
    """
    payload = _gr(text)
    p._raw(payload)
# Maps the "print.divider_style" setting value to the character that
# _divider() repeats across the line; "empty" makes it emit a bare newline.
_DIVIDER_CHARS = {
    "dash": "-",
    "equals": "=",
    "star": "*",
    "empty": "",
}
# Setting keys that _load_print_settings() fetches from PosSettings.
_PRINT_SETTING_KEYS = [
    "print.ticket_mode",
    "print.divider_style",
    "print.font_order_number",
    "print.font_meta",
    "print.font_item_name",
    "print.font_quick",
    "print.font_pref",
    "print.font_extra",
    "print.font_ingredient",
    "print.font_item_note",
    "print.font_order_note",
]
# Values used when a key has no stored row. The font entries use the
# 'SIZE:BOLD:CAPS' format parsed by _decode_font().
_PRINT_SETTING_DEFAULTS = {
    "print.ticket_mode": "detailed",
    "print.divider_style": "dash",
    "print.font_order_number": "48:1:0",
    "print.font_meta": "0:0:0",
    "print.font_item_name": "16:1:0",
    "print.font_quick": "0:0:0",
    "print.font_pref": "0:0:0",
    "print.font_extra": "0:0:0",
    "print.font_ingredient": "0:0:0",
    "print.font_item_note": "0:0:0",
    "print.font_order_note": "0:1:0",
}
# SIZE byte values (ESC ! base, no bold bit):
# 0 = normal
# 16 = double-height (bit4)
# 32 = double-width (bit5)
# 48 = double-height + double-width (bits 4+5)
# Bold applied via ESC E, caps applied in software before encoding.
def _decode_font(value: str) -> tuple[int, bool, bool]:
    """Parse a 'SIZE:BOLD:CAPS' string → (esc_bang_byte, bold_flag, caps_flag).

    SIZE is the byte later sent with ESC !, so it must fit in 0..255; BOLD
    and CAPS are enabled only by the literal "1". Missing BOLD/CAPS fields
    default to False, and whitespace around any field is ignored.

    Any value that cannot be parsed, or whose SIZE does not fit in a single
    byte, yields the plain font (0, False, False) instead of raising, so a
    bad stored setting can never abort a print job.
    """
    fields = [field.strip() for field in str(value).split(":")]
    try:
        size = int(fields[0])
    except ValueError:
        return 0, False, False
    # The size ends up in bytes([0x1b, 0x21, size]); reject anything that
    # would make that construction raise.
    if not 0 <= size <= 0xFF:
        return 0, False, False
    bold = len(fields) > 1 and fields[1] == "1"
    caps = len(fields) > 2 and fields[2] == "1"
    return size, bold, caps
def _load_print_settings(db: Session) -> dict:
    """Return the print settings: built-in defaults overlaid with stored rows.

    Only the keys listed in _PRINT_SETTING_KEYS are queried from PosSettings;
    every stored row replaces the default for its key.
    """
    stored = (
        db.query(PosSettings)
        .filter(PosSettings.key.in_(_PRINT_SETTING_KEYS))
        .all()
    )
    overrides = {row.key: row.value for row in stored}
    return {**_PRINT_SETTING_DEFAULTS, **overrides}
def _divider(p: Network, style: str = "dash"):
    """Print a full-width divider line in the requested style.

    Unknown styles fall back to dashes; the "empty" style prints only a
    newline. ESC a 0 is sent first in every case.
    """
    fill = _DIVIDER_CHARS.get(style, "-")
    p._raw(b'\x1b\x61\x00')
    payload = _gr(fill * LINE_WIDTH + "\n") if fill else b'\n'
    p._raw(payload)
def _item_line(name: str, qty: int, line_width: int = LINE_WIDTH) -> str:
    """Return 'name. . . xN', dot-padded so the quantity ends at line_width.

    line_width has to be the effective width for the font in use
    (double-width fonts halve the available character count to 24).
    When fewer than two filler characters would fit, the quantity simply
    follows the name after a single space.
    """
    suffix = f"x{qty}"
    room = line_width - len(name) - len(suffix)
    if room >= 2:
        # Over-generate the ". " pattern, then trim to the exact gap.
        leader = (". " * (room // 2 + 1))[:room]
        return f"{name}{leader}{suffix}"
    return f"{name} {suffix}"
def _apply_font(p: Network, size: int, bold: bool):
    """Select the print mode byte with ESC ! and switch bold on/off with ESC E."""
    p._raw(bytes((0x1B, 0x21, size)))
    emphasis = 0x01 if bold else 0x00
    p._raw(bytes((0x1B, 0x45, emphasis)))
def _reset_font(p: Network):
    """Return to the normal print mode (ESC ! 0) and turn bold off (ESC E 0)."""
    for command in (b'\x1b\x21\x00', b'\x1b\x45\x00'):
        p._raw(command)
def _print_line(p: Network, text: str, size: int, bold: bool, caps: bool,
                align: bytes = b'\x1b\x61\x00'):
    """Print one line in the given font, then restore the default font.

    Sends the alignment command, applies size/bold, upper-cases the text
    when *caps* is set, writes it followed by a newline and resets the font.
    """
    p._raw(align)
    _apply_font(p, size, bold)
    if caps:
        text = text.upper()
    _raw_text(p, text + "\n")
    _reset_font(p)
def _greek_date(dt: datetime.datetime) -> str:
    """Format *dt* as 'HH:MM DD-MM-YYYY' (the Greek-style date used on tickets)."""
    return f"{dt:%H:%M %d-%m-%Y}"
def check_printer(ip: str, port: int) -> bool:
    """Quick TCP connect check — no data sent.

    Returns True when a TCP connection to (ip, port) can be established
    within 2 seconds, False on any OSError (refused, unreachable, timeout).
    The socket is closed on every path, including a failed connect.
    """
    try:
        # The context manager closes the socket even when connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(2)
            s.connect((ip, port))
        return True
    except OSError:
        return False
def is_spoof_mode() -> bool:
    """Report whether spoof printing is enabled, using a throw-away DB session.

    Stateless: the session is opened here and always closed again. Intended
    for callers that do not already hold a session (i.e. outside
    route_and_print).
    """
    session = SessionLocal()
    try:
        spoofing = _is_spoof_mode(session)
    finally:
        session.close()
    return spoofing
def send_test_print(ip: str, port: int, name: str) -> Tuple[bool, str]:
    """Print a short test page (title + current timestamp) on one printer.

    Returns:
        (True, "") when the page was sent, or when spoof printing is on and
        the job is deliberately dropped; (False, error_text) on failure.

    The printer connection is closed on every path. A failure while closing
    is logged but does not turn an otherwise successful print into an error.
    """
    if is_spoof_mode():
        logger.info("Spoof printing ON — dropping test print for %s", name)
        return True, ""
    p = None
    try:
        p = _get_printer(ip, port)
        p._raw(b'\x1b\x61\x01')  # ESC a 1 — centre alignment
        p._raw(b'\x1b\x21\x30')  # ESC ! 0x30 — double height + double width
        # Plain ASCII hyphen: an em dash is not encodable in CP737 and
        # would come out of the printer as '?'.
        _raw_text(p, f"TEST - {name}\n")
        p._raw(b'\x1b\x21\x00')  # back to normal size
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        _raw_text(p, f"{now}\n")
        p._raw(b'\n\n\n')        # feed past the cutter
        p.cut()
        return True, ""
    except Exception as e:
        logger.error("Test print failed for %s:%s: %s", ip, port, e)
        return False, str(e)
    finally:
        if p is not None:
            try:
                p.close()
            except Exception as close_err:
                logger.warning("Could not close printer %s:%s cleanly: %s",
                               ip, port, close_err)
def send_test_order_print(ip: str, port: int, db: Session) -> Tuple[bool, str]:
    """Print a fake order using the current font/layout settings — for settings preview.

    Nothing is written to the database: the order, its items and the product
    names are in-memory stand-ins. *db* is wrapped so that Product lookups
    for the fake product ids return the fake names, while every other query
    (e.g. the print settings) goes to the real session.

    Returns:
        (True, "") on success or when spoof printing is on,
        (False, error_text) on failure.

    The printer connection is closed on every path.
    """
    if _is_spoof_mode(db):
        logger.info("Spoof printing ON — dropping test order print")
        return True, ""

    # ── Fake data structures (no DB writes) ──────────────────────────────────
    class _Table:
        label = "O2"
        number = 2

    class _User:
        nickname = "bonamin"
        username = "bonamin"

    class _Order:
        id = 99
        table = _Table()
        opener = _User()
        table_id = 2
        opened_by = 1
        notes = "Χωρις καψαλισμα παρακαλω"

    class _Item:
        def __init__(self, product_id, quantity, selected_options, removed_ingredients, notes):
            self.product_id = product_id
            self.quantity = quantity
            self.selected_options = selected_options
            self.removed_ingredients = removed_ingredients
            self.notes = notes

    class _FakeProduct:
        def __init__(self, name):
            self.name = name

    items = [
        # Item 1: Freddo Espresso — quick options + preference + note
        _Item(
            product_id=1001,
            quantity=2,
            selected_options=json.dumps([
                {"name": "Διπλος", "price_delta": 0.5, "type": "quick"},
                {"name": "Εξτρα ζαχαρη", "price_delta": 0.0, "type": "quick"},
                {"name": "Παγωμενος", "price_delta": 0.0, "type": "quick"},
                {"name": "Γαλα", "price_delta": 0.0, "type": "pref"},
                {"name": "Βρωμης", "price_delta": 0.3, "type": "pref_sub"},
            ]),
            removed_ingredients=None,
            notes="Πολυ κρυο παρακαλω",
        ),
        # Item 2: Club Sandwich — extra with sub + removed ingredients
        _Item(
            product_id=1002,
            quantity=1,
            selected_options=json.dumps([
                {"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"},
                {"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"},
                {"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"},
                {"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"},
                {"name": "Ψωμι", "price_delta": 0.0, "type": "pref"},
                {"name": "Σικαλεως", "price_delta": 0.0, "type": "pref_sub"},
            ]),
            removed_ingredients=json.dumps(["Ντοματα", "Μουσταρδα"]),
            notes=None,
        ),
        # Item 3: Margherita — quick + extra + removed
        _Item(
            product_id=1003,
            quantity=3,
            selected_options=json.dumps([
                {"name": "Well Done", "price_delta": 0.0, "type": "quick"},
                {"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
                {"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
                {"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"},
            ]),
            removed_ingredients=json.dumps(["Ελιες", "Κρεμμυδι"]),
            notes=None,
        ),
    ]

    # Names served for the fake product ids instead of hitting the products table.
    _FAKE_NAMES = {1001: "Freddo Espresso", 1002: "Club Sandwich", 1003: "Margherita Pizza"}
    _orig_query = db.query

    class _FakeQuery:
        """Minimal stand-in for `db.query(Product).filter(Product.id == X).first()`."""

        def __init__(self, model):
            self._model = model
            self._filter_id = None

        def filter(self, *args):
            # Pull the compared value out of each filter expression; anything
            # that does not expose `.right.value` is ignored.
            for arg in args:
                try:
                    self._filter_id = arg.right.value
                except AttributeError:
                    continue
            return self

        def first(self):
            if self._model.__name__ == "Product" and self._filter_id in _FAKE_NAMES:
                return _FakeProduct(_FAKE_NAMES[self._filter_id])
            # Unknown id: fall back to a real lookup.
            return _orig_query(self._model).filter(self._model.id == self._filter_id).first()

    class _PatchedDB:
        """Session wrapper that intercepts Product queries only."""

        def query(self, model):
            if model is Product:
                return _FakeQuery(model)
            return _orig_query(model)

        # delegate everything else to real db
        def __getattr__(self, name):
            return getattr(db, name)

    p = None
    try:
        p = _get_printer(ip, port)
        _print_kitchen_ticket(p, _Order(), items, _PatchedDB())
        return True, ""
    except Exception as e:
        logger.error("Test order print failed for %s:%s: %s", ip, port, e)
        return False, str(e)
    finally:
        if p is not None:
            try:
                p.close()
            except Exception as close_err:
                logger.warning("Could not close printer %s:%s cleanly: %s",
                               ip, port, close_err)
# ── Receipt formatting ───────────────────────────────────────────────────────
def _parse_options(item: OrderItem) -> dict:
    """
    Parse selected_options JSON into grouped dict:
    { 'quick': [(name, qty)], 'pref': [(name, sub|None)],
    'extra': [(name, sub|None, qty)], 'unknown': [name] }
    Falls back gracefully when type tags are absent (old data).

    Rules:
    - Missing, unparsable or non-list JSON yields four empty groups.
    - Non-dict list entries are skipped.
    - An entry immediately followed by a 'pref_sub' / 'extra_sub' entry takes
      that entry's name as its sub-choice (the sub entry is consumed).
    - Repeated 'quick' entries, and repeated 'extra' entries with the same
      name and sub, are collapsed into one tuple with a count, keeping
      first-seen order.
    - Names are always returned as str (non-string JSON values are converted)
      so callers can upper-case and concatenate them safely.
    """
    result = {"quick": [], "pref": [], "extra": [], "unknown": []}
    if not item.selected_options:
        return result
    try:
        raw = json.loads(item.selected_options)
    except (json.JSONDecodeError, TypeError):
        return result
    if not isinstance(raw, list):
        return result

    def _label(entry: dict) -> str:
        # Missing / empty / falsy names become "", anything else becomes text.
        value = entry.get("name")
        return str(value) if value else ""

    # Insertion-ordered counters replace the repeated list scans.
    quick_counts: dict = {}
    extra_counts: dict = {}

    i = 0
    while i < len(raw):
        entry = raw[i]
        if not isinstance(entry, dict):
            i += 1
            continue
        name = _label(entry)
        etype = entry.get("type")
        # Peek at next entry to collect sub-choice
        sub = None
        if i + 1 < len(raw):
            nxt = raw[i + 1]
            if isinstance(nxt, dict) and nxt.get("type") in ("pref_sub", "extra_sub"):
                sub = _label(nxt)
                i += 1  # consume sub
        if etype == "quick":
            quick_counts[name] = quick_counts.get(name, 0) + 1
        elif etype == "pref":
            result["pref"].append((name, sub))
        elif etype == "extra":
            key = (name, sub)
            extra_counts[key] = extra_counts.get(key, 0) + 1
        elif name:
            # Legacy data without type tag — treat as unknown, display plainly
            result["unknown"].append(name + (f" · {sub}" if sub else ""))
        i += 1

    result["quick"] = list(quick_counts.items())
    result["extra"] = [(n, s, qty) for (n, s), qty in extra_counts.items()]
    return result
def _print_kitchen_ticket(p: Network, order: Order, items: List[OrderItem], db: Session):
    """Format and print one kitchen ticket for *items* of *order* on printer *p*.

    Layout and fonts come from the print settings loaded through *db*:
    - "compact" mode prints a one-line header, groups each option kind on a
      single line and has no footer;
    - any other mode ("detailed") prints a multi-line header, one option per
      line, a blank line between items and a closing footer.

    Per item the ticket shows the dot-leader name/quantity line, then quick
    options (*), preferences (>), extras (+), legacy untagged options (+),
    removed ingredients (-) and the item note. Order-level notes follow the
    item list. The ticket always ends with a feed and a cut; the caller owns
    the printer connection and is responsible for closing it.
    """
    cfg = _load_print_settings(db)
    mode = cfg.get("print.ticket_mode", "detailed")
    div = cfg.get("print.divider_style", "dash")
    compact = (mode == "compact")

    sz_ord, b_ord, c_ord = _decode_font(cfg["print.font_order_number"])
    sz_meta, b_meta, c_meta = _decode_font(cfg["print.font_meta"])
    sz_item, b_item, c_item = _decode_font(cfg["print.font_item_name"])
    sz_qk, b_qk, c_qk = _decode_font(cfg["print.font_quick"])
    sz_pr, b_pr, c_pr = _decode_font(cfg["print.font_pref"])
    sz_ex, b_ex, c_ex = _decode_font(cfg["print.font_extra"])
    sz_ing, b_ing, c_ing = _decode_font(cfg["print.font_ingredient"])
    sz_note, b_note, c_note = _decode_font(cfg["print.font_item_note"])
    sz_onote, b_onote, c_onote = _decode_font(cfg["print.font_order_note"])

    def _cased(text: str, caps: bool) -> str:
        """Upper-case *text* when the font's caps flag is set."""
        return text.upper() if caps else text

    def _emit(text: str, size: int, bold: bool) -> None:
        """Print *text* (already newline-terminated) in a font, then reset."""
        _apply_font(p, size, bold)
        _raw_text(p, text)
        _reset_font(p)

    def _emit_group(marker: str, parts: list, size: int, bold: bool) -> None:
        """Print option parts: joined on one line (compact) or one per line."""
        if not parts:
            return
        if compact:
            _emit(marker + " | ".join(parts) + "\n", size, bold)
        else:
            for part in parts:
                _emit(marker + part + "\n", size, bold)

    # Resolve display names
    if order.table:
        table_name = order.table.label or str(order.table.number)
    else:
        table_name = str(order.table_id)
    if order.opener:
        waiter_nick = order.opener.nickname or order.opener.username
    else:
        waiter_nick = str(order.opened_by)
    now_str = _greek_date(datetime.datetime.now())

    if compact:
        # ── COMPACT header — single line ────────────────────────────────────
        p._raw(b'\x1b\x61\x00')
        header = f"Παρ. #{order.id} | Τρ. {table_name} | {now_str} | {waiter_nick}"
        _emit(_cased(header, c_ord) + "\n", sz_ord, b_ord)
        _divider(p, div)
    else:
        # ── DETAILED header ─────────────────────────────────────────────────
        _print_line(p, f"Παραγγελια #{order.id}", sz_ord, b_ord, c_ord,
                    align=b'\x1b\x61\x01')
        _divider(p, div)
        p._raw(b'\x1b\x61\x00')
        _apply_font(p, sz_meta, b_meta)
        # Only the field labels follow the caps flag; values print as stored.
        _raw_text(p, ("ΤΡΑΠΕΖΙ:" if c_meta else "Τραπεζι:") + f" Τραπεζι {table_name}\n")
        _raw_text(p, ("ΗΜΕΡΟΜΗΝΙΑ:" if c_meta else "Ημερομηνια:") + f" {now_str}\n")
        _raw_text(p, ("ΣΕΡΒΙΤΟΡΟΣ:" if c_meta else "Σερβιτορος:") + f" {waiter_nick}\n")
        _reset_font(p)
        _divider(p, div)

    # ── Items ────────────────────────────────────────────────────────────────
    # Bit 5 (0x20) of the ESC ! byte is double-width, which halves the
    # number of characters that fit on a line.
    item_line_width = LINE_WIDTH // 2 if sz_item & 0x20 else LINE_WIDTH
    for item in items:
        product = db.query(Product).filter(Product.id == item.product_id).first()
        raw_name = product.name if product else f"Product #{item.product_id}"
        item_name = _cased(raw_name, c_item)
        p._raw(b'\x1b\x61\x00')
        _emit(_item_line(item_name, item.quantity, item_line_width) + "\n", sz_item, b_item)

        opts = _parse_options(item)

        # Quick options (* marker)
        quick_parts = []
        for name, qty in opts["quick"]:
            label = _cased(name, c_qk)
            quick_parts.append(f"{label} x{qty}" if qty > 1 else label)
        _emit_group("* ", quick_parts, sz_qk, b_qk)

        # Preferences (> marker)
        pref_parts = []
        for name, sub in opts["pref"]:
            label = _cased(name, c_pr)
            if sub:
                label += f" · {_cased(sub, c_pr)}"
            pref_parts.append(label)
        _emit_group("> ", pref_parts, sz_pr, b_pr)

        # Extras (+ marker)
        extra_parts = []
        for name, sub, qty in opts["extra"]:
            label = _cased(name, c_ex)
            if sub:
                label += f" · {_cased(sub, c_ex)}"
            if qty > 1:
                label += f" · x{qty}"
            extra_parts.append(label)
        _emit_group("+ ", extra_parts, sz_ex, b_ex)

        # Legacy untagged options: always one per line, in the extras font
        # (including its caps flag, like every other option kind).
        for entry in opts["unknown"]:
            _emit(f"+ {_cased(entry, c_ex)}\n", sz_ex, b_ex)

        # Removed ingredients (- marker)
        if item.removed_ingredients:
            try:
                removed = json.loads(item.removed_ingredients)
            except (json.JSONDecodeError, TypeError):
                # Malformed stored data: skip this line rather than lose the ticket.
                removed = None
            if isinstance(removed, list) and removed:
                names = [_cased(str(n), c_ing) for n in removed]
                joined = " · ".join(names)
                _emit(f"- ΧΩΡΙΣ: {joined}\n", sz_ing, b_ing)

        # Per-item note
        if item.notes:
            note_text = _cased(item.notes, c_note)
            if compact:
                _emit(f"! {note_text}\n", sz_note, b_note)
            else:
                _emit(f"\n(!) {note_text}\n\n", sz_note, b_note)

        # Blank line between items in detailed mode
        if not compact:
            p._raw(b'\n')

    _divider(p, div)

    # Order-level notes
    if order.notes:
        note_text = _cased(order.notes, c_onote)
        _emit(f"Σημ: {note_text}\n", sz_onote, b_onote)
        if not compact:
            _divider(p, div)

    # Footer (detailed only)
    if not compact:
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, "Τελος Παραγγελιας\n")
        p._raw(b'\x1b\x21\x00')

    p._raw(b'\n\n\n')
    p.cut()
# ── On-demand report / receipt prints ────────────────────────────────────────
def print_waiter_report(ip: str, port: int, report: dict, mode: str):
    """Print a waiter shift/period report. mode='simple'|'extensive'.

    Reads from *report*: 'waiter_name', 'from_dt', 'to_dt', 'orders',
    'items', 'total' and — for the extensive listing — 'order_data', a list
    of dicts with 'table', 'total' and optional 'time_open' / 'time_close'.

    Does nothing when spoof printing is on. Failures are logged, never
    raised, and the printer connection is closed on every path.
    """
    if is_spoof_mode():
        logger.info("Spoof printing ON — dropping waiter report print")
        return
    p = None
    try:
        p = _get_printer(ip, port)
        # Title: centred, double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, "ΑΝΑΦΟΡΑ ΣΕΡΒΙΤΟΡΟΥ\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        # Period block: left aligned, double height.
        p._raw(b'\x1b\x61\x00')
        p._raw(b'\x1b\x21\x10')
        _raw_text(p, f"Σερβιτορος: {report['waiter_name']}\n")
        _raw_text(p, f"Απο: {report['from_dt']}\n")
        _raw_text(p, f"Εως: {report['to_dt']}\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        p._raw(b'\x1b\x21\x10')
        _raw_text(p, f"Παραγγελιες: {report['orders']}\n")
        _raw_text(p, f"Αντικειμενα: {report['items']}\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        # Grand total: centred, double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, f"ΣΥΝΟΛΟ: {report['total']:.2f}e\n")
        p._raw(b'\x1b\x21\x00')
        if mode == "extensive" and report.get("order_data"):
            _divider(p)
            p._raw(b'\x1b\x61\x00')
            _raw_text(p, "ΑΝΑΛΥΤΙΚΑ\n")
            _divider(p)
            for od in report["order_data"]:
                # Build right-aligned total: "HH:MM - HH:MM - TABLE . . . 9.99e"
                time_open = od.get("time_open", "")
                time_close = od.get("time_close", "")
                table = od["table"]
                value = f"{od['total']:.2f}e"
                times_part = f"{time_open} - {time_close}" if time_close else time_open
                prefix = f"{times_part} - {table}"
                gap = LINE_WIDTH - len(prefix) - len(value)
                if gap < 3:
                    # No room for a dot leader: separate with a single space.
                    line = f"{prefix} {value}"
                else:
                    dots = (". " * ((gap // 2) + 1))[:gap]
                    line = f"{prefix}{dots}{value}"
                _raw_text(p, line + "\n")
        p._raw(b'\n\n\n')
        p.cut()
    except Exception as e:
        logger.error("print_waiter_report failed for %s:%s: %s", ip, port, e)
    finally:
        if p is not None:
            try:
                p.close()
            except Exception as close_err:
                logger.warning("Could not close printer %s:%s cleanly: %s",
                               ip, port, close_err)
def print_printer_report(ip: str, port: int, report: dict, mode: str):
    """Print a per-printer totals report. mode='simple'|'extensive'.

    Reads from *report*: 'printer_name', 'from_dt', 'to_dt', 'print_jobs',
    'orders', 'items', 'total' and — for the extensive listing —
    'order_data', a list of dicts with 'time', 'table', 'total' and an
    optional 'items' list of {'quantity', 'name'} dicts.

    Does nothing when spoof printing is on. Failures are logged, never
    raised, and the printer connection is closed on every path.
    """
    if is_spoof_mode():
        logger.info("Spoof printing ON — dropping printer report print")
        return
    p = None
    try:
        p = _get_printer(ip, port)
        # Title: centred, double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, "ΑΝΑΦΟΡΑ ΕΚΤΥΠΩΤΗ\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        # Period block: left aligned, double height.
        p._raw(b'\x1b\x61\x00')
        p._raw(b'\x1b\x21\x10')
        _raw_text(p, f"Εκτυπωτης: {report['printer_name']}\n")
        _raw_text(p, f"Απο: {report['from_dt']}\n")
        _raw_text(p, f"Εως: {report['to_dt']}\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        p._raw(b'\x1b\x21\x10')
        _raw_text(p, f"Εργασιες εκτ.: {report['print_jobs']}\n")
        _raw_text(p, f"Παραγγελιες: {report['orders']}\n")
        _raw_text(p, f"Αντικειμενα: {report['items']}\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        # Grand total: centred, double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, f"ΣΥΝΟΛΟ: {report['total']:.2f}e\n")
        p._raw(b'\x1b\x21\x00')
        if mode == "extensive" and report.get("order_data"):
            _divider(p)
            p._raw(b'\x1b\x61\x00')
            _raw_text(p, "ΑΝΑΛΥΤΙΚΑ\n")
            _divider(p)
            for od in report["order_data"]:
                # Header line: "HH:MM - TABLE . . . . 9.99e"
                prefix = f"{od['time']} - {od['table']}"
                value = f"{od['total']:.2f}e"
                gap = LINE_WIDTH - len(prefix) - len(value)
                if gap < 3:
                    # No room for a dot leader: separate with a single space.
                    header_line = f"{prefix} {value}"
                else:
                    dots = (". " * ((gap // 2) + 1))[:gap]
                    header_line = f"{prefix}{dots}{value}"
                p._raw(b'\x1b\x45\x01')  # bold on for the order header
                _raw_text(p, header_line + "\n")
                p._raw(b'\x1b\x45\x00')
                # Indented items
                for item in od.get("items", []):
                    _raw_text(p, f" {item['quantity']} x {item['name']}\n")
        p._raw(b'\n\n\n')
        p.cut()
    except Exception as e:
        logger.error("print_printer_report failed for %s:%s: %s", ip, port, e)
    finally:
        if p is not None:
            try:
                p.close()
            except Exception as close_err:
                logger.warning("Could not close printer %s:%s cleanly: %s",
                               ip, port, close_err)
def print_order_receipt(ip: str, port: int, receipt: dict):
    """Print a manager-triggered order receipt.

    Reads from *receipt*: 'order_id', 'table_name', 'waiter_name',
    'opened_at', 'total', optional 'closed_at' and 'notes', and an optional
    'items' list of dicts with 'name', 'quantity', 'unit_price', 'total'.

    Does nothing when spoof printing is on. Failures are logged, never
    raised, and the printer connection is closed on every path.
    """
    if is_spoof_mode():
        logger.info("Spoof printing ON — dropping order receipt print")
        return
    p = None
    try:
        p = _get_printer(ip, port)
        # Title: centred, double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, f"ΠΑΡΑΓΓΕΛΙΑ #{receipt['order_id']}\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        # Order details: left aligned, double height.
        p._raw(b'\x1b\x61\x00')
        p._raw(b'\x1b\x21\x10')
        _raw_text(p, f"Τραπεζι: {receipt['table_name']}\n")
        _raw_text(p, f"Σερβιτορος: {receipt['waiter_name']}\n")
        _raw_text(p, f"Ανοιχτηκε: {receipt['opened_at']}\n")
        if receipt.get("closed_at"):
            _raw_text(p, f"Εκλεισε: {receipt['closed_at']}\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        for item in receipt.get("items", []):
            # Name/quantity line in double height, price breakdown in normal size.
            p._raw(b'\x1b\x21\x10')
            _raw_text(p, _item_line(item["name"], item["quantity"]) + "\n")
            p._raw(b'\x1b\x21\x00')
            _raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n")
        _divider(p)
        if receipt.get("notes"):
            p._raw(b'\x1b\x21\x10')
            _raw_text(p, f"Σημ: {receipt['notes']}\n")
            p._raw(b'\x1b\x21\x00')
            _divider(p)
        # Grand total: centred, double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, f"ΣΥΝΟΛΟ: {receipt['total']:.2f}e\n")
        p._raw(b'\x1b\x21\x00')
        p._raw(b'\n\n\n')
        p.cut()
    except Exception as e:
        logger.error("print_order_receipt failed for %s:%s: %s", ip, port, e)
    finally:
        if p is not None:
            try:
                p.close()
            except Exception as close_err:
                logger.warning("Could not close printer %s:%s cleanly: %s",
                               ip, port, close_err)
def print_order_synopsis(ip: str, port: int, synopsis: dict):
    """Print a waiter-triggered order synopsis (not a kitchen ticket).

    Reads from *synopsis*: 'table_name', 'waiter_name', 'opened_at', 'total',
    optional 'paid_total' (with 'remaining' when it is positive) and an
    optional 'items' list of dicts with 'name', 'quantity', 'unit_price',
    'total' and 'status'. Items with status "active" are listed first, then
    items with status "paid"; other statuses are not printed.

    Does nothing when spoof printing is on. Failures are logged, never
    raised, and the printer connection is closed on every path.
    """
    if is_spoof_mode():
        logger.info("Spoof printing ON — dropping order synopsis print")
        return
    p = None
    try:
        p = _get_printer(ip, port)
        # Title: centred, double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, "ΣΥΝΟΨΗ ΠΑΡΑΓΓΕΛΙΑΣ\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)
        # Order details: left aligned, double height.
        p._raw(b'\x1b\x61\x00')
        p._raw(b'\x1b\x21\x10')
        _raw_text(p, f"Τραπεζι: {synopsis['table_name']}\n")
        _raw_text(p, f"Σερβιτορος: {synopsis['waiter_name']}\n")
        _raw_text(p, f"Ωρα: {synopsis['opened_at']}\n")
        p._raw(b'\x1b\x21\x00')
        _divider(p)

        all_items = synopsis.get("items", [])
        paid_items = [i for i in all_items if i["status"] == "paid"]
        active_items = [i for i in all_items if i["status"] == "active"]

        # The item lines are indented, so the dot-leader must be built for
        # the remaining width or the quantity wraps onto the next line.
        indent = " "
        for heading, group in (("ΕΚΚΡΕΜΗ:\n", active_items), ("ΠΛΗΡΩΜΕΝΑ:\n", paid_items)):
            if not group:
                continue
            p._raw(b'\x1b\x21\x10')
            _raw_text(p, heading)
            p._raw(b'\x1b\x21\x00')
            for item in group:
                line = _item_line(item['name'], item['quantity'], LINE_WIDTH - len(indent))
                _raw_text(p, f"{indent}{line}\n")
                _raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n")
            _divider(p)

        # Totals: centred; grand total in double height + double width.
        p._raw(b'\x1b\x61\x01')
        p._raw(b'\x1b\x21\x30')
        _raw_text(p, f"ΣΥΝΟΛΟ: {synopsis['total']:.2f}e\n")
        # `or 0` keeps an explicit None from breaking the comparison.
        if (synopsis.get('paid_total') or 0) > 0:
            p._raw(b'\x1b\x21\x10')
            _raw_text(p, f"Πληρωμενο: {synopsis['paid_total']:.2f}e\n")
            _raw_text(p, f"Εκκρεμει: {synopsis['remaining']:.2f}e\n")
        p._raw(b'\x1b\x21\x00')
        p._raw(b'\n\n\n')
        p.cut()
    except Exception as e:
        logger.error("print_order_synopsis failed for %s:%s: %s", ip, port, e)
    finally:
        if p is not None:
            try:
                p.close()
            except Exception as close_err:
                logger.warning("Could not close printer %s:%s cleanly: %s",
                               ip, port, close_err)
# ── Routing logic ────────────────────────────────────────────────────────────
def route_and_print(order_id: int, item_ids: List[int]):
    """
    Background-task entry point: route the given items of an order to their
    printers using a session opened (and always closed) here.
    Nothing is ever raised to the caller — the order is already saved, so
    any unexpected error is only logged with its traceback.
    """
    session: Session = SessionLocal()
    try:
        _do_route_and_print(order_id, item_ids, session)
    except Exception as exc:
        logger.exception("Unexpected error in route_and_print for order %s: %s", order_id, exc)
    finally:
        session.close()
def route_and_print_sync(order_id: int, item_ids: List[int], db: Session) -> List[dict]:
    """
    Route and print on the caller's session, handing back the outcome.
    The return value is one dict per printer:
    { printer_name, success, error }
    Unlike route_and_print, exceptions are not caught here.
    """
    outcome = _do_route_and_print(order_id, item_ids, db)
    return outcome
def _is_spoof_mode(db: Session) -> bool:
    """Return True when the 'dev.spoof_printing' setting row exists with value "true"."""
    setting = (
        db.query(PosSettings)
        .filter(PosSettings.key == "dev.spoof_printing")
        .first()
    )
    if setting is None:
        return False
    return setting.value == "true"
def _do_route_and_print(order_id: int, item_ids: List[int], db: Session) -> List[dict]:
    """Group the given order items by printer and print one ticket per printer.

    Behaviour:
    - Spoof mode: nothing is printed; every found item is marked printed and
      a single {"printer_name": "spoof", ...} success entry is returned.
    - Unknown order: logs an error and returns an empty list.
    - Items are grouped by their product's `printer_zone_id`, which is then
      looked up as the id of an active Printer. Items whose product has no
      zone are skipped with a warning.
    - For each printer a ticket is printed; on success its items are marked
      printed. A PrintLog row is written for every attempted printer.
    - A failing printer never aborts the others; its error text is returned
      in the result entry. The printer connection is closed on every path.

    Returns:
        One dict per printer: {"printer_name", "success", "error"}.
    """
    if _is_spoof_mode(db):
        logger.info("Spoof printing ON — dropping print job for order %s", order_id)
        for item_id in item_ids:
            item = db.query(OrderItem).filter(OrderItem.id == item_id).first()
            if item:
                item.printed = True
        db.commit()
        return [{"printer_name": "spoof", "success": True, "error": None}]

    results = []
    order = db.query(Order).filter(Order.id == order_id).first()
    if not order:
        logger.error("route_and_print: order %s not found", order_id)
        return results

    items = db.query(OrderItem).filter(OrderItem.id.in_(item_ids)).all()

    # Group items by printer zone
    zone_map: dict[int, List[OrderItem]] = {}
    unzoned: List[OrderItem] = []
    for item in items:
        product = db.query(Product).filter(Product.id == item.product_id).first()
        if product and product.printer_zone_id:
            zone_map.setdefault(product.printer_zone_id, []).append(item)
        else:
            unzoned.append(item)
    if unzoned:
        logger.warning("order %s has %d item(s) with no printer zone — skipped", order_id, len(unzoned))

    for printer_id, zone_items in zone_map.items():
        printer = db.query(Printer).filter(Printer.id == printer_id, Printer.is_active == True).first()
        if not printer:
            logger.warning("Printer %s not found or inactive", printer_id)
            results.append({"printer_name": f"#{printer_id}", "success": False, "error": "Printer not found or inactive"})
            continue

        success = False
        error_msg = None
        p = None
        try:
            p = _get_printer(printer.ip_address, printer.port)
            _print_kitchen_ticket(p, order, zone_items, db)
            success = True
            for item in zone_items:
                item.printed = True
            db.commit()
        except Exception as e:
            error_msg = str(e)
            logger.error("Print failed for printer %s (%s:%s): %s", printer.name, printer.ip_address, printer.port, e)
        finally:
            # Always release the connection, also when the ticket failed midway.
            if p is not None:
                try:
                    p.close()
                except Exception as close_err:
                    logger.warning("Could not close printer %s (%s:%s) cleanly: %s",
                                   printer.name, printer.ip_address, printer.port, close_err)

        log = PrintLog(
            order_id=order_id,
            printer_id=printer_id,
            item_ids=json.dumps([i.id for i in zone_items]),
            success=success,
            error_message=error_msg,
        )
        db.add(log)
        db.commit()
        results.append({"printer_name": printer.name, "success": success, "error": error_msg})
    return results