Files
simple-pos-system/local_backend/routers/reports.py

658 lines
24 KiB
Python

import json
from fastapi import APIRouter, Depends, Query, HTTPException, BackgroundTasks
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import func
from datetime import date, datetime, timedelta
from typing import Optional, List
from database import get_db
from models.order import Order, OrderItem, OrderWaiter, PrintLog
from models.user import User
from models.table import Table
from models.printer import Printer
from models.shift import WaiterShift
from schemas.order import OrderOut
from schemas.table import TableOut
from routers.deps import require_manager
from services.printer_service import print_waiter_report, print_printer_report, print_order_receipt
# Router on which every report endpoint in this module registers itself.
router = APIRouter()
def _dt(dt):
    """Render a datetime as an ISO-8601 string, passing ``None`` through.

    A naive value (``tzinfo`` is ``None``) gets a literal ``"Z"`` appended to
    its ``isoformat()`` text; an aware value is returned exactly as
    ``isoformat()`` renders it.
    NOTE(review): appending "Z" presumes naive values are stored in UTC — confirm.
    """
    if dt is None:
        return None
    iso_text = dt.isoformat()
    if dt.tzinfo is None:
        return iso_text + "Z"
    return iso_text
# Maximum number of ids placed in a single SQL ``IN (...)`` list when orders
# are batch-loaded, so a long reporting window never produces one huge
# statement.
_ORDER_ID_BATCH = 500


def _resolve_report_window(from_dt, to_dt, report_date):
    """Return the ``(start, end)`` datetimes bounding a per-waiter report.

    When BOTH ``from_dt`` and ``to_dt`` are supplied they are parsed with
    ``datetime.fromisoformat``.  Otherwise (including when only one of them is
    given) the window is the calendar day ``report_date`` — today when it is
    omitted — from midnight up to the following midnight.

    Raises:
        HTTPException: 400 when ``from_dt`` / ``to_dt`` cannot be parsed.
    """
    if from_dt and to_dt:
        try:
            return datetime.fromisoformat(from_dt), datetime.fromisoformat(to_dt)
        except ValueError as exc:
            # Previously this escaped as an unhandled ValueError (HTTP 500).
            raise HTTPException(
                status_code=400,
                detail="'from' and 'to' must be ISO 8601 datetimes",
            ) from exc
    target = report_date or date.today()
    start = datetime.combine(target, datetime.min.time())
    return start, start + timedelta(days=1)


def _load_orders_by_id(db, order_ids):
    """Fetch the given orders in batches and return them keyed by id.

    Ids that match no row are simply absent from the result; ``None`` ids are
    ignored.
    """
    wanted = sorted({oid for oid in order_ids if oid is not None})
    orders = {}
    for offset in range(0, len(wanted), _ORDER_ID_BATCH):
        batch = wanted[offset:offset + _ORDER_ID_BATCH]
        for order in db.query(Order).filter(Order.id.in_(batch)).all():
            orders[order.id] = order
    return orders


def _summarize_items_by_waiter(db, items, waiter_attr):
    """Group order items per waiter and, within each waiter, per order.

    Args:
        db: Session used to load users, tables and the referenced orders.
        items: The ``OrderItem`` rows to aggregate.
        waiter_attr: Name of the ``OrderItem`` attribute holding the waiter id
            to group by (``"paid_by"`` or ``"added_by"``).

    Returns:
        A list with one dict per waiter: ``waiter_id``, ``waiter_name``,
        ``items`` (summed quantity), ``total`` (sum of unit_price * quantity),
        ``order_data`` (one entry per order) and ``orders`` (number of orders).
    """
    waiters_db = {u.id: u for u in db.query(User).all()}
    tables_db = {t.id: (t.label or f"T{t.number}") for t in db.query(Table).all()}
    # One batched lookup instead of one query per (waiter, order) pair.
    orders_db = _load_orders_by_id(db, (item.order_id for item in items))

    summary: dict = {}
    for item in items:
        wid = getattr(item, waiter_attr)
        entry = summary.get(wid)
        if entry is None:
            w = waiters_db.get(wid)
            entry = summary[wid] = {
                "waiter_id": wid,
                "waiter_name": (w.full_name or w.username) if w else f"#{wid}",
                "items": 0,
                "total": 0.0,
                "order_data": {},
            }

        line_total = item.unit_price * item.quantity
        entry["items"] += item.quantity
        entry["total"] += line_total

        oid = item.order_id
        order_entry = entry["order_data"].get(oid)
        if order_entry is None:
            order = orders_db.get(oid)
            if order is not None:
                opened = order.opened_at.strftime("%H:%M") if order.opened_at else ""
                closed = order.closed_at.strftime("%H:%M") if order.closed_at else ""
                # Fall back to the table id (as the print endpoints do) when
                # the table row is missing.
                table_label = tables_db.get(order.table_id, f"#{order.table_id}")
            else:
                opened = closed = ""
                table_label = f"#{oid}"
            order_entry = entry["order_data"][oid] = {
                "id": oid,
                "time_open": opened,
                "time_close": closed,
                "table": table_label,
                "total": 0.0,
                "items": [],
            }

        order_entry["total"] += line_total
        product_name = item.product.name if item.product else f"#{item.product_id}"
        order_entry["items"].append({"name": product_name, "quantity": item.quantity})

    result = []
    for entry in summary.values():
        entry["orders"] = len(entry["order_data"])
        entry["order_data"] = list(entry["order_data"].values())
        result.append(entry)
    return result


