Files

105 lines
3.8 KiB
Python

import time
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from database import get_db
from models.printer import Printer
from schemas.printer import PrinterCreate, PrinterUpdate, PrinterOut
from routers.deps import get_current_user, require_manager, require_sysadmin
from models.user import User
from services import printer_service
from middleware.license_check import license_state
router = APIRouter()
_start_time = time.time()
@router.get("/health")
def health():
return {"status": "ok"}
@router.get("/status")
def system_status(db: Session = Depends(get_db), user: User = Depends(get_current_user)):
printers = db.query(Printer).filter(Printer.is_active == True).all()
printer_statuses = []
for p in printers:
reachable = printer_service.check_printer(p.ip_address, p.port)
printer_statuses.append({"id": p.id, "name": p.name, "reachable": reachable})
return {
"uptime_seconds": int(time.time() - _start_time),
"licensed": license_state.get("licensed", True),
"locked": license_state.get("locked", False),
"expires_at": license_state.get("expires_at"),
"printers": printer_statuses,
}
@router.get("/printers", response_model=List[PrinterOut])
def list_printers(db: Session = Depends(get_db), user: User = Depends(require_manager)):
return db.query(Printer).all()
@router.post("/printers", response_model=PrinterOut)
def create_printer(body: PrinterCreate, db: Session = Depends(get_db), user: User = Depends(require_manager)):
printer = Printer(**body.model_dump())
db.add(printer)
db.commit()
db.refresh(printer)
return printer
@router.post("/printers/test")
def test_printer(printer_id: int, db: Session = Depends(get_db), user: User = Depends(require_manager)):
printer = db.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
raise HTTPException(status_code=404, detail="Printer not found")
success, error = printer_service.send_test_print(printer.ip_address, printer.port, printer.name)
return {"success": success, "error": error}
@router.post("/printers/test-order")
def test_order_print(printer_id: int, db: Session = Depends(get_db), user: User = Depends(require_manager)):
printer = db.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
raise HTTPException(status_code=404, detail="Printer not found")
success, error = printer_service.send_test_order_print(printer.ip_address, printer.port, db)
return {"success": success, "error": error}
@router.put("/printers/{printer_id}", response_model=PrinterOut)
def update_printer(printer_id: int, body: PrinterUpdate, db: Session = Depends(get_db), user: User = Depends(require_manager)):
printer = db.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
raise HTTPException(status_code=404, detail="Printer not found")
for field, value in body.model_dump(exclude_none=True).items():
setattr(printer, field, value)
db.commit()
db.refresh(printer)
return printer
@router.delete("/printers/{printer_id}")
def delete_printer(printer_id: int, db: Session = Depends(get_db), user: User = Depends(require_manager)):
printer = db.query(Printer).filter(Printer.id == printer_id).first()
if not printer:
raise HTTPException(status_code=404, detail="Printer not found")
db.delete(printer)
db.commit()
return {"ok": True}
@router.post("/lock")
def lock_system(token: str, user: User = Depends(require_sysadmin)):
license_state["locked"] = True
return {"status": "locked"}
@router.post("/unlock")
def unlock_system(token: str, user: User = Depends(require_sysadmin)):
license_state["locked"] = False
return {"status": "unlocked"}