1249 lines
46 KiB
Python
1249 lines
46 KiB
Python
import asyncio
|
|
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
from fastapi import HTTPException
|
|
from shared.firebase import get_db
|
|
from shared.exceptions import NotFoundError
|
|
import re as _re
|
|
import database as mqtt_db
|
|
from crm.models import (
|
|
ProductCreate, ProductUpdate, ProductInDB,
|
|
CustomerCreate, CustomerUpdate, CustomerInDB,
|
|
OrderCreate, OrderUpdate, OrderInDB,
|
|
CommCreate, CommUpdate, CommInDB,
|
|
MediaCreate, MediaInDB,
|
|
TechnicalIssue, InstallSupportEntry, TransactionEntry,
|
|
)
|
|
|
|
COLLECTION = "crm_products"
|
|
|
|
|
|
def _doc_to_product(doc) -> ProductInDB:
|
|
data = doc.to_dict()
|
|
# Backfill bilingual fields for existing products that predate the feature
|
|
if not data.get("name_en") and data.get("name"):
|
|
data["name_en"] = data["name"]
|
|
if not data.get("name_gr") and data.get("name"):
|
|
data["name_gr"] = data["name"]
|
|
return ProductInDB(id=doc.id, **data)
|
|
|
|
|
|
def list_products(
|
|
search: str | None = None,
|
|
category: str | None = None,
|
|
active_only: bool = False,
|
|
) -> list[ProductInDB]:
|
|
db = get_db()
|
|
query = db.collection(COLLECTION)
|
|
|
|
if active_only:
|
|
query = query.where("active", "==", True)
|
|
|
|
if category:
|
|
query = query.where("category", "==", category)
|
|
|
|
results = []
|
|
for doc in query.stream():
|
|
product = _doc_to_product(doc)
|
|
|
|
if search:
|
|
s = search.lower()
|
|
if not (
|
|
s in (product.name or "").lower()
|
|
or s in (product.sku or "").lower()
|
|
or s in (product.description or "").lower()
|
|
):
|
|
continue
|
|
|
|
results.append(product)
|
|
|
|
return results
|
|
|
|
|
|
def get_product(product_id: str) -> ProductInDB:
|
|
db = get_db()
|
|
doc = db.collection(COLLECTION).document(product_id).get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Product")
|
|
return _doc_to_product(doc)
|
|
|
|
|
|
def create_product(data: ProductCreate) -> ProductInDB:
|
|
db = get_db()
|
|
now = datetime.utcnow().isoformat()
|
|
product_id = str(uuid.uuid4())
|
|
|
|
doc_data = data.model_dump()
|
|
doc_data["created_at"] = now
|
|
doc_data["updated_at"] = now
|
|
|
|
# Serialize nested enums/models
|
|
if doc_data.get("category"):
|
|
doc_data["category"] = doc_data["category"].value if hasattr(doc_data["category"], "value") else doc_data["category"]
|
|
if doc_data.get("costs") and hasattr(doc_data["costs"], "model_dump"):
|
|
doc_data["costs"] = doc_data["costs"].model_dump()
|
|
if doc_data.get("stock") and hasattr(doc_data["stock"], "model_dump"):
|
|
doc_data["stock"] = doc_data["stock"].model_dump()
|
|
|
|
db.collection(COLLECTION).document(product_id).set(doc_data)
|
|
return ProductInDB(id=product_id, **doc_data)
|
|
|
|
|
|
def update_product(product_id: str, data: ProductUpdate) -> ProductInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(COLLECTION).document(product_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Product")
|
|
|
|
update_data = data.model_dump(exclude_none=True)
|
|
update_data["updated_at"] = datetime.utcnow().isoformat()
|
|
|
|
if "category" in update_data and hasattr(update_data["category"], "value"):
|
|
update_data["category"] = update_data["category"].value
|
|
if "costs" in update_data and hasattr(update_data["costs"], "model_dump"):
|
|
update_data["costs"] = update_data["costs"].model_dump()
|
|
if "stock" in update_data and hasattr(update_data["stock"], "model_dump"):
|
|
update_data["stock"] = update_data["stock"].model_dump()
|
|
|
|
doc_ref.update(update_data)
|
|
updated_doc = doc_ref.get()
|
|
return _doc_to_product(updated_doc)
|
|
|
|
|
|
def delete_product(product_id: str) -> None:
|
|
db = get_db()
|
|
doc_ref = db.collection(COLLECTION).document(product_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Product")
|
|
doc_ref.delete()
|
|
|
|
|
|
# ── Customers ────────────────────────────────────────────────────────────────
|
|
|
|
CUSTOMERS_COLLECTION = "crm_customers"
|
|
|
|
|
|
_LEGACY_CUSTOMER_FIELDS = {"negotiating", "has_problem"}
|
|
|
|
def _doc_to_customer(doc) -> CustomerInDB:
|
|
data = doc.to_dict()
|
|
for f in _LEGACY_CUSTOMER_FIELDS:
|
|
data.pop(f, None)
|
|
return CustomerInDB(id=doc.id, **data)
|
|
|
|
|
|
def list_customers(
|
|
search: str | None = None,
|
|
tag: str | None = None,
|
|
sort: str | None = None,
|
|
) -> list[CustomerInDB]:
|
|
db = get_db()
|
|
query = db.collection(CUSTOMERS_COLLECTION)
|
|
|
|
if tag:
|
|
query = query.where("tags", "array_contains", tag)
|
|
|
|
results = []
|
|
for doc in query.stream():
|
|
customer = _doc_to_customer(doc)
|
|
|
|
if search:
|
|
s = search.lower()
|
|
s_nospace = s.replace(" ", "")
|
|
name_match = s in (customer.name or "").lower()
|
|
surname_match = s in (customer.surname or "").lower()
|
|
org_match = s in (customer.organization or "").lower()
|
|
religion_match = s in (customer.religion or "").lower()
|
|
language_match = s in (customer.language or "").lower()
|
|
contact_match = any(
|
|
s_nospace in (c.value or "").lower().replace(" ", "")
|
|
or s in (c.value or "").lower()
|
|
for c in (customer.contacts or [])
|
|
)
|
|
loc = customer.location
|
|
loc_match = bool(loc) and (
|
|
s in (loc.address or "").lower() or
|
|
s in (loc.city or "").lower() or
|
|
s in (loc.postal_code or "").lower() or
|
|
s in (loc.region or "").lower() or
|
|
s in (loc.country or "").lower()
|
|
)
|
|
tag_match = any(s in (t or "").lower() for t in (customer.tags or []))
|
|
if not (name_match or surname_match or org_match or religion_match or language_match or contact_match or loc_match or tag_match):
|
|
continue
|
|
|
|
results.append(customer)
|
|
|
|
# Sorting (non-latest_comm; latest_comm is handled by the async router wrapper)
|
|
_TITLES = {"fr.", "rev.", "archim.", "bp.", "abp.", "met.", "mr.", "mrs.", "ms.", "dr.", "prof."}
|
|
|
|
def _sort_name(c):
|
|
return (c.name or "").lower()
|
|
|
|
def _sort_surname(c):
|
|
return (c.surname or "").lower()
|
|
|
|
def _sort_default(c):
|
|
return c.created_at or ""
|
|
|
|
if sort == "name":
|
|
results.sort(key=_sort_name)
|
|
elif sort == "surname":
|
|
results.sort(key=_sort_surname)
|
|
elif sort == "default":
|
|
results.sort(key=_sort_default)
|
|
|
|
return results
|
|
|
|
|
|
def list_all_tags() -> list[str]:
|
|
db = get_db()
|
|
tags: set[str] = set()
|
|
for doc in db.collection(CUSTOMERS_COLLECTION).select(["tags"]).stream():
|
|
data = doc.to_dict()
|
|
for tag in (data.get("tags") or []):
|
|
if tag:
|
|
tags.add(tag)
|
|
return sorted(tags)
|
|
|
|
|
|
def get_customer(customer_id: str) -> CustomerInDB:
|
|
db = get_db()
|
|
doc = db.collection(CUSTOMERS_COLLECTION).document(customer_id).get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
return _doc_to_customer(doc)
|
|
|
|
|
|
def get_customer_nc_path(customer: CustomerInDB) -> str:
|
|
"""Return the Nextcloud folder slug for a customer. Falls back to UUID for legacy records."""
|
|
return customer.folder_id if customer.folder_id else customer.id
|
|
|
|
|
|
def create_customer(data: CustomerCreate) -> CustomerInDB:
|
|
db = get_db()
|
|
|
|
# Validate folder_id
|
|
if not data.folder_id or not data.folder_id.strip():
|
|
raise HTTPException(status_code=422, detail="Internal Folder ID is required.")
|
|
folder_id = data.folder_id.strip().lower()
|
|
if not _re.match(r'^[a-z0-9][a-z0-9\-]*[a-z0-9]$', folder_id):
|
|
raise HTTPException(
|
|
status_code=422,
|
|
detail="Internal Folder ID must contain only lowercase letters, numbers, and hyphens, and cannot start or end with a hyphen.",
|
|
)
|
|
# Check uniqueness
|
|
existing = list(db.collection(CUSTOMERS_COLLECTION).where("folder_id", "==", folder_id).limit(1).stream())
|
|
if existing:
|
|
raise HTTPException(status_code=409, detail=f"A customer with folder ID '{folder_id}' already exists.")
|
|
|
|
now = datetime.utcnow().isoformat()
|
|
customer_id = str(uuid.uuid4())
|
|
|
|
doc_data = data.model_dump()
|
|
doc_data["folder_id"] = folder_id
|
|
doc_data["created_at"] = now
|
|
doc_data["updated_at"] = now
|
|
|
|
db.collection(CUSTOMERS_COLLECTION).document(customer_id).set(doc_data)
|
|
return CustomerInDB(id=customer_id, **doc_data)
|
|
|
|
|
|
def update_customer(customer_id: str, data: CustomerUpdate) -> CustomerInDB:
|
|
from google.cloud.firestore_v1 import DELETE_FIELD
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
|
|
update_data = data.model_dump(exclude_none=True)
|
|
update_data["updated_at"] = datetime.utcnow().isoformat()
|
|
|
|
# Fields that should be explicitly deleted from Firestore when set to None
|
|
# (exclude_none=True would just skip them, leaving the old value intact)
|
|
NULLABLE_FIELDS = {"title", "surname", "organization", "religion"}
|
|
set_fields = data.model_fields_set
|
|
for field in NULLABLE_FIELDS:
|
|
if field in set_fields and getattr(data, field) is None:
|
|
update_data[field] = DELETE_FIELD
|
|
|
|
doc_ref.update(update_data)
|
|
updated_doc = doc_ref.get()
|
|
return _doc_to_customer(updated_doc)
|
|
|
|
|
|
|
|
async def get_last_comm_direction(customer_id: str) -> dict:
|
|
"""Return direction ('inbound'/'outbound') and timestamp of the most recent comm, or None."""
|
|
db = await mqtt_db.get_db()
|
|
rows = await db.execute_fetchall(
|
|
"SELECT direction, COALESCE(occurred_at, created_at) as ts FROM crm_comms_log WHERE customer_id = ? "
|
|
"AND direction IN ('inbound', 'outbound') "
|
|
"ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT 1",
|
|
(customer_id,),
|
|
)
|
|
if rows:
|
|
return {"direction": rows[0][0], "occurred_at": rows[0][1]}
|
|
return {"direction": None, "occurred_at": None}
|
|
|
|
|
|
async def get_last_comm_timestamp(customer_id: str) -> str | None:
|
|
"""Return the ISO timestamp of the most recent comm for this customer, or None."""
|
|
db = await mqtt_db.get_db()
|
|
rows = await db.execute_fetchall(
|
|
"SELECT COALESCE(occurred_at, created_at) as ts FROM crm_comms_log "
|
|
"WHERE customer_id = ? ORDER BY ts DESC LIMIT 1",
|
|
(customer_id,),
|
|
)
|
|
if rows:
|
|
return rows[0][0]
|
|
return None
|
|
|
|
|
|
async def list_customers_sorted_by_latest_comm(customers: list[CustomerInDB]) -> list[CustomerInDB]:
|
|
"""Re-sort a list of customers so those with the most recent comm come first."""
|
|
timestamps = await asyncio.gather(
|
|
*[get_last_comm_timestamp(c.id) for c in customers]
|
|
)
|
|
paired = list(zip(customers, timestamps))
|
|
paired.sort(key=lambda x: x[1] or "", reverse=True)
|
|
return [c for c, _ in paired]
|
|
|
|
|
|
def delete_customer(customer_id: str) -> CustomerInDB:
|
|
"""Delete customer from Firestore. Returns the customer data (for NC path lookup)."""
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
customer = _doc_to_customer(doc)
|
|
doc_ref.delete()
|
|
return customer
|
|
|
|
|
|
async def delete_customer_comms(customer_id: str) -> int:
|
|
"""Delete all comm log entries for a customer. Returns count deleted."""
|
|
db = await mqtt_db.get_db()
|
|
cursor = await db.execute(
|
|
"DELETE FROM crm_comms_log WHERE customer_id = ?", (customer_id,)
|
|
)
|
|
await db.commit()
|
|
return cursor.rowcount
|
|
|
|
|
|
async def delete_customer_media_entries(customer_id: str) -> int:
|
|
"""Delete all media DB entries for a customer. Returns count deleted."""
|
|
db = await mqtt_db.get_db()
|
|
cursor = await db.execute(
|
|
"DELETE FROM crm_media WHERE customer_id = ?", (customer_id,)
|
|
)
|
|
await db.commit()
|
|
return cursor.rowcount
|
|
|
|
|
|
# ── Orders (subcollection under customers/{id}/orders) ────────────────────────
|
|
|
|
def _doc_to_order(doc) -> OrderInDB:
|
|
data = doc.to_dict()
|
|
return OrderInDB(id=doc.id, **data)
|
|
|
|
|
|
def _order_collection(customer_id: str):
|
|
db = get_db()
|
|
return db.collection(CUSTOMERS_COLLECTION).document(customer_id).collection("orders")
|
|
|
|
|
|
def _generate_order_number(customer_id: str) -> str:
|
|
"""Generate next ORD-DDMMYY-NNN across all customers using collection group query."""
|
|
db = get_db()
|
|
now = datetime.utcnow()
|
|
prefix = f"ORD-{now.strftime('%d%m%y')}-"
|
|
max_n = 0
|
|
for doc in db.collection_group("orders").stream():
|
|
data = doc.to_dict()
|
|
num = data.get("order_number", "")
|
|
if num and num.startswith(prefix):
|
|
try:
|
|
n = int(num[len(prefix):])
|
|
if n > max_n:
|
|
max_n = n
|
|
except ValueError:
|
|
pass
|
|
return f"{prefix}{max_n + 1:03d}"
|
|
|
|
|
|
def _default_payment_status() -> dict:
|
|
return {
|
|
"required_amount": 0,
|
|
"received_amount": 0,
|
|
"balance_due": 0,
|
|
"advance_required": False,
|
|
"advance_amount": None,
|
|
"payment_complete": False,
|
|
}
|
|
|
|
|
|
def _recalculate_order_payment_status(customer_id: str, order_id: str) -> None:
|
|
"""Recompute an order's payment_status from transaction_history on the customer."""
|
|
db = get_db()
|
|
cust_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
cust_data = (cust_ref.get().to_dict()) or {}
|
|
txns = cust_data.get("transaction_history") or []
|
|
required = sum(float(t.get("amount") or 0) for t in txns
|
|
if t.get("order_ref") == order_id and t.get("flow") == "invoice")
|
|
received = sum(float(t.get("amount") or 0) for t in txns
|
|
if t.get("order_ref") == order_id and t.get("flow") == "payment")
|
|
balance_due = required - received
|
|
payment_complete = (required > 0 and balance_due <= 0)
|
|
order_ref = _order_collection(customer_id).document(order_id)
|
|
if not order_ref.get().exists:
|
|
return
|
|
order_ref.update({
|
|
"payment_status": {
|
|
"required_amount": required,
|
|
"received_amount": received,
|
|
"balance_due": balance_due,
|
|
"advance_required": False,
|
|
"advance_amount": None,
|
|
"payment_complete": payment_complete,
|
|
},
|
|
"updated_at": datetime.utcnow().isoformat(),
|
|
})
|
|
|
|
|
|
def _update_crm_summary(customer_id: str) -> None:
|
|
"""Recompute and store the crm_summary field on the customer document."""
|
|
db = get_db()
|
|
customer_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
|
|
# Load customer for issue/support arrays
|
|
customer_doc = customer_ref.get()
|
|
if not customer_doc.exists:
|
|
return
|
|
customer_data = customer_doc.to_dict() or {}
|
|
|
|
# Active issues
|
|
issues = customer_data.get("technical_issues") or []
|
|
active_issues = [i for i in issues if i.get("active")]
|
|
active_issues_count = len(active_issues)
|
|
latest_issue_date = None
|
|
if active_issues:
|
|
latest_issue_date = max((i.get("opened_date") or "") for i in active_issues) or None
|
|
|
|
# Active support
|
|
support = customer_data.get("install_support") or []
|
|
active_support = [s for s in support if s.get("active")]
|
|
active_support_count = len(active_support)
|
|
latest_support_date = None
|
|
if active_support:
|
|
latest_support_date = max((s.get("opened_date") or "") for s in active_support) or None
|
|
|
|
# Active order (most recent non-terminal status)
|
|
TERMINAL_STATUSES = {"declined", "complete"}
|
|
active_order_status = None
|
|
active_order_status_date = None
|
|
active_order_title = None
|
|
active_order_number = None
|
|
latest_order_date = ""
|
|
all_order_statuses = []
|
|
for doc in _order_collection(customer_id).stream():
|
|
data = doc.to_dict() or {}
|
|
status = data.get("status", "")
|
|
all_order_statuses.append(status)
|
|
if status not in TERMINAL_STATUSES:
|
|
upd = data.get("status_updated_date") or data.get("created_at") or ""
|
|
if upd > latest_order_date:
|
|
latest_order_date = upd
|
|
active_order_status = status
|
|
active_order_status_date = upd
|
|
active_order_title = data.get("title")
|
|
active_order_number = data.get("order_number")
|
|
|
|
summary = {
|
|
"active_order_status": active_order_status,
|
|
"active_order_status_date": active_order_status_date,
|
|
"active_order_title": active_order_title,
|
|
"active_order_number": active_order_number,
|
|
"all_orders_statuses": all_order_statuses,
|
|
"active_issues_count": active_issues_count,
|
|
"latest_issue_date": latest_issue_date,
|
|
"active_support_count": active_support_count,
|
|
"latest_support_date": latest_support_date,
|
|
}
|
|
customer_ref.update({"crm_summary": summary, "updated_at": datetime.utcnow().isoformat()})
|
|
|
|
|
|
def list_orders(customer_id: str) -> list[OrderInDB]:
|
|
return [_doc_to_order(doc) for doc in _order_collection(customer_id).stream()]
|
|
|
|
|
|
def list_all_orders(status: str | None = None) -> list[OrderInDB]:
|
|
"""Query across all customers using Firestore collection group."""
|
|
db = get_db()
|
|
query = db.collection_group("orders")
|
|
if status:
|
|
query = query.where("status", "==", status)
|
|
return [_doc_to_order(doc) for doc in query.stream()]
|
|
|
|
|
|
def get_order(customer_id: str, order_id: str) -> OrderInDB:
|
|
doc = _order_collection(customer_id).document(order_id).get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Order")
|
|
return _doc_to_order(doc)
|
|
|
|
|
|
def create_order(customer_id: str, data: OrderCreate) -> OrderInDB:
|
|
col = _order_collection(customer_id)
|
|
now = datetime.utcnow().isoformat()
|
|
order_id = str(uuid.uuid4())
|
|
|
|
doc_data = data.model_dump()
|
|
doc_data["customer_id"] = customer_id
|
|
if not doc_data.get("order_number"):
|
|
doc_data["order_number"] = _generate_order_number(customer_id)
|
|
if not doc_data.get("payment_status"):
|
|
doc_data["payment_status"] = _default_payment_status()
|
|
if not doc_data.get("status_updated_date"):
|
|
doc_data["status_updated_date"] = now
|
|
doc_data["created_at"] = now
|
|
doc_data["updated_at"] = now
|
|
|
|
col.document(order_id).set(doc_data)
|
|
_update_crm_summary(customer_id)
|
|
return OrderInDB(id=order_id, **doc_data)
|
|
|
|
|
|
def update_order(customer_id: str, order_id: str, data: OrderUpdate) -> OrderInDB:
|
|
doc_ref = _order_collection(customer_id).document(order_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Order")
|
|
|
|
update_data = data.model_dump(exclude_none=True)
|
|
update_data["updated_at"] = datetime.utcnow().isoformat()
|
|
|
|
doc_ref.update(update_data)
|
|
_update_crm_summary(customer_id)
|
|
result = _doc_to_order(doc_ref.get())
|
|
|
|
# Auto-mark customer as inactive when all orders are complete
|
|
if update_data.get("status") == "complete":
|
|
all_orders = list_orders(customer_id)
|
|
if all_orders and all(o.status == "complete" for o in all_orders):
|
|
db = get_db()
|
|
db.collection(CUSTOMERS_COLLECTION).document(customer_id).update({
|
|
"relationship_status": "inactive",
|
|
"updated_at": datetime.utcnow().isoformat(),
|
|
})
|
|
|
|
return result
|
|
|
|
|
|
def delete_order(customer_id: str, order_id: str) -> None:
|
|
doc_ref = _order_collection(customer_id).document(order_id)
|
|
if not doc_ref.get().exists:
|
|
raise NotFoundError("Order")
|
|
doc_ref.delete()
|
|
_update_crm_summary(customer_id)
|
|
|
|
|
|
def append_timeline_event(customer_id: str, order_id: str, event: dict) -> OrderInDB:
|
|
from google.cloud.firestore_v1 import ArrayUnion
|
|
doc_ref = _order_collection(customer_id).document(order_id)
|
|
if not doc_ref.get().exists:
|
|
raise NotFoundError("Order")
|
|
now = datetime.utcnow().isoformat()
|
|
doc_ref.update({
|
|
"timeline": ArrayUnion([event]),
|
|
"status_updated_date": event.get("date", now),
|
|
"status_updated_by": event.get("updated_by", ""),
|
|
"updated_at": now,
|
|
})
|
|
return _doc_to_order(doc_ref.get())
|
|
|
|
|
|
def update_timeline_event(customer_id: str, order_id: str, index: int, data: dict) -> OrderInDB:
|
|
doc_ref = _order_collection(customer_id).document(order_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Order")
|
|
timeline = list(doc.to_dict().get("timeline") or [])
|
|
if index < 0 or index >= len(timeline):
|
|
raise HTTPException(status_code=404, detail="Timeline index out of range")
|
|
timeline[index] = {**timeline[index], **data}
|
|
doc_ref.update({"timeline": timeline, "updated_at": datetime.utcnow().isoformat()})
|
|
return _doc_to_order(doc_ref.get())
|
|
|
|
|
|
def delete_timeline_event(customer_id: str, order_id: str, index: int) -> OrderInDB:
|
|
doc_ref = _order_collection(customer_id).document(order_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Order")
|
|
timeline = list(doc.to_dict().get("timeline") or [])
|
|
if index < 0 or index >= len(timeline):
|
|
raise HTTPException(status_code=404, detail="Timeline index out of range")
|
|
timeline.pop(index)
|
|
doc_ref.update({"timeline": timeline, "updated_at": datetime.utcnow().isoformat()})
|
|
return _doc_to_order(doc_ref.get())
|
|
|
|
|
|
def update_order_payment_status(customer_id: str, order_id: str, payment_data: dict) -> OrderInDB:
|
|
doc_ref = _order_collection(customer_id).document(order_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Order")
|
|
existing = doc.to_dict().get("payment_status") or _default_payment_status()
|
|
existing.update({k: v for k, v in payment_data.items() if v is not None})
|
|
doc_ref.update({
|
|
"payment_status": existing,
|
|
"updated_at": datetime.utcnow().isoformat(),
|
|
})
|
|
return _doc_to_order(doc_ref.get())
|
|
|
|
|
|
def init_negotiations(customer_id: str, title: str, note: str, date: str, created_by: str) -> OrderInDB:
|
|
"""Create a new order with status=negotiating and bump customer relationship_status if needed."""
|
|
db = get_db()
|
|
customer_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
customer_doc = customer_ref.get()
|
|
if not customer_doc.exists:
|
|
raise NotFoundError("Customer")
|
|
|
|
now = datetime.utcnow().isoformat()
|
|
order_id = str(uuid.uuid4())
|
|
|
|
timeline_event = {
|
|
"date": date or now,
|
|
"type": "note",
|
|
"note": note or "",
|
|
"updated_by": created_by,
|
|
}
|
|
|
|
doc_data = {
|
|
"customer_id": customer_id,
|
|
"order_number": _generate_order_number(customer_id),
|
|
"title": title,
|
|
"created_by": created_by,
|
|
"status": "negotiating",
|
|
"status_updated_date": date or now,
|
|
"status_updated_by": created_by,
|
|
"items": [],
|
|
"subtotal": 0,
|
|
"discount": None,
|
|
"total_price": 0,
|
|
"currency": "EUR",
|
|
"shipping": None,
|
|
"payment_status": _default_payment_status(),
|
|
"invoice_path": None,
|
|
"notes": note or "",
|
|
"timeline": [timeline_event],
|
|
"created_at": now,
|
|
"updated_at": now,
|
|
}
|
|
|
|
_order_collection(customer_id).document(order_id).set(doc_data)
|
|
|
|
# Upgrade relationship_status only if currently lead or prospect
|
|
current_data = customer_doc.to_dict() or {}
|
|
current_rel = current_data.get("relationship_status", "lead")
|
|
if current_rel in ("lead", "prospect"):
|
|
customer_ref.update({"relationship_status": "active", "updated_at": now})
|
|
|
|
_update_crm_summary(customer_id)
|
|
return OrderInDB(id=order_id, **doc_data)
|
|
|
|
|
|
# ── Technical Issues & Install Support ────────────────────────────────────────
|
|
|
|
def add_technical_issue(customer_id: str, note: str, opened_by: str, date: str | None = None) -> CustomerInDB:
|
|
from google.cloud.firestore_v1 import ArrayUnion
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
if not doc_ref.get().exists:
|
|
raise NotFoundError("Customer")
|
|
now = datetime.utcnow().isoformat()
|
|
issue = {
|
|
"active": True,
|
|
"opened_date": date or now,
|
|
"resolved_date": None,
|
|
"note": note,
|
|
"opened_by": opened_by,
|
|
"resolved_by": None,
|
|
}
|
|
doc_ref.update({"technical_issues": ArrayUnion([issue]), "updated_at": now})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def resolve_technical_issue(customer_id: str, index: int, resolved_by: str) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
issues = list(data.get("technical_issues") or [])
|
|
if index < 0 or index >= len(issues):
|
|
raise HTTPException(status_code=404, detail="Issue index out of range")
|
|
now = datetime.utcnow().isoformat()
|
|
issues[index] = {**issues[index], "active": False, "resolved_date": now, "resolved_by": resolved_by}
|
|
doc_ref.update({"technical_issues": issues, "updated_at": now})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def add_install_support(customer_id: str, note: str, opened_by: str, date: str | None = None) -> CustomerInDB:
|
|
from google.cloud.firestore_v1 import ArrayUnion
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
if not doc_ref.get().exists:
|
|
raise NotFoundError("Customer")
|
|
now = datetime.utcnow().isoformat()
|
|
entry = {
|
|
"active": True,
|
|
"opened_date": date or now,
|
|
"resolved_date": None,
|
|
"note": note,
|
|
"opened_by": opened_by,
|
|
"resolved_by": None,
|
|
}
|
|
doc_ref.update({"install_support": ArrayUnion([entry]), "updated_at": now})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def resolve_install_support(customer_id: str, index: int, resolved_by: str) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
entries = list(data.get("install_support") or [])
|
|
if index < 0 or index >= len(entries):
|
|
raise HTTPException(status_code=404, detail="Support index out of range")
|
|
now = datetime.utcnow().isoformat()
|
|
entries[index] = {**entries[index], "active": False, "resolved_date": now, "resolved_by": resolved_by}
|
|
doc_ref.update({"install_support": entries, "updated_at": now})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def edit_technical_issue(customer_id: str, index: int, note: str, opened_date: str | None = None) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
issues = list(data.get("technical_issues") or [])
|
|
if index < 0 or index >= len(issues):
|
|
raise HTTPException(status_code=404, detail="Issue index out of range")
|
|
issues[index] = {**issues[index], "note": note}
|
|
if opened_date:
|
|
issues[index]["opened_date"] = opened_date
|
|
doc_ref.update({"technical_issues": issues, "updated_at": datetime.utcnow().isoformat()})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def delete_technical_issue(customer_id: str, index: int) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
issues = list(data.get("technical_issues") or [])
|
|
if index < 0 or index >= len(issues):
|
|
raise HTTPException(status_code=404, detail="Issue index out of range")
|
|
issues.pop(index)
|
|
doc_ref.update({"technical_issues": issues, "updated_at": datetime.utcnow().isoformat()})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def edit_install_support(customer_id: str, index: int, note: str, opened_date: str | None = None) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
entries = list(data.get("install_support") or [])
|
|
if index < 0 or index >= len(entries):
|
|
raise HTTPException(status_code=404, detail="Support index out of range")
|
|
entries[index] = {**entries[index], "note": note}
|
|
if opened_date:
|
|
entries[index]["opened_date"] = opened_date
|
|
doc_ref.update({"install_support": entries, "updated_at": datetime.utcnow().isoformat()})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def delete_install_support(customer_id: str, index: int) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
entries = list(data.get("install_support") or [])
|
|
if index < 0 or index >= len(entries):
|
|
raise HTTPException(status_code=404, detail="Support index out of range")
|
|
entries.pop(index)
|
|
doc_ref.update({"install_support": entries, "updated_at": datetime.utcnow().isoformat()})
|
|
_update_crm_summary(customer_id)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
# ── Transactions ──────────────────────────────────────────────────────────────
|
|
|
|
def add_transaction(customer_id: str, entry: TransactionEntry) -> CustomerInDB:
|
|
from google.cloud.firestore_v1 import ArrayUnion
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
if not doc_ref.get().exists:
|
|
raise NotFoundError("Customer")
|
|
now = datetime.utcnow().isoformat()
|
|
doc_ref.update({"transaction_history": ArrayUnion([entry.model_dump()]), "updated_at": now})
|
|
if entry.order_ref:
|
|
_recalculate_order_payment_status(customer_id, entry.order_ref)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def update_transaction(customer_id: str, index: int, entry: TransactionEntry) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
txns = list(data.get("transaction_history") or [])
|
|
if index < 0 or index >= len(txns):
|
|
raise HTTPException(status_code=404, detail="Transaction index out of range")
|
|
txns[index] = entry.model_dump()
|
|
now = datetime.utcnow().isoformat()
|
|
doc_ref.update({"transaction_history": txns, "updated_at": now})
|
|
if entry.order_ref:
|
|
_recalculate_order_payment_status(customer_id, entry.order_ref)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
def delete_transaction(customer_id: str, index: int) -> CustomerInDB:
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
doc = doc_ref.get()
|
|
if not doc.exists:
|
|
raise NotFoundError("Customer")
|
|
data = doc.to_dict() or {}
|
|
txns = list(data.get("transaction_history") or [])
|
|
if index < 0 or index >= len(txns):
|
|
raise HTTPException(status_code=404, detail="Transaction index out of range")
|
|
deleted_order_ref = txns[index].get("order_ref")
|
|
txns.pop(index)
|
|
now = datetime.utcnow().isoformat()
|
|
doc_ref.update({"transaction_history": txns, "updated_at": now})
|
|
if deleted_order_ref:
|
|
_recalculate_order_payment_status(customer_id, deleted_order_ref)
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
# ── Relationship Status ───────────────────────────────────────────────────────
|
|
|
|
def update_relationship_status(customer_id: str, status: str) -> CustomerInDB:
|
|
VALID = {"lead", "prospect", "active", "inactive", "churned"}
|
|
if status not in VALID:
|
|
raise HTTPException(status_code=422, detail=f"Invalid relationship_status: {status}")
|
|
db = get_db()
|
|
doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id)
|
|
if not doc_ref.get().exists:
|
|
raise NotFoundError("Customer")
|
|
|
|
# Failsafe: cannot manually mark inactive if open (non-terminal) orders exist
|
|
if status == "inactive":
|
|
TERMINAL = {"declined", "complete"}
|
|
open_orders = [
|
|
doc for doc in _order_collection(customer_id).stream()
|
|
if (doc.to_dict() or {}).get("status", "") not in TERMINAL
|
|
]
|
|
if open_orders:
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail=(
|
|
f"Cannot mark as inactive: {len(open_orders)} open order(s) still exist. "
|
|
"Please resolve all orders before changing the status."
|
|
),
|
|
)
|
|
|
|
doc_ref.update({"relationship_status": status, "updated_at": datetime.utcnow().isoformat()})
|
|
return _doc_to_customer(doc_ref.get())
|
|
|
|
|
|
# ── Comms Log (SQLite, async) ─────────────────────────────────────────────────
|
|
|
|
def _row_to_comm(row: dict) -> CommInDB:
|
|
row = dict(row)
|
|
raw_attachments = json.loads(row.get("attachments") or "[]")
|
|
# Normalise attachment dicts — tolerate both synced (content_type/size) and
|
|
# sent (nextcloud_path) shapes so Pydantic never sees missing required fields.
|
|
row["attachments"] = [
|
|
{k: v for k, v in a.items() if k in ("filename", "nextcloud_path", "content_type", "size")}
|
|
for a in raw_attachments if isinstance(a, dict) and a.get("filename")
|
|
]
|
|
if row.get("to_addrs") and isinstance(row["to_addrs"], str):
|
|
try:
|
|
row["to_addrs"] = json.loads(row["to_addrs"])
|
|
except Exception:
|
|
row["to_addrs"] = []
|
|
# SQLite stores booleans as integers
|
|
row["is_important"] = bool(row.get("is_important", 0))
|
|
row["is_read"] = bool(row.get("is_read", 0))
|
|
return CommInDB(**{k: v for k, v in row.items() if k in CommInDB.model_fields})
|
|
|
|
|
|
async def list_comms(
|
|
customer_id: str,
|
|
type: str | None = None,
|
|
direction: str | None = None,
|
|
limit: int = 100,
|
|
) -> list[CommInDB]:
|
|
db = await mqtt_db.get_db()
|
|
where = ["customer_id = ?"]
|
|
params: list = [customer_id]
|
|
if type:
|
|
where.append("type = ?")
|
|
params.append(type)
|
|
if direction:
|
|
where.append("direction = ?")
|
|
params.append(direction)
|
|
clause = " AND ".join(where)
|
|
rows = await db.execute_fetchall(
|
|
f"SELECT * FROM crm_comms_log WHERE {clause} ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?",
|
|
params + [limit],
|
|
)
|
|
entries = [_row_to_comm(dict(r)) for r in rows]
|
|
|
|
# Fallback: include unlinked email rows (customer_id NULL) if addresses match this customer.
|
|
# This covers historical rows created before automatic outbound customer linking.
|
|
fs = get_db()
|
|
doc = fs.collection("crm_customers").document(customer_id).get()
|
|
if doc.exists:
|
|
data = doc.to_dict() or {}
|
|
customer_emails = {
|
|
(c.get("value") or "").strip().lower()
|
|
for c in (data.get("contacts") or [])
|
|
if c.get("type") == "email" and c.get("value")
|
|
}
|
|
else:
|
|
customer_emails = set()
|
|
|
|
if customer_emails:
|
|
extra_where = [
|
|
"type = 'email'",
|
|
"(customer_id IS NULL OR customer_id = '')",
|
|
]
|
|
extra_params: list = []
|
|
if direction:
|
|
extra_where.append("direction = ?")
|
|
extra_params.append(direction)
|
|
extra_clause = " AND ".join(extra_where)
|
|
extra_rows = await db.execute_fetchall(
|
|
f"SELECT * FROM crm_comms_log WHERE {extra_clause} "
|
|
"ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?",
|
|
extra_params + [max(limit, 300)],
|
|
)
|
|
for r in extra_rows:
|
|
e = _row_to_comm(dict(r))
|
|
from_addr = (e.from_addr or "").strip().lower()
|
|
to_addrs = [(a or "").strip().lower() for a in (e.to_addrs or [])]
|
|
matched = (from_addr in customer_emails) or any(a in customer_emails for a in to_addrs)
|
|
if matched:
|
|
entries.append(e)
|
|
|
|
# De-duplicate and sort consistently
|
|
uniq = {e.id: e for e in entries}
|
|
sorted_entries = sorted(
|
|
uniq.values(),
|
|
key=lambda e: ((e.occurred_at or e.created_at or ""), (e.created_at or ""), (e.id or "")),
|
|
reverse=True,
|
|
)
|
|
return sorted_entries[:limit]
|
|
|
|
|
|
async def list_all_emails(
|
|
direction: str | None = None,
|
|
customers_only: bool = False,
|
|
mail_accounts: list[str] | None = None,
|
|
limit: int = 500,
|
|
) -> list[CommInDB]:
|
|
db = await mqtt_db.get_db()
|
|
where = ["type = 'email'"]
|
|
params: list = []
|
|
if direction:
|
|
where.append("direction = ?")
|
|
params.append(direction)
|
|
if customers_only:
|
|
where.append("customer_id IS NOT NULL")
|
|
if mail_accounts:
|
|
placeholders = ",".join("?" for _ in mail_accounts)
|
|
where.append(f"mail_account IN ({placeholders})")
|
|
params.extend(mail_accounts)
|
|
clause = f"WHERE {' AND '.join(where)}"
|
|
rows = await db.execute_fetchall(
|
|
f"SELECT * FROM crm_comms_log {clause} ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?",
|
|
params + [limit],
|
|
)
|
|
return [_row_to_comm(dict(r)) for r in rows]
|
|
|
|
|
|
async def list_all_comms(
|
|
type: str | None = None,
|
|
direction: str | None = None,
|
|
limit: int = 200,
|
|
) -> list[CommInDB]:
|
|
db = await mqtt_db.get_db()
|
|
where = []
|
|
params: list = []
|
|
if type:
|
|
where.append("type = ?")
|
|
params.append(type)
|
|
if direction:
|
|
where.append("direction = ?")
|
|
params.append(direction)
|
|
clause = f"WHERE {' AND '.join(where)}" if where else ""
|
|
rows = await db.execute_fetchall(
|
|
f"SELECT * FROM crm_comms_log {clause} ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?",
|
|
params + [limit],
|
|
)
|
|
return [_row_to_comm(dict(r)) for r in rows]
|
|
|
|
|
|
async def get_comm(comm_id: str) -> CommInDB:
|
|
db = await mqtt_db.get_db()
|
|
rows = await db.execute_fetchall(
|
|
"SELECT * FROM crm_comms_log WHERE id = ?", (comm_id,)
|
|
)
|
|
if not rows:
|
|
raise HTTPException(status_code=404, detail="Comm entry not found")
|
|
return _row_to_comm(dict(rows[0]))
|
|
|
|
|
|
async def create_comm(data: CommCreate) -> CommInDB:
|
|
db = await mqtt_db.get_db()
|
|
now = datetime.utcnow().isoformat()
|
|
comm_id = str(uuid.uuid4())
|
|
occurred_at = data.occurred_at or now
|
|
attachments_json = json.dumps([a.model_dump() for a in data.attachments])
|
|
|
|
await db.execute(
|
|
"""INSERT INTO crm_comms_log
|
|
(id, customer_id, type, mail_account, direction, subject, body, attachments,
|
|
ext_message_id, logged_by, occurred_at, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(comm_id, data.customer_id, data.type.value, data.mail_account, data.direction.value,
|
|
data.subject, data.body, attachments_json,
|
|
data.ext_message_id, data.logged_by, occurred_at, now),
|
|
)
|
|
await db.commit()
|
|
return await get_comm(comm_id)
|
|
|
|
|
|
async def update_comm(comm_id: str, data: CommUpdate) -> CommInDB:
|
|
db = await mqtt_db.get_db()
|
|
rows = await db.execute_fetchall(
|
|
"SELECT id FROM crm_comms_log WHERE id = ?", (comm_id,)
|
|
)
|
|
if not rows:
|
|
raise HTTPException(status_code=404, detail="Comm entry not found")
|
|
|
|
updates = data.model_dump(exclude_none=True)
|
|
if not updates:
|
|
return await get_comm(comm_id)
|
|
|
|
set_clause = ", ".join(f"{k} = ?" for k in updates)
|
|
await db.execute(
|
|
f"UPDATE crm_comms_log SET {set_clause} WHERE id = ?",
|
|
list(updates.values()) + [comm_id],
|
|
)
|
|
await db.commit()
|
|
return await get_comm(comm_id)
|
|
|
|
|
|
async def delete_comm(comm_id: str) -> None:
|
|
db = await mqtt_db.get_db()
|
|
rows = await db.execute_fetchall(
|
|
"SELECT id FROM crm_comms_log WHERE id = ?", (comm_id,)
|
|
)
|
|
if not rows:
|
|
raise HTTPException(status_code=404, detail="Comm entry not found")
|
|
await db.execute("DELETE FROM crm_comms_log WHERE id = ?", (comm_id,))
|
|
await db.commit()
|
|
|
|
|
|
async def delete_comms_bulk(ids: list[str]) -> int:
|
|
"""Delete multiple comm entries. Returns count deleted."""
|
|
if not ids:
|
|
return 0
|
|
db = await mqtt_db.get_db()
|
|
placeholders = ",".join("?" for _ in ids)
|
|
cursor = await db.execute(
|
|
f"DELETE FROM crm_comms_log WHERE id IN ({placeholders})", ids
|
|
)
|
|
await db.commit()
|
|
return cursor.rowcount
|
|
|
|
|
|
async def set_comm_important(comm_id: str, important: bool) -> CommInDB:
|
|
db = await mqtt_db.get_db()
|
|
await db.execute(
|
|
"UPDATE crm_comms_log SET is_important = ? WHERE id = ?",
|
|
(1 if important else 0, comm_id),
|
|
)
|
|
await db.commit()
|
|
return await get_comm(comm_id)
|
|
|
|
|
|
async def set_comm_read(comm_id: str, read: bool) -> CommInDB:
|
|
db = await mqtt_db.get_db()
|
|
await db.execute(
|
|
"UPDATE crm_comms_log SET is_read = ? WHERE id = ?",
|
|
(1 if read else 0, comm_id),
|
|
)
|
|
await db.commit()
|
|
return await get_comm(comm_id)
|
|
|
|
|
|
# ── Media (SQLite, async) ─────────────────────────────────────────────────────
|
|
|
|
def _row_to_media(row: dict) -> MediaInDB:
|
|
row = dict(row)
|
|
row["tags"] = json.loads(row.get("tags") or "[]")
|
|
return MediaInDB(**row)
|
|
|
|
|
|
async def list_media(
|
|
customer_id: str | None = None,
|
|
order_id: str | None = None,
|
|
) -> list[MediaInDB]:
|
|
db = await mqtt_db.get_db()
|
|
where = []
|
|
params: list = []
|
|
if customer_id:
|
|
where.append("customer_id = ?")
|
|
params.append(customer_id)
|
|
if order_id:
|
|
where.append("order_id = ?")
|
|
params.append(order_id)
|
|
clause = f"WHERE {' AND '.join(where)}" if where else ""
|
|
rows = await db.execute_fetchall(
|
|
f"SELECT * FROM crm_media {clause} ORDER BY created_at DESC",
|
|
params,
|
|
)
|
|
return [_row_to_media(dict(r)) for r in rows]
|
|
|
|
|
|
async def create_media(data: MediaCreate) -> MediaInDB:
|
|
db = await mqtt_db.get_db()
|
|
now = datetime.utcnow().isoformat()
|
|
media_id = str(uuid.uuid4())
|
|
tags_json = json.dumps(data.tags)
|
|
direction = data.direction.value if data.direction else None
|
|
|
|
await db.execute(
|
|
"""INSERT INTO crm_media
|
|
(id, customer_id, order_id, filename, nextcloud_path, mime_type,
|
|
direction, tags, uploaded_by, thumbnail_path, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(media_id, data.customer_id, data.order_id, data.filename,
|
|
data.nextcloud_path, data.mime_type, direction,
|
|
tags_json, data.uploaded_by, data.thumbnail_path, now),
|
|
)
|
|
await db.commit()
|
|
|
|
rows = await db.execute_fetchall(
|
|
"SELECT * FROM crm_media WHERE id = ?", (media_id,)
|
|
)
|
|
return _row_to_media(dict(rows[0]))
|
|
|
|
|
|
async def delete_media(media_id: str) -> None:
|
|
db = await mqtt_db.get_db()
|
|
rows = await db.execute_fetchall(
|
|
"SELECT id FROM crm_media WHERE id = ?", (media_id,)
|
|
)
|
|
if not rows:
|
|
raise HTTPException(status_code=404, detail="Media entry not found")
|
|
await db.execute("DELETE FROM crm_media WHERE id = ?", (media_id,))
|
|
await db.commit()
|
|
|
|
|
|
# ── Background polling ────────────────────────────────────────────────────────
|
|
|
|
PRE_MFG_STATUSES = {"negotiating", "awaiting_quotation", "awaiting_customer_confirmation", "awaiting_fulfilment", "awaiting_payment"}
|
|
TERMINAL_STATUSES = {"declined", "complete"}
|
|
|
|
|
|
def poll_crm_customer_statuses() -> None:
|
|
"""
|
|
Two checks run daily:
|
|
|
|
1. Active + open pre-mfg order + 12+ months since last comm → churn.
|
|
2. Inactive + has any open (non-terminal) order → flip back to active.
|
|
"""
|
|
db = get_db()
|
|
now = datetime.utcnow()
|
|
|
|
for doc in db.collection(CUSTOMERS_COLLECTION).stream():
|
|
try:
|
|
data = doc.to_dict() or {}
|
|
rel_status = data.get("relationship_status", "lead")
|
|
summary = data.get("crm_summary") or {}
|
|
all_statuses = summary.get("all_orders_statuses") or []
|
|
|
|
# ── Check 1: active + silent 12 months on a pre-mfg order → churned ──
|
|
if rel_status == "active":
|
|
has_open_pre_mfg = any(s in PRE_MFG_STATUSES for s in all_statuses)
|
|
if not has_open_pre_mfg:
|
|
continue
|
|
|
|
# Find last comm date from SQLite comms table
|
|
# (comms are stored in SQLite, keyed by customer_id)
|
|
# We rely on crm_summary not having this; use Firestore comms subcollection as fallback
|
|
# The last_comm_date is passed from the frontend; here we use the comms subcollection
|
|
comms = list(db.collection(CUSTOMERS_COLLECTION).document(doc.id).collection("comms").stream())
|
|
if not comms:
|
|
continue
|
|
latest_date_str = max((c.to_dict().get("date") or "") for c in comms)
|
|
if not latest_date_str:
|
|
continue
|
|
last_contact = datetime.fromisoformat(latest_date_str.rstrip("Z").split("+")[0])
|
|
days_since = (now - last_contact).days
|
|
if days_since >= 365:
|
|
db.collection(CUSTOMERS_COLLECTION).document(doc.id).update({
|
|
"relationship_status": "churned",
|
|
"updated_at": now.isoformat(),
|
|
})
|
|
print(f"[CRM POLL] {doc.id} → churned ({days_since}d silent, open pre-mfg order)")
|
|
|
|
# ── Check 2: inactive + open orders exist → flip back to active ──
|
|
elif rel_status == "inactive":
|
|
has_open = any(s not in TERMINAL_STATUSES for s in all_statuses)
|
|
if has_open:
|
|
db.collection(CUSTOMERS_COLLECTION).document(doc.id).update({
|
|
"relationship_status": "active",
|
|
"updated_at": now.isoformat(),
|
|
})
|
|
print(f"[CRM POLL] {doc.id} → active (inactive but has open orders)")
|
|
|
|
except Exception as e:
|
|
print(f"[CRM POLL] Error processing customer {doc.id}: {e}")
|