@router.get("/shift")
def shift_summary(
    from_dt: Optional[str] = Query(default=None, alias="from"),
    to_dt: Optional[str] = Query(default=None, alias="to"),
    report_date: Optional[date] = Query(default=None, alias="date"),
    waiter_id: Optional[int] = None,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Payments collected per waiter — based on paid_by on order items.

    Considers items with status ``"paid"`` whose ``paid_at`` lies in
    ``[start, end)``; the window comes from ``from``/``to`` when both are
    given, otherwise from ``date`` (default: today).  ``waiter_id`` optionally
    restricts the report to a single paying waiter.

    Returns:
        ``{"from": ..., "to": ..., "waiters": [...]}`` with one entry per waiter.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime.
    """
    start, end = _resolve_report_window(from_dt, to_dt, report_date)
    q = db.query(OrderItem).filter(
        OrderItem.status == "paid",
        OrderItem.paid_at >= start,
        OrderItem.paid_at < end,
    )
    if waiter_id:
        q = q.filter(OrderItem.paid_by == waiter_id)
    waiters = _summarize_items_by_waiter(db, q.all(), "paid_by")
    return {"from": start.isoformat(), "to": end.isoformat(), "waiters": waiters}


@router.get("/shift/orders")
def shift_orders_summary(
    from_dt: Optional[str] = Query(default=None, alias="from"),
    to_dt: Optional[str] = Query(default=None, alias="to"),
    report_date: Optional[date] = Query(default=None, alias="date"),
    waiter_id: Optional[int] = None,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Items sent (added) per waiter — regardless of payment status.

    Considers items with status ``"active"`` or ``"paid"`` whose ``added_at``
    lies in ``[start, end)``; the window comes from ``from``/``to`` when both
    are given, otherwise from ``date`` (default: today).  ``waiter_id``
    optionally restricts the report to items added by a single waiter.

    Returns:
        ``{"from": ..., "to": ..., "waiters": [...]}`` with one entry per waiter.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime.
    """
    start, end = _resolve_report_window(from_dt, to_dt, report_date)
    q = db.query(OrderItem).filter(
        OrderItem.status.in_(["active", "paid"]),
        OrderItem.added_at >= start,
        OrderItem.added_at < end,
    )
    if waiter_id:
        q = q.filter(OrderItem.added_by == waiter_id)
    waiters = _summarize_items_by_waiter(db, q.all(), "added_by")
    return {"from": start.isoformat(), "to": end.isoformat(), "waiters": waiters}
@router.get("/orders/history", response_model=List[OrderOut])
def order_history(
    from_date: Optional[str] = Query(default=None, alias="from"),
    to_date: Optional[str] = Query(default=None, alias="to"),
    waiter_id: Optional[int] = None,
    order_status: Optional[str] = Query(default=None, alias="status"),
    table_id: Optional[int] = None,
    page: int = 1,
    page_size: int = 50,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Return one page of orders, newest ``opened_at`` first.

    Optional filters: ``from`` / ``to`` (inclusive bounds on ``opened_at``),
    ``waiter_id`` (orders linked to that waiter through ``OrderWaiter``),
    ``status`` and ``table_id``.  ``page`` is 1-based.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime,
            or when ``page`` / ``page_size`` is smaller than 1.
    """
    # A page or page_size below 1 would otherwise become a negative
    # OFFSET / LIMIT in the generated SQL.
    if page < 1 or page_size < 1:
        raise HTTPException(status_code=400, detail="'page' and 'page_size' must be >= 1")
    try:
        opened_from = datetime.fromisoformat(from_date) if from_date else None
        opened_to = datetime.fromisoformat(to_date) if to_date else None
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from' and 'to' must be ISO 8601 datetimes",
        ) from exc

    q = db.query(Order)
    if opened_from is not None:
        q = q.filter(Order.opened_at >= opened_from)
    if opened_to is not None:
        q = q.filter(Order.opened_at <= opened_to)
    if waiter_id:
        q = q.join(OrderWaiter).filter(OrderWaiter.waiter_id == waiter_id)
    if order_status:
        q = q.filter(Order.status == order_status)
    if table_id:
        q = q.filter(Order.table_id == table_id)

    first_row = (page - 1) * page_size
    return q.order_by(Order.opened_at.desc()).offset(first_row).limit(page_size).all()
@router.get("/tables/summary")
def tables_summary(db: Session = Depends(get_db), user: User = Depends(require_manager)):
    """List every active table with the state of its current order.

    Returns:
        One dict per active table: ``table`` (a validated ``TableOut``),
        ``status`` (the status of its open / partially paid order, or
        ``"free"``) and ``order_id`` (``None`` when the table is free).
    """
    tables = db.query(Table).filter(Table.is_active == True).all()  # noqa: E712

    # Load all in-progress orders once instead of issuing one query per table.
    in_progress = (
        db.query(Order)
        .filter(Order.status.in_(["open", "partially_paid"]))
        .order_by(Order.id)
        .all()
    )
    active_by_table = {}
    for order in in_progress:
        # Should a table ever have several in-progress orders, the one with
        # the lowest id is reported (the original picked an unspecified one).
        active_by_table.setdefault(order.table_id, order)

    result = []
    for table in tables:
        active_order = active_by_table.get(table.id)
        result.append({
            "table": TableOut.model_validate(table),
            "status": active_order.status if active_order else "free",
            "order_id": active_order.id if active_order else None,
        })
    return result
@router.get("/printers")
def printer_totals(
    from_date: Optional[str] = Query(default=None, alias="from"),
    to_date: Optional[str] = Query(default=None, alias="to"),
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Returns totals per printer based on print_log entries in the date range.

    Only successful print logs are considered; ``from`` / ``to`` are optional
    inclusive bounds on ``printed_at``.  For each printer the response holds the
    number of print jobs, the number of distinct orders, the summed quantity and
    value of the logged items whose status is ``"active"`` or ``"paid"``, and a
    per-order breakdown in ``order_data``.

    NOTE(review): an item id that appears in several logs of the same printer
    is counted once per log — confirm that this is the intended behaviour.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime.
    """
    try:
        printed_from = datetime.fromisoformat(from_date) if from_date else None
        printed_to = datetime.fromisoformat(to_date) if to_date else None
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from' and 'to' must be ISO 8601 datetimes",
        ) from exc

    def decode_item_ids(raw):
        """Decode a log's JSON ``item_ids`` into a list of ints ([] if unusable)."""
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            return []
        if not isinstance(decoded, list):
            return []
        ids = []
        for value in decoded:
            if isinstance(value, bool):
                continue
            if isinstance(value, int):
                ids.append(value)
            elif isinstance(value, str) and value.strip().isdigit():
                ids.append(int(value))
        return ids

    def load_by_id(model, ids, batch_size=500):
        """Fetch rows of ``model`` for ``ids`` in batches, keyed by primary id."""
        wanted = sorted(i for i in ids if i is not None)
        found = {}
        for offset in range(0, len(wanted), batch_size):
            batch = wanted[offset:offset + batch_size]
            for row in db.query(model).filter(model.id.in_(batch)).all():
                found[row.id] = row
        return found

    q = db.query(PrintLog).filter(PrintLog.success == True)  # noqa: E712
    if printed_from is not None:
        q = q.filter(PrintLog.printed_at >= printed_from)
    if printed_to is not None:
        q = q.filter(PrintLog.printed_at <= printed_to)
    logs = q.all()

    printers_db = {p.id: p for p in db.query(Printer).all()}
    tables_db = {t.id: (t.label or f"T{t.number}") for t in db.query(Table).all()}

    # Batch-load everything the logs reference instead of querying per log/item.
    item_ids_per_log = [decode_item_ids(log.item_ids) for log in logs]
    orders_db = load_by_id(Order, {log.order_id for log in logs})
    items_db = load_by_id(
        OrderItem, {item_id for ids in item_ids_per_log for item_id in ids}
    )

    # summary[pid] — aggregated totals
    summary: dict = {}
    # order_map[pid][order_id] — per-order detail with items
    order_map: dict = {}
    for log, item_ids in zip(logs, item_ids_per_log):
        pid = log.printer_id
        if pid not in summary:
            printer = printers_db.get(pid)
            summary[pid] = {
                "printer_id": pid,
                "printer_name": printer.name if printer else f"Printer #{pid}",
                "print_jobs": 0,
                "orders": set(),
                "items": 0,
                "total": 0.0,
            }
            order_map[pid] = {}
        totals = summary[pid]
        totals["print_jobs"] += 1

        oid = log.order_id
        totals["orders"].add(oid)
        if oid not in order_map[pid]:
            order = orders_db.get(oid)
            if order is not None:
                table_label = tables_db.get(order.table_id, f"#{order.table_id}")
            else:
                table_label = f"#{oid}"
            order_map[pid][oid] = {
                "order_id": oid,
                # Time of the first log seen for this order on this printer.
                "time": log.printed_at.strftime("%H:%M") if log.printed_at else "",
                "table": table_label,
                "total": 0.0,
                "items": [],
            }
        order_entry = order_map[pid][oid]

        for item_id in item_ids:
            item = items_db.get(item_id)
            if item is None or item.status not in ("active", "paid"):
                continue
            line_total = item.unit_price * item.quantity
            totals["items"] += item.quantity
            totals["total"] += line_total
            order_entry["total"] += line_total
            product_name = item.product.name if item.product else f"#{item.product_id}"
            order_entry["items"].append({"name": product_name, "quantity": item.quantity})

    result = []
    for pid, entry in summary.items():
        entry["orders"] = len(entry["orders"])
        entry["order_data"] = list(order_map.get(pid, {}).values())
        result.append(entry)
    return {"printers": result}
class PrintWaiterReportBody(BaseModel):
    """Request body of ``POST /print/waiter``."""

    # Name used to look the waiter up (matched against ``User.username``) and
    # echoed on the printed report.
    waiter_name: str
    # Id of the printer that prints the report; it must exist and be active.
    printer_id: int
    # "simple" | "extensive" — only "extensive" includes the per-order detail.
    mode: str
    # Inclusive bounds on ``Order.opened_at``, parsed with datetime.fromisoformat.
    from_dt: str
    to_dt: str
class PrintPrinterReportBody(BaseModel):
    """Request body of ``POST /print/printer``."""

    # Id of the printer whose print logs are being reported on.
    printer_target_id: int
    # Id of the printer that prints the report; it must exist and be active.
    printer_id: int
    # "simple" | "extensive" — only "extensive" includes the per-order detail.
    mode: str
    # Inclusive bounds on ``PrintLog.printed_at``, parsed with datetime.fromisoformat.
    from_dt: str
    to_dt: str
class PrintOrderBody(BaseModel):
    """Request body that carries only a printer id.

    NOTE(review): presumably consumed by an order-receipt print endpoint; no
    such endpoint is visible in this part of the module — confirm.
    """

    # Id of the printer to print on.
    printer_id: int
@router.post("/print/waiter")
def print_waiter(
    body: PrintWaiterReportBody,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Queue a printed report of the orders a waiter opened in a time range.

    The report covers orders with ``opened_at`` in ``[from_dt, to_dt]`` (both
    inclusive) whose ``opened_by`` is the waiter; only items with status
    ``"active"`` or ``"paid"`` are counted.  Per-order detail is included only
    when ``body.mode == "extensive"``.  An unknown waiter yields an empty
    report rather than an error.  Printing runs as a background task.

    Returns:
        ``{"status": "printing"}`` once the job has been queued.

    Raises:
        HTTPException: 404 when the printer does not exist or is inactive;
            400 when ``from_dt`` / ``to_dt`` is not a valid ISO datetime.
    """
    printer = (
        db.query(Printer)
        .filter(Printer.id == body.printer_id, Printer.is_active == True)  # noqa: E712
        .first()
    )
    if not printer:
        raise HTTPException(status_code=404, detail="Printer not found or inactive")

    try:
        from_dt = datetime.fromisoformat(body.from_dt)
        to_dt = datetime.fromisoformat(body.to_dt)
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from_dt' and 'to_dt' must be ISO 8601 datetimes",
        ) from exc

    # The report endpoints of this module expose ``full_name or username`` as
    # ``waiter_name``, so fall back to the full name when no username matches;
    # otherwise a waiter with a full name would always get an empty report.
    # NOTE(review): full names may not be unique — the first match is used.
    waiter = db.query(User).filter(User.username == body.waiter_name).first()
    if waiter is None:
        waiter = db.query(User).filter(User.full_name == body.waiter_name).first()

    # Gather orders for this waiter in time range (none for an unknown waiter).
    orders = []
    if waiter is not None:
        orders = db.query(Order).filter(
            Order.opened_at >= from_dt,
            Order.opened_at <= to_dt,
            Order.opened_by == waiter.id,
        ).all()

    # Enrich with table names
    tables = {t.id: (t.label or f"T{t.number}") for t in db.query(Table).all()}

    order_data = []
    items_count = 0
    for o in orders:
        active_items = [i for i in o.items if i.status in ("active", "paid")]
        items_count += sum(i.quantity for i in active_items)
        order_data.append({
            "id": o.id,
            "time_open": o.opened_at.strftime("%H:%M"),
            "time_close": o.closed_at.strftime("%H:%M") if o.closed_at else "",
            "table": tables.get(o.table_id, f"#{o.table_id}"),
            "total": sum(i.unit_price * i.quantity for i in active_items),
            "items": [
                {"name": (i.product.name if i.product else f"#{i.product_id}"), "quantity": i.quantity}
                for i in active_items
            ],
        })
    grand_total = sum(d["total"] for d in order_data)

    report = {
        "waiter_name": body.waiter_name,
        "orders": len(orders),
        "items": items_count,
        "total": grand_total,
        "order_data": order_data if body.mode == "extensive" else [],
        "from_dt": from_dt.strftime("%d/%m/%Y %H:%M"),
        "to_dt": to_dt.strftime("%d/%m/%Y %H:%M"),
    }
    background_tasks.add_task(print_waiter_report, printer.ip_address, printer.port, report, body.mode)
    return {"status": "printing"}
@router.post("/print/printer")
def print_printer_totals(
    body: PrintPrinterReportBody,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Queue a printed report of what one printer printed in a time range.

    ``body.printer_id`` is the printer that prints the report and
    ``body.printer_target_id`` the printer whose successful print logs (with
    ``printed_at`` in ``[from_dt, to_dt]``, both inclusive) are summarised.
    Logged items are counted when their status is ``"active"`` or ``"paid"``.
    Orders that no longer exist get no per-order entry, but their items still
    count towards the item and grand totals.  Per-order detail is included only
    when ``body.mode == "extensive"``.  Printing runs as a background task.

    Returns:
        ``{"status": "printing"}`` once the job has been queued.

    Raises:
        HTTPException: 404 when the output printer does not exist or is
            inactive; 400 when ``from_dt`` / ``to_dt`` is not a valid ISO datetime.
    """
    printer = (
        db.query(Printer)
        .filter(Printer.id == body.printer_id, Printer.is_active == True)  # noqa: E712
        .first()
    )
    if not printer:
        raise HTTPException(status_code=404, detail="Printer not found or inactive")

    target_printer = db.query(Printer).filter(Printer.id == body.printer_target_id).first()
    target_name = target_printer.name if target_printer else f"Printer #{body.printer_target_id}"

    try:
        from_dt = datetime.fromisoformat(body.from_dt)
        to_dt = datetime.fromisoformat(body.to_dt)
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from_dt' and 'to_dt' must be ISO 8601 datetimes",
        ) from exc

    def decode_item_ids(raw):
        """Decode a log's JSON ``item_ids`` into a list of ints ([] if unusable)."""
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            return []
        if not isinstance(decoded, list):
            return []
        ids = []
        for value in decoded:
            if isinstance(value, bool):
                continue
            if isinstance(value, int):
                ids.append(value)
            elif isinstance(value, str) and value.strip().isdigit():
                ids.append(int(value))
        return ids

    def load_by_id(model, ids, batch_size=500):
        """Fetch rows of ``model`` for ``ids`` in batches, keyed by primary id."""
        wanted = sorted(i for i in ids if i is not None)
        found = {}
        for offset in range(0, len(wanted), batch_size):
            batch = wanted[offset:offset + batch_size]
            for row in db.query(model).filter(model.id.in_(batch)).all():
                found[row.id] = row
        return found

    logs = db.query(PrintLog).filter(
        PrintLog.printer_id == body.printer_target_id,
        PrintLog.success == True,  # noqa: E712
        PrintLog.printed_at >= from_dt,
        PrintLog.printed_at <= to_dt,
    ).all()
    tables = {t.id: (t.label or f"T{t.number}") for t in db.query(Table).all()}

    # Batch-load everything the logs reference instead of querying per log/item
    # (the original re-queried a missing order for every log mentioning it).
    item_ids_per_log = [decode_item_ids(log.item_ids) for log in logs]
    orders_db = load_by_id(Order, {log.order_id for log in logs})
    items_db = load_by_id(
        OrderItem, {item_id for ids in item_ids_per_log for item_id in ids}
    )

    # Build per-order entries keyed by order_id; each log may add more items
    order_map: dict = {}
    items_count = 0
    grand_total = 0.0
    for log, item_ids in zip(logs, item_ids_per_log):
        oid = log.order_id
        if oid not in order_map:
            order = orders_db.get(oid)
            if order is not None:
                order_map[oid] = {
                    "id": oid,
                    # Time of the first log seen for this order.
                    "time": log.printed_at.strftime("%H:%M"),
                    "table": tables.get(order.table_id, f"#{order.table_id}"),
                    "total": 0.0,
                    "items": [],
                }
        order_entry = order_map.get(oid)

        for item_id in item_ids:
            item = items_db.get(item_id)
            if item is None or item.status not in ("active", "paid"):
                continue
            line_total = item.unit_price * item.quantity
            items_count += item.quantity
            grand_total += line_total
            if order_entry is not None:
                order_entry["total"] += line_total
                product_name = item.product.name if item.product else f"#{item.product_id}"
                order_entry["items"].append({"name": product_name, "quantity": item.quantity})

    order_data = list(order_map.values())
    report = {
        "printer_name": target_name,
        "print_jobs": len(logs),
        "orders": len(order_map),
        "items": items_count,
        "total": grand_total,
        "order_data": order_data if body.mode == "extensive" else [],
        "from_dt": from_dt.strftime("%d/%m/%Y %H:%M"),
        "to_dt": to_dt.strftime("%d/%m/%Y %H:%M"),
    }
    background_tasks.add_task(print_printer_report, printer.ip_address, printer.port, report, body.mode)
    return {"status": "printing"}
# ---------------------------------------------------------------------------
# Shift history report
# ---------------------------------------------------------------------------
@router.get("/shifts")
def shifts_report(
    waiter_id: Optional[int] = None,
    business_day_id: Optional[int] = None,
    from_dt: Optional[str] = Query(default=None, alias="from"),
    to_dt: Optional[str] = Query(default=None, alias="to"),
    active_only: bool = False,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """List waiter shifts, most recently started first.

    Optional filters: ``waiter_id``, ``business_day_id``, ``from`` / ``to``
    (inclusive bounds on ``started_at``) and ``active_only`` (shifts without an
    ``ended_at``).  For a running shift the collected total is computed live
    with ``compute_shift_total``; for an ended shift the stored
    ``total_collected`` is used (0.0 when unset).  ``net_to_deliver`` is that
    total plus the starting cash, rounded to two decimals.

    Returns:
        ``{"shifts": [...]}`` with one dict per shift.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime.
    """
    # Imported inside the function, as in the original.
    # NOTE(review): presumably to avoid a circular import — confirm.
    from routers.shifts import compute_shift_total

    try:
        started_from = datetime.fromisoformat(from_dt) if from_dt else None
        started_to = datetime.fromisoformat(to_dt) if to_dt else None
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from' and 'to' must be ISO 8601 datetimes",
        ) from exc

    q = db.query(WaiterShift)
    if waiter_id:
        q = q.filter(WaiterShift.waiter_id == waiter_id)
    if business_day_id:
        q = q.filter(WaiterShift.business_day_id == business_day_id)
    if started_from is not None:
        q = q.filter(WaiterShift.started_at >= started_from)
    if started_to is not None:
        q = q.filter(WaiterShift.started_at <= started_to)
    if active_only:
        q = q.filter(WaiterShift.ended_at == None)  # noqa: E711
    shifts = q.order_by(WaiterShift.started_at.desc()).all()

    waiters_db = {u.id: u for u in db.query(User).all()}
    result = []
    for shift in shifts:
        w = waiters_db.get(shift.waiter_id)
        wname = (w.full_name or w.username) if w else f"#{shift.waiter_id}"
        is_active = shift.ended_at is None
        if is_active:
            total = compute_shift_total(shift.id, db)
        else:
            total = shift.total_collected or 0.0
        result.append({
            "id": shift.id,
            "waiter_id": shift.waiter_id,
            "waiter_name": wname,
            "business_day_id": shift.business_day_id,
            "started_at": _dt(shift.started_at),
            "ended_at": _dt(shift.ended_at),
            "starting_cash": shift.starting_cash,
            "total_collected": total,
            "net_to_deliver": round(total + (shift.starting_cash or 0.0), 2),
            "is_active": is_active,
            "notes": shift.notes,
        })
    return {"shifts": result}
# ---------------------------------------------------------------------------
# Product performance analytics
# ---------------------------------------------------------------------------
@router.get("/products/performance")
def product_performance(
    from_dt: Optional[str] = Query(default=None, alias="from"),
    to_dt: Optional[str] = Query(default=None, alias="to"),
    business_day_id: Optional[int] = None,
    category_id: Optional[int] = None,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Report quantity sold, revenue and order count per product.

    Considers order items with status ``"active"`` or ``"paid"``, optionally
    restricted by ``from`` / ``to`` (inclusive bounds on ``added_at``),
    ``business_day_id`` (of the owning order) and ``category_id`` (of the
    product; items whose product is unknown are dropped when it is given).

    Returns:
        ``{"products": [...]}`` sorted by ``qty_sold`` descending; revenue is
        rounded to two decimals.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime.
    """
    # Imported inside the function, as in the original.
    from models.product import Product

    try:
        added_from = datetime.fromisoformat(from_dt) if from_dt else None
        added_to = datetime.fromisoformat(to_dt) if to_dt else None
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from' and 'to' must be ISO 8601 datetimes",
        ) from exc

    q = db.query(OrderItem).filter(OrderItem.status.in_(["active", "paid"]))
    if added_from is not None:
        q = q.filter(OrderItem.added_at >= added_from)
    if added_to is not None:
        q = q.filter(OrderItem.added_at <= added_to)
    if business_day_id:
        q = q.join(Order).filter(Order.business_day_id == business_day_id)
    items = q.all()

    products_db = {p.id: p for p in db.query(Product).all()}
    summary: dict = {}
    for item in items:
        pid = item.product_id
        product = products_db.get(pid)
        if category_id and (not product or product.category_id != category_id):
            continue
        entry = summary.get(pid)
        if entry is None:
            entry = summary[pid] = {
                "product_id": pid,
                "product_name": product.name if product else f"#{pid}",
                "category_id": product.category_id if product else None,
                "qty_sold": 0,
                "revenue": 0.0,
                # Distinct orders containing the product; replaced by a count below.
                "order_ids": set(),
            }
        entry["qty_sold"] += item.quantity
        entry["revenue"] += item.unit_price * item.quantity
        entry["order_ids"].add(item.order_id)

    result = []
    for entry in summary.values():
        entry["order_count"] = len(entry.pop("order_ids"))
        entry["revenue"] = round(entry["revenue"], 2)
        result.append(entry)
    result.sort(key=lambda x: x["qty_sold"], reverse=True)
    return {"products": result}
# ---------------------------------------------------------------------------
# Table performance analytics
# ---------------------------------------------------------------------------
@router.get("/tables/performance")
def table_performance(
    from_dt: Optional[str] = Query(default=None, alias="from"),
    to_dt: Optional[str] = Query(default=None, alias="to"),
    business_day_id: Optional[int] = None,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Report order count, revenue and average order duration per table.

    Considers orders with status ``"closed"`` or ``"paid"``, optionally
    restricted by ``from`` / ``to`` (inclusive bounds on ``opened_at``) and
    ``business_day_id``.  Revenue sums the order items with status ``"active"``
    or ``"paid"``; the duration of an order is ``closed_at - opened_at`` in
    minutes and is averaged over the orders that have both timestamps.

    Returns:
        ``{"tables": [...]}`` sorted by revenue descending;
        ``avg_duration_minutes`` is ``None`` when no duration is available.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime.
    """
    try:
        opened_from = datetime.fromisoformat(from_dt) if from_dt else None
        opened_to = datetime.fromisoformat(to_dt) if to_dt else None
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from' and 'to' must be ISO 8601 datetimes",
        ) from exc

    q = db.query(Order).filter(Order.status.in_(["closed", "paid"]))
    if opened_from is not None:
        q = q.filter(Order.opened_at >= opened_from)
    if opened_to is not None:
        q = q.filter(Order.opened_at <= opened_to)
    if business_day_id:
        q = q.filter(Order.business_day_id == business_day_id)
    orders = q.all()

    tables_db = {t.id: t for t in db.query(Table).all()}
    summary: dict = {}
    for order in orders:
        tid = order.table_id
        entry = summary.get(tid)
        if entry is None:
            t = tables_db.get(tid)
            entry = summary[tid] = {
                "table_id": tid,
                "table_name": (t.label or f"T{t.number}") if t else f"#{tid}",
                "order_count": 0,
                "revenue": 0.0,
                # Per-order durations in minutes; replaced by their average below.
                "durations": [],
            }
        entry["order_count"] += 1
        entry["revenue"] += sum(
            i.unit_price * i.quantity for i in order.items if i.status in ("active", "paid")
        )
        if order.closed_at and order.opened_at:
            minutes = (order.closed_at - order.opened_at).total_seconds() / 60
            entry["durations"].append(minutes)

    result = []
    for entry in summary.values():
        durations = entry.pop("durations")
        entry["avg_duration_minutes"] = (
            round(sum(durations) / len(durations), 1) if durations else None
        )
        entry["revenue"] = round(entry["revenue"], 2)
        result.append(entry)
    result.sort(key=lambda x: x["revenue"], reverse=True)
    return {"tables": result}
# ---------------------------------------------------------------------------
# Traffic analysis (hour-of-day / day-of-week)
# ---------------------------------------------------------------------------
@router.get("/traffic")
def traffic_analysis(
    from_dt: Optional[str] = Query(default=None, alias="from"),
    to_dt: Optional[str] = Query(default=None, alias="to"),
    business_day_id: Optional[int] = None,
    db: Session = Depends(get_db),
    user: User = Depends(require_manager),
):
    """Report order count and revenue per hour of day and per weekday.

    Orders (of any status) are bucketed by the hour and weekday of their
    ``opened_at``; ``from`` / ``to`` are optional inclusive bounds on
    ``opened_at`` and ``business_day_id`` an optional filter.  Revenue sums the
    order items with status ``"active"`` or ``"paid"``.

    Returns:
        ``{"by_hour": [...24 entries...], "by_weekday": [...7 entries, Monday
        first...]}``; every bucket is present even when empty and revenue is
        rounded to two decimals.

    Raises:
        HTTPException: 400 when ``from`` / ``to`` is not a valid ISO datetime.
    """
    try:
        opened_from = datetime.fromisoformat(from_dt) if from_dt else None
        opened_to = datetime.fromisoformat(to_dt) if to_dt else None
    except ValueError as exc:
        # Previously this escaped as an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400,
            detail="'from' and 'to' must be ISO 8601 datetimes",
        ) from exc

    q = db.query(Order)
    if opened_from is not None:
        q = q.filter(Order.opened_at >= opened_from)
    if opened_to is not None:
        q = q.filter(Order.opened_at <= opened_to)
    if business_day_id:
        q = q.filter(Order.business_day_id == business_day_id)
    orders = q.all()

    by_hour = {h: {"hour": h, "orders": 0, "revenue": 0.0} for h in range(24)}
    day_labels = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
    by_weekday = {
        d: {"day": d, "label": day_labels[d], "orders": 0, "revenue": 0.0}
        for d in range(7)
    }

    for order in orders:
        opened = order.opened_at
        if opened is None:
            # Cannot be placed in any bucket; skip instead of failing the report.
            continue
        revenue = sum(
            i.unit_price * i.quantity for i in order.items if i.status in ("active", "paid")
        )
        for bucket in (by_hour[opened.hour], by_weekday[opened.weekday()]):
            bucket["orders"] += 1
            bucket["revenue"] += revenue

    for bucket in by_hour.values():
        bucket["revenue"] = round(bucket["revenue"], 2)
    for bucket in by_weekday.values():
        bucket["revenue"] = round(bucket["revenue"], 2)
    return {
        "by_hour": list(by_hour.values()),
        "by_weekday": list(by_weekday.values()),
    }