""" ESC/POS printer service — Jolimark TP850UE confirmed configuration. Key findings from printer testing: - Code page n=29 (CP737) is the only working Greek code page on this model. - All Greek text MUST be sent as raw CP737 bytes via p._raw() — never p.text(). - Set the code page immediately after connecting, before any output. - 80mm paper = 48 chars wide at standard font. Double-height keeps 48-char width. """ import json import logging import socket import datetime from typing import Tuple, List from escpos.printer import Network from sqlalchemy.orm import Session from database import SessionLocal from models.order import Order, OrderItem, PrintLog from models.printer import Printer from models.product import Product from models.settings import PosSettings logger = logging.getLogger(__name__) LINE_WIDTH = 48 PRINTER_TIMEOUT = 5 # ── Low-level helpers ──────────────────────────────────────────────────────── def _get_printer(ip: str, port: int) -> Network: p = Network(ip, port, timeout=PRINTER_TIMEOUT) p._raw(b'\x1b\x40') # ESC @ — reset printer p._raw(b'\x1b\x74\x1d') # ESC t 29 — select CP737 (Greek) — confirmed n=29 return p def _gr(text: str) -> bytes: """Encode text to CP737 bytes. Replaces unknown chars instead of crashing.""" return text.encode('cp737', errors='replace') def _raw_text(p: Network, text: str): """Send text as raw CP737 bytes — the ONLY safe way to print Greek.""" p._raw(_gr(text)) _DIVIDER_CHARS = { "dash": "-", "equals": "=", "star": "*", "empty": "", } _PRINT_SETTING_KEYS = [ "print.ticket_mode", "print.divider_style", "print.font_order_number", "print.font_meta", "print.font_item_name", "print.font_quick", "print.font_pref", "print.font_extra", "print.font_ingredient", "print.font_item_note", "print.font_order_note", ] _PRINT_SETTING_DEFAULTS = { "print.ticket_mode": "detailed", "print.divider_style": "dash", "print.font_order_number": "48:1:0", "print.font_meta": "0:0:0", "print.font_item_name": "16:1:0", "print.font_quick": "0:0:0", "print.font_pref": "0:0:0", "print.font_extra": "0:0:0", "print.font_ingredient": "0:0:0", "print.font_item_note": "0:0:0", "print.font_order_note": "0:1:0", } # SIZE byte values (ESC ! base, no bold bit): # 0 = normal # 16 = double-height (bit4) # 32 = double-width (bit5) # 48 = double-height + double-width (bits 4+5) # Bold applied via ESC E, caps applied in software before encoding. def _decode_font(value: str) -> tuple[int, bool, bool]: """Parse 'SIZE:BOLD:CAPS' string → (esc_bang_byte, bold_flag, caps_flag).""" try: parts = str(value).split(":") size = int(parts[0]) bold = len(parts) > 1 and parts[1] == "1" caps = len(parts) > 2 and parts[2] == "1" return size, bold, caps except (ValueError, AttributeError): return 0, False, False def _load_print_settings(db: Session) -> dict: rows = db.query(PosSettings).filter( PosSettings.key.in_(_PRINT_SETTING_KEYS) ).all() settings = dict(_PRINT_SETTING_DEFAULTS) for row in rows: settings[row.key] = row.value return settings def _divider(p: Network, style: str = "dash"): char = _DIVIDER_CHARS.get(style, "-") p._raw(b'\x1b\x61\x00') if char: p._raw(_gr(char * LINE_WIDTH + "\n")) else: p._raw(b'\n') def _item_line(name: str, qty: int, line_width: int = LINE_WIDTH) -> str: """Build a dot-leader line ending with 'xN'. line_width must reflect the effective width at the chosen font size (double-width fonts halve the available char count to 24).""" suffix = f"x{qty}" available = line_width - len(name) - len(suffix) if available < 2: # Name alone is too long — put qty on same line with a single space return f"{name} {suffix}" dots = (". " * ((available // 2) + 1))[:available] return f"{name}{dots}{suffix}" def _apply_font(p: Network, size: int, bold: bool): p._raw(bytes([0x1b, 0x21, size])) p._raw(b'\x1b\x45\x01' if bold else b'\x1b\x45\x00') def _reset_font(p: Network): p._raw(b'\x1b\x21\x00') p._raw(b'\x1b\x45\x00') def _print_line(p: Network, text: str, size: int, bold: bool, caps: bool, align: bytes = b'\x1b\x61\x00'): """Apply font, optionally capitalize, print text + newline, reset font.""" p._raw(align) _apply_font(p, size, bold) out = text.upper() if caps else text _raw_text(p, out + "\n") _reset_font(p) def _greek_date(dt: datetime.datetime) -> str: """Return date/time string in Greek format: HH:MM DD-MM-YYYY""" return dt.strftime("%H:%M %d-%m-%Y") def check_printer(ip: str, port: int) -> bool: """Quick TCP connect check — no data sent.""" try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(2) s.connect((ip, port)) s.close() return True except OSError: return False def is_spoof_mode() -> bool: """Stateless check — opens its own DB session. For use outside route_and_print.""" db = SessionLocal() try: return _is_spoof_mode(db) finally: db.close() def send_test_print(ip: str, port: int, name: str) -> Tuple[bool, str]: if is_spoof_mode(): logger.info("Spoof printing ON — dropping test print for %s", name) return True, "" try: p = _get_printer(ip, port) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, f"TEST — {name}\n") p._raw(b'\x1b\x21\x00') now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") _raw_text(p, f"{now}\n") p._raw(b'\n\n\n') p.cut() p.close() return True, "" except Exception as e: logger.error("Test print failed for %s:%s — %s", ip, port, e) return False, str(e) def send_test_order_print(ip: str, port: int, db: Session) -> Tuple[bool, str]: """Print a fake order using the current font/layout settings — for settings preview.""" if _is_spoof_mode(db): logger.info("Spoof printing ON — dropping test order print") return True, "" # ── Fake data structures (no DB writes) ────────────────────────────────── class _Table: label = "O2" number = 2 class _User: nickname = "bonamin" username = "bonamin" class _Order: id = 99 table = _Table() opener = _User() table_id = 2 opened_by = 1 notes = "Χωρις καψαλισμα παρακαλω" class _Item: def __init__(self, product_id, quantity, selected_options, removed_ingredients, notes): self.product_id = product_id self.quantity = quantity self.selected_options = selected_options self.removed_ingredients = removed_ingredients self.notes = notes import json as _json items = [ # Item 1: Freddo Espresso — quick options + preference + note _Item( product_id=1001, quantity=2, selected_options=_json.dumps([ {"name": "Διπλος", "price_delta": 0.5, "type": "quick"}, {"name": "Εξτρα ζαχαρη", "price_delta": 0.0, "type": "quick"}, {"name": "Παγωμενος", "price_delta": 0.0, "type": "quick"}, {"name": "Γαλα", "price_delta": 0.0, "type": "pref"}, {"name": "Βρωμης", "price_delta": 0.3, "type": "pref_sub"}, ]), removed_ingredients=None, notes="Πολυ κρυο παρακαλω", ), # Item 2: Club Sandwich — extra with sub + removed ingredients _Item( product_id=1002, quantity=1, selected_options=_json.dumps([ {"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"}, {"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"}, {"name": "Extra Bacon", "price_delta": 1.5, "type": "extra"}, {"name": "Τραγανο", "price_delta": 0.0, "type": "extra_sub"}, {"name": "Ψωμι", "price_delta": 0.0, "type": "pref"}, {"name": "Σικαλεως", "price_delta": 0.0, "type": "pref_sub"}, ]), removed_ingredients=_json.dumps(["Ντοματα", "Μουσταρδα"]), notes=None, ), # Item 3: Margherita — quick + extra + removed _Item( product_id=1003, quantity=3, selected_options=_json.dumps([ {"name": "Well Done", "price_delta": 0.0, "type": "quick"}, {"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"}, {"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"}, {"name": "Extra Τυρι", "price_delta": 1.0, "type": "extra"}, ]), removed_ingredients=_json.dumps(["Ελιες", "Κρεμμυδι"]), notes=None, ), ] # Patch product lookup so _print_kitchen_ticket gets real names _FAKE_NAMES = {1001: "Freddo Espresso", 1002: "Club Sandwich", 1003: "Margherita Pizza"} # Monkey-patch db.query for Product only inside this call _orig_query = db.query class _FakeQuery: def __init__(self, model): self._model = model self._filter_id = None def filter(self, *args): # extract id from the filter expression value for arg in args: try: self._filter_id = arg.right.value except Exception: pass return self def first(self): if self._model.__name__ == "Product" and self._filter_id in _FAKE_NAMES: class _P: name = _FAKE_NAMES[self._filter_id] return _P() return _orig_query(self._model).filter(self._model.id == self._filter_id).first() class _PatchedDB: def query(self, model): from models.product import Product as _Product if model is _Product: return _FakeQuery(model) return _orig_query(model) # delegate everything else to real db def __getattr__(self, name): return getattr(db, name) try: p = _get_printer(ip, port) _print_kitchen_ticket(p, _Order(), items, _PatchedDB()) p.close() return True, "" except Exception as e: logger.error("Test order print failed for %s:%s — %s", ip, port, e) return False, str(e) # ── Receipt formatting ─────────────────────────────────────────────────────── def _parse_options(item: OrderItem) -> dict: """ Parse selected_options JSON into grouped dict: { 'quick': [(name, qty)], 'pref': [(name, sub|None)], 'extra': [(name, sub|None, qty)], 'unknown': [name] } Falls back gracefully when type tags are absent (old data). """ result = {"quick": [], "pref": [], "extra": [], "unknown": []} if not item.selected_options: return result try: raw = json.loads(item.selected_options) except (json.JSONDecodeError, TypeError): return result if not isinstance(raw, list): return result i = 0 while i < len(raw): entry = raw[i] if not isinstance(entry, dict): i += 1 continue name = entry.get("name") or "" etype = entry.get("type") # Peek at next entry to collect sub-choice sub = None if i + 1 < len(raw): nxt = raw[i + 1] if isinstance(nxt, dict) and nxt.get("type") in ("pref_sub", "extra_sub"): sub = nxt.get("name") or "" i += 1 # consume sub if etype == "quick": # Collapse repeated quick entries into a single (name, qty) tuple existing = next((q for q in result["quick"] if q[0] == name), None) if existing: result["quick"][result["quick"].index(existing)] = (name, existing[1] + 1) else: result["quick"].append((name, 1)) elif etype == "pref": result["pref"].append((name, sub)) elif etype == "extra": # Collapse repeated extra entries (same name+sub) → (name, sub, qty) existing = next((e for e in result["extra"] if e[0] == name and e[1] == sub), None) if existing: result["extra"][result["extra"].index(existing)] = (name, sub, existing[2] + 1) else: result["extra"].append((name, sub, 1)) else: # Legacy data without type tag — treat as unknown, display plainly if name: result["unknown"].append(name + (f" · {sub}" if sub else "")) i += 1 return result def _print_kitchen_ticket(p: Network, order: Order, items: List[OrderItem], db: Session): cfg = _load_print_settings(db) mode = cfg.get("print.ticket_mode", "detailed") div = cfg.get("print.divider_style", "dash") compact = (mode == "compact") sz_ord, b_ord, c_ord = _decode_font(cfg["print.font_order_number"]) sz_meta, b_meta, c_meta = _decode_font(cfg["print.font_meta"]) sz_item, b_item, c_item = _decode_font(cfg["print.font_item_name"]) sz_qk, b_qk, c_qk = _decode_font(cfg["print.font_quick"]) sz_pr, b_pr, c_pr = _decode_font(cfg["print.font_pref"]) sz_ex, b_ex, c_ex = _decode_font(cfg["print.font_extra"]) sz_ing, b_ing, c_ing = _decode_font(cfg["print.font_ingredient"]) sz_note, b_note, c_note = _decode_font(cfg["print.font_item_note"]) sz_onote,b_onote,c_onote= _decode_font(cfg["print.font_order_note"]) # Resolve display names table_name = order.table.label or str(order.table.number) if order.table else str(order.table_id) waiter_nick = (order.opener.nickname or order.opener.username) if order.opener else str(order.opened_by) now_str = _greek_date(datetime.datetime.now()) # ── COMPACT header — single line ──────────────────────────────────────── if compact: p._raw(b'\x1b\x61\x00') _apply_font(p, sz_ord, b_ord) header = f"Παρ. #{order.id} | Τρ. {table_name} | {now_str} | {waiter_nick}" _raw_text(p, (header.upper() if c_ord else header) + "\n") _reset_font(p) _divider(p, div) # ── DETAILED header ────────────────────────────────────────────────────── else: _print_line(p, f"Παραγγελια #{order.id}", sz_ord, b_ord, c_ord, align=b'\x1b\x61\x01') _divider(p, div) p._raw(b'\x1b\x61\x00') _apply_font(p, sz_meta, b_meta) _raw_text(p, ("ΤΡΑΠΕΖΙ:" if c_meta else "Τραπεζι:") + f" Τραπεζι {table_name}\n") _raw_text(p, ("ΗΜΕΡΟΜΗΝΙΑ:" if c_meta else "Ημερομηνια:") + f" {now_str}\n") _raw_text(p, ("ΣΕΡΒΙΤΟΡΟΣ:" if c_meta else "Σερβιτορος:") + f" {waiter_nick}\n") _reset_font(p) _divider(p, div) # ── Items ──────────────────────────────────────────────────────────────── # Double-width fonts halve the effective character width item_line_width = LINE_WIDTH // 2 if sz_item in (32, 48) else LINE_WIDTH for item in items: product = db.query(Product).filter(Product.id == item.product_id).first() raw_name = product.name if product else f"Product #{item.product_id}" item_name = raw_name.upper() if c_item else raw_name p._raw(b'\x1b\x61\x00') _apply_font(p, sz_item, b_item) _raw_text(p, _item_line(item_name, item.quantity, item_line_width) + "\n") _reset_font(p) opts = _parse_options(item) # Quick options (* marker) if opts["quick"]: if compact: parts = [] for name, qty in opts["quick"]: n = name.upper() if c_qk else name parts.append(f"{n} x{qty}" if qty > 1 else n) _apply_font(p, sz_qk, b_qk) _raw_text(p, "* " + " | ".join(parts) + "\n") _reset_font(p) else: for name, qty in opts["quick"]: n = name.upper() if c_qk else name line = f"* {n} x{qty}" if qty > 1 else f"* {n}" _apply_font(p, sz_qk, b_qk) _raw_text(p, line + "\n") _reset_font(p) # Preferences (> marker) if opts["pref"]: if compact: parts = [] for name, sub in opts["pref"]: n = name.upper() if c_pr else name s = (sub.upper() if c_pr else sub) if sub else None parts.append(f"{n} · {s}" if s else n) _apply_font(p, sz_pr, b_pr) _raw_text(p, "> " + " | ".join(parts) + "\n") _reset_font(p) else: for name, sub in opts["pref"]: n = name.upper() if c_pr else name s = (sub.upper() if c_pr else sub) if sub else None line = f"> {n} · {s}" if s else f"> {n}" _apply_font(p, sz_pr, b_pr) _raw_text(p, line + "\n") _reset_font(p) # Extras (+ marker) if opts["extra"]: if compact: parts = [] for name, sub, qty in opts["extra"]: n = name.upper() if c_ex else name s = (sub.upper() if c_ex else sub) if sub else None part = f"{n} · {s}" if s else n if qty > 1: part += f" · x{qty}" parts.append(part) _apply_font(p, sz_ex, b_ex) _raw_text(p, "+ " + " | ".join(parts) + "\n") _reset_font(p) else: for name, sub, qty in opts["extra"]: n = name.upper() if c_ex else name s = (sub.upper() if c_ex else sub) if sub else None line = f"+ {n}" if s: line += f" · {s}" if qty > 1: line += f" · x{qty}" _apply_font(p, sz_ex, b_ex) _raw_text(p, line + "\n") _reset_font(p) # Legacy untagged options for entry in opts["unknown"]: _apply_font(p, sz_ex, b_ex) _raw_text(p, f"+ {entry}\n") _reset_font(p) # Removed ingredients (- marker) if item.removed_ingredients: try: removed = json.loads(item.removed_ingredients) if removed: names = [n.upper() if c_ing else n for n in removed] joined = " · ".join(names) _apply_font(p, sz_ing, b_ing) _raw_text(p, f"- ΧΩΡΙΣ: {joined}\n") _reset_font(p) except (json.JSONDecodeError, TypeError): pass # Per-item note if item.notes: note_text = item.notes.upper() if c_note else item.notes _apply_font(p, sz_note, b_note) if compact: _raw_text(p, f"! {note_text}\n") else: _raw_text(p, f"\n(!) {note_text}\n\n") _reset_font(p) # Blank line between items in detailed mode if not compact: p._raw(b'\n') _divider(p, div) # Order-level notes if order.notes: note_text = order.notes.upper() if c_onote else order.notes _apply_font(p, sz_onote, b_onote) _raw_text(p, f"Σημ: {note_text}\n") _reset_font(p) if not compact: _divider(p, div) # Footer (detailed only) if not compact: p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, "Τελος Παραγγελιας\n") p._raw(b'\x1b\x21\x00') p._raw(b'\n\n\n') p.cut() # ── On-demand report / receipt prints ──────────────────────────────────────── def print_waiter_report(ip: str, port: int, report: dict, mode: str): """Print a waiter shift/period report. mode='simple'|'extensive'.""" if is_spoof_mode(): logger.info("Spoof printing ON — dropping waiter report print") return try: p = _get_printer(ip, port) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, "ΑΝΑΦΟΡΑ ΣΕΡΒΙΤΟΡΟΥ\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x61\x00') p._raw(b'\x1b\x21\x10') _raw_text(p, f"Σερβιτορος: {report['waiter_name']}\n") _raw_text(p, f"Απο: {report['from_dt']}\n") _raw_text(p, f"Εως: {report['to_dt']}\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x21\x10') _raw_text(p, f"Παραγγελιες: {report['orders']}\n") _raw_text(p, f"Αντικειμενα: {report['items']}\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, f"ΣΥΝΟΛΟ: {report['total']:.2f}e\n") p._raw(b'\x1b\x21\x00') if mode == "extensive" and report.get("order_data"): _divider(p) p._raw(b'\x1b\x61\x00') _raw_text(p, "ΑΝΑΛΥΤΙΚΑ\n") _divider(p) for od in report["order_data"]: # Build right-aligned total: "HH:MM - HH:MM - TABLE . . . 9.99e" time_open = od.get("time_open", "") time_close = od.get("time_close", "") table = od["table"] value = f"{od['total']:.2f}e" times_part = f"{time_open} - {time_close}" if time_close else time_open prefix = f"{times_part} - {table}" gap = LINE_WIDTH - len(prefix) - len(value) if gap < 3: line = f"{prefix} {value}" else: dots = (". " * ((gap // 2) + 1))[:gap] line = f"{prefix}{dots}{value}" _raw_text(p, line + "\n") p._raw(b'\n\n\n') p.cut() p.close() except Exception as e: logger.error("print_waiter_report failed for %s:%s — %s", ip, port, e) def print_printer_report(ip: str, port: int, report: dict, mode: str): """Print a per-printer totals report. mode='simple'|'extensive'.""" if is_spoof_mode(): logger.info("Spoof printing ON — dropping printer report print") return try: p = _get_printer(ip, port) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, "ΑΝΑΦΟΡΑ ΕΚΤΥΠΩΤΗ\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x61\x00') p._raw(b'\x1b\x21\x10') _raw_text(p, f"Εκτυπωτης: {report['printer_name']}\n") _raw_text(p, f"Απο: {report['from_dt']}\n") _raw_text(p, f"Εως: {report['to_dt']}\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x21\x10') _raw_text(p, f"Εργασιες εκτ.: {report['print_jobs']}\n") _raw_text(p, f"Παραγγελιες: {report['orders']}\n") _raw_text(p, f"Αντικειμενα: {report['items']}\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, f"ΣΥΝΟΛΟ: {report['total']:.2f}e\n") p._raw(b'\x1b\x21\x00') if mode == "extensive" and report.get("order_data"): _divider(p) p._raw(b'\x1b\x61\x00') _raw_text(p, "ΑΝΑΛΥΤΙΚΑ\n") _divider(p) for od in report["order_data"]: # Header line: "HH:MM - TABLE . . . . 9.99e" prefix = f"{od['time']} - {od['table']}" value = f"{od['total']:.2f}e" gap = LINE_WIDTH - len(prefix) - len(value) if gap < 3: header_line = f"{prefix} {value}" else: dots = (". " * ((gap // 2) + 1))[:gap] header_line = f"{prefix}{dots}{value}" p._raw(b'\x1b\x45\x01') _raw_text(p, header_line + "\n") p._raw(b'\x1b\x45\x00') # Indented items for item in od.get("items", []): _raw_text(p, f" {item['quantity']} x {item['name']}\n") p._raw(b'\n\n\n') p.cut() p.close() except Exception as e: logger.error("print_printer_report failed for %s:%s — %s", ip, port, e) def print_order_receipt(ip: str, port: int, receipt: dict): """Print a manager-triggered order receipt.""" if is_spoof_mode(): logger.info("Spoof printing ON — dropping order receipt print") return try: p = _get_printer(ip, port) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, f"ΠΑΡΑΓΓΕΛΙΑ #{receipt['order_id']}\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x61\x00') p._raw(b'\x1b\x21\x10') _raw_text(p, f"Τραπεζι: {receipt['table_name']}\n") _raw_text(p, f"Σερβιτορος: {receipt['waiter_name']}\n") _raw_text(p, f"Ανοιχτηκε: {receipt['opened_at']}\n") if receipt.get("closed_at"): _raw_text(p, f"Εκλεισε: {receipt['closed_at']}\n") p._raw(b'\x1b\x21\x00') _divider(p) for item in receipt.get("items", []): p._raw(b'\x1b\x21\x10') _raw_text(p, _item_line(item["name"], item["quantity"]) + "\n") p._raw(b'\x1b\x21\x00') _raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n") _divider(p) if receipt.get("notes"): p._raw(b'\x1b\x21\x10') _raw_text(p, f"Σημ: {receipt['notes']}\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, f"ΣΥΝΟΛΟ: {receipt['total']:.2f}e\n") p._raw(b'\x1b\x21\x00') p._raw(b'\n\n\n') p.cut() p.close() except Exception as e: logger.error("print_order_receipt failed for %s:%s — %s", ip, port, e) def print_order_synopsis(ip: str, port: int, synopsis: dict): """Print a waiter-triggered order synopsis (not a kitchen ticket).""" if is_spoof_mode(): logger.info("Spoof printing ON — dropping order synopsis print") return try: p = _get_printer(ip, port) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, "ΣΥΝΟΨΗ ΠΑΡΑΓΓΕΛΙΑΣ\n") p._raw(b'\x1b\x21\x00') _divider(p) p._raw(b'\x1b\x61\x00') p._raw(b'\x1b\x21\x10') _raw_text(p, f"Τραπεζι: {synopsis['table_name']}\n") _raw_text(p, f"Σερβιτορος: {synopsis['waiter_name']}\n") _raw_text(p, f"Ωρα: {synopsis['opened_at']}\n") p._raw(b'\x1b\x21\x00') _divider(p) paid_items = [i for i in synopsis.get("items", []) if i["status"] == "paid"] active_items = [i for i in synopsis.get("items", []) if i["status"] == "active"] if active_items: p._raw(b'\x1b\x21\x10') _raw_text(p, "ΕΚΚΡΕΜΗ:\n") p._raw(b'\x1b\x21\x00') for item in active_items: _raw_text(p, f" {_item_line(item['name'], item['quantity'])}\n") _raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n") _divider(p) if paid_items: p._raw(b'\x1b\x21\x10') _raw_text(p, "ΠΛΗΡΩΜΕΝΑ:\n") p._raw(b'\x1b\x21\x00') for item in paid_items: _raw_text(p, f" {_item_line(item['name'], item['quantity'])}\n") _raw_text(p, f" {item['unit_price']:.2f}e x{item['quantity']} = {item['total']:.2f}e\n") _divider(p) p._raw(b'\x1b\x61\x01') p._raw(b'\x1b\x21\x30') _raw_text(p, f"ΣΥΝΟΛΟ: {synopsis['total']:.2f}e\n") if synopsis.get('paid_total', 0) > 0: p._raw(b'\x1b\x21\x10') _raw_text(p, f"Πληρωμενο: {synopsis['paid_total']:.2f}e\n") _raw_text(p, f"Εκκρεμει: {synopsis['remaining']:.2f}e\n") p._raw(b'\x1b\x21\x00') p._raw(b'\n\n\n') p.cut() p.close() except Exception as e: logger.error("print_order_synopsis failed for %s:%s — %s", ip, port, e) # ── Routing logic ──────────────────────────────────────────────────────────── def route_and_print(order_id: int, item_ids: List[int]): """ Background task: group items by printer zone, send to each printer. Printer failures are logged but never raise — order is already saved. """ db: Session = SessionLocal() try: _do_route_and_print(order_id, item_ids, db) except Exception as e: logger.exception("Unexpected error in route_and_print for order %s: %s", order_id, e) finally: db.close() def route_and_print_sync(order_id: int, item_ids: List[int], db: Session) -> List[dict]: """ Synchronous variant used when the caller needs print results. Returns a list of per-printer result dicts: { printer_name, success, error } """ return _do_route_and_print(order_id, item_ids, db) def _is_spoof_mode(db: Session) -> bool: row = db.query(PosSettings).filter(PosSettings.key == "dev.spoof_printing").first() return row is not None and row.value == "true" def _do_route_and_print(order_id: int, item_ids: List[int], db: Session) -> List[dict]: if _is_spoof_mode(db): logger.info("Spoof printing ON — dropping print job for order %s", order_id) for item_id in item_ids: item = db.query(OrderItem).filter(OrderItem.id == item_id).first() if item: item.printed = True db.commit() return [{"printer_name": "spoof", "success": True, "error": None}] results = [] order = db.query(Order).filter(Order.id == order_id).first() if not order: logger.error("route_and_print: order %s not found", order_id) return results items = db.query(OrderItem).filter(OrderItem.id.in_(item_ids)).all() # Group items by printer zone zone_map: dict[int, List[OrderItem]] = {} unzoned: List[OrderItem] = [] for item in items: product = db.query(Product).filter(Product.id == item.product_id).first() if product and product.printer_zone_id: zone_map.setdefault(product.printer_zone_id, []).append(item) else: unzoned.append(item) if unzoned: logger.warning("order %s has %d item(s) with no printer zone — skipped", order_id, len(unzoned)) for printer_id, zone_items in zone_map.items(): printer = db.query(Printer).filter(Printer.id == printer_id, Printer.is_active == True).first() if not printer: logger.warning("Printer %s not found or inactive", printer_id) results.append({"printer_name": f"#{printer_id}", "success": False, "error": "Printer not found or inactive"}) continue success = False error_msg = None try: p = _get_printer(printer.ip_address, printer.port) _print_kitchen_ticket(p, order, zone_items, db) p.close() success = True for item in zone_items: item.printed = True db.commit() except Exception as e: error_msg = str(e) logger.error("Print failed for printer %s (%s:%s): %s", printer.name, printer.ip_address, printer.port, e) log = PrintLog( order_id=order_id, printer_id=printer_id, item_ids=json.dumps([i.id for i in zone_items]), success=success, error_message=error_msg, ) db.add(log) db.commit() results.append({"printer_name": printer.name, "success": success, "error": error_msg}) return results