import asyncio import json import uuid from datetime import datetime from fastapi import HTTPException from shared.firebase import get_db from shared.exceptions import NotFoundError import re as _re import database as mqtt_db from crm.models import ( ProductCreate, ProductUpdate, ProductInDB, CustomerCreate, CustomerUpdate, CustomerInDB, OrderCreate, OrderUpdate, OrderInDB, CommCreate, CommUpdate, CommInDB, MediaCreate, MediaInDB, TechnicalIssue, InstallSupportEntry, TransactionEntry, ) COLLECTION = "crm_products" def _doc_to_product(doc) -> ProductInDB: data = doc.to_dict() # Backfill bilingual fields for existing products that predate the feature if not data.get("name_en") and data.get("name"): data["name_en"] = data["name"] if not data.get("name_gr") and data.get("name"): data["name_gr"] = data["name"] return ProductInDB(id=doc.id, **data) def list_products( search: str | None = None, category: str | None = None, active_only: bool = False, ) -> list[ProductInDB]: db = get_db() query = db.collection(COLLECTION) if active_only: query = query.where("active", "==", True) if category: query = query.where("category", "==", category) results = [] for doc in query.stream(): product = _doc_to_product(doc) if search: s = search.lower() if not ( s in (product.name or "").lower() or s in (product.sku or "").lower() or s in (product.description or "").lower() ): continue results.append(product) return results def get_product(product_id: str) -> ProductInDB: db = get_db() doc = db.collection(COLLECTION).document(product_id).get() if not doc.exists: raise NotFoundError("Product") return _doc_to_product(doc) def create_product(data: ProductCreate) -> ProductInDB: db = get_db() now = datetime.utcnow().isoformat() product_id = str(uuid.uuid4()) doc_data = data.model_dump() doc_data["created_at"] = now doc_data["updated_at"] = now # Serialize nested enums/models if doc_data.get("category"): doc_data["category"] = doc_data["category"].value if hasattr(doc_data["category"], "value") else doc_data["category"] if doc_data.get("costs") and hasattr(doc_data["costs"], "model_dump"): doc_data["costs"] = doc_data["costs"].model_dump() if doc_data.get("stock") and hasattr(doc_data["stock"], "model_dump"): doc_data["stock"] = doc_data["stock"].model_dump() db.collection(COLLECTION).document(product_id).set(doc_data) return ProductInDB(id=product_id, **doc_data) def update_product(product_id: str, data: ProductUpdate) -> ProductInDB: db = get_db() doc_ref = db.collection(COLLECTION).document(product_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Product") update_data = data.model_dump(exclude_none=True) update_data["updated_at"] = datetime.utcnow().isoformat() if "category" in update_data and hasattr(update_data["category"], "value"): update_data["category"] = update_data["category"].value if "costs" in update_data and hasattr(update_data["costs"], "model_dump"): update_data["costs"] = update_data["costs"].model_dump() if "stock" in update_data and hasattr(update_data["stock"], "model_dump"): update_data["stock"] = update_data["stock"].model_dump() doc_ref.update(update_data) updated_doc = doc_ref.get() return _doc_to_product(updated_doc) def delete_product(product_id: str) -> None: db = get_db() doc_ref = db.collection(COLLECTION).document(product_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Product") doc_ref.delete() # ── Customers ──────────────────────────────────────────────────────────────── CUSTOMERS_COLLECTION = "crm_customers" _LEGACY_CUSTOMER_FIELDS = {"negotiating", "has_problem"} def _doc_to_customer(doc) -> CustomerInDB: data = doc.to_dict() for f in _LEGACY_CUSTOMER_FIELDS: data.pop(f, None) return CustomerInDB(id=doc.id, **data) def list_customers( search: str | None = None, tag: str | None = None, sort: str | None = None, ) -> list[CustomerInDB]: db = get_db() query = db.collection(CUSTOMERS_COLLECTION) if tag: query = query.where("tags", "array_contains", tag) results = [] for doc in query.stream(): customer = _doc_to_customer(doc) if search: s = search.lower() s_nospace = s.replace(" ", "") name_match = s in (customer.name or "").lower() surname_match = s in (customer.surname or "").lower() org_match = s in (customer.organization or "").lower() religion_match = s in (customer.religion or "").lower() language_match = s in (customer.language or "").lower() contact_match = any( s_nospace in (c.value or "").lower().replace(" ", "") or s in (c.value or "").lower() for c in (customer.contacts or []) ) loc = customer.location loc_match = bool(loc) and ( s in (loc.address or "").lower() or s in (loc.city or "").lower() or s in (loc.postal_code or "").lower() or s in (loc.region or "").lower() or s in (loc.country or "").lower() ) tag_match = any(s in (t or "").lower() for t in (customer.tags or [])) if not (name_match or surname_match or org_match or religion_match or language_match or contact_match or loc_match or tag_match): continue results.append(customer) # Sorting (non-latest_comm; latest_comm is handled by the async router wrapper) _TITLES = {"fr.", "rev.", "archim.", "bp.", "abp.", "met.", "mr.", "mrs.", "ms.", "dr.", "prof."} def _sort_name(c): return (c.name or "").lower() def _sort_surname(c): return (c.surname or "").lower() def _sort_default(c): return c.created_at or "" if sort == "name": results.sort(key=_sort_name) elif sort == "surname": results.sort(key=_sort_surname) elif sort == "default": results.sort(key=_sort_default) return results def list_all_tags() -> list[str]: db = get_db() tags: set[str] = set() for doc in db.collection(CUSTOMERS_COLLECTION).select(["tags"]).stream(): data = doc.to_dict() for tag in (data.get("tags") or []): if tag: tags.add(tag) return sorted(tags) def get_customer(customer_id: str) -> CustomerInDB: db = get_db() doc = db.collection(CUSTOMERS_COLLECTION).document(customer_id).get() if not doc.exists: raise NotFoundError("Customer") return _doc_to_customer(doc) def get_customer_nc_path(customer: CustomerInDB) -> str: """Return the Nextcloud folder slug for a customer. Falls back to UUID for legacy records.""" return customer.folder_id if customer.folder_id else customer.id def create_customer(data: CustomerCreate) -> CustomerInDB: db = get_db() # Validate folder_id if not data.folder_id or not data.folder_id.strip(): raise HTTPException(status_code=422, detail="Internal Folder ID is required.") folder_id = data.folder_id.strip().lower() if not _re.match(r'^[a-z0-9][a-z0-9\-]*[a-z0-9]$', folder_id): raise HTTPException( status_code=422, detail="Internal Folder ID must contain only lowercase letters, numbers, and hyphens, and cannot start or end with a hyphen.", ) # Check uniqueness existing = list(db.collection(CUSTOMERS_COLLECTION).where("folder_id", "==", folder_id).limit(1).stream()) if existing: raise HTTPException(status_code=409, detail=f"A customer with folder ID '{folder_id}' already exists.") now = datetime.utcnow().isoformat() customer_id = str(uuid.uuid4()) doc_data = data.model_dump() doc_data["folder_id"] = folder_id doc_data["created_at"] = now doc_data["updated_at"] = now db.collection(CUSTOMERS_COLLECTION).document(customer_id).set(doc_data) return CustomerInDB(id=customer_id, **doc_data) def update_customer(customer_id: str, data: CustomerUpdate) -> CustomerInDB: from google.cloud.firestore_v1 import DELETE_FIELD db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") update_data = data.model_dump(exclude_none=True) update_data["updated_at"] = datetime.utcnow().isoformat() # Fields that should be explicitly deleted from Firestore when set to None # (exclude_none=True would just skip them, leaving the old value intact) NULLABLE_FIELDS = {"title", "surname", "organization", "religion"} set_fields = data.model_fields_set for field in NULLABLE_FIELDS: if field in set_fields and getattr(data, field) is None: update_data[field] = DELETE_FIELD doc_ref.update(update_data) updated_doc = doc_ref.get() return _doc_to_customer(updated_doc) async def get_last_comm_direction(customer_id: str) -> dict: """Return direction ('inbound'/'outbound') and timestamp of the most recent comm, or None.""" db = await mqtt_db.get_db() rows = await db.execute_fetchall( "SELECT direction, COALESCE(occurred_at, created_at) as ts FROM crm_comms_log WHERE customer_id = ? " "AND direction IN ('inbound', 'outbound') " "ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT 1", (customer_id,), ) if rows: return {"direction": rows[0][0], "occurred_at": rows[0][1]} return {"direction": None, "occurred_at": None} async def get_last_comm_timestamp(customer_id: str) -> str | None: """Return the ISO timestamp of the most recent comm for this customer, or None.""" db = await mqtt_db.get_db() rows = await db.execute_fetchall( "SELECT COALESCE(occurred_at, created_at) as ts FROM crm_comms_log " "WHERE customer_id = ? ORDER BY ts DESC LIMIT 1", (customer_id,), ) if rows: return rows[0][0] return None async def list_customers_sorted_by_latest_comm(customers: list[CustomerInDB]) -> list[CustomerInDB]: """Re-sort a list of customers so those with the most recent comm come first.""" timestamps = await asyncio.gather( *[get_last_comm_timestamp(c.id) for c in customers] ) paired = list(zip(customers, timestamps)) paired.sort(key=lambda x: x[1] or "", reverse=True) return [c for c, _ in paired] def delete_customer(customer_id: str) -> CustomerInDB: """Delete customer from Firestore. Returns the customer data (for NC path lookup).""" db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") customer = _doc_to_customer(doc) doc_ref.delete() return customer async def delete_customer_comms(customer_id: str) -> int: """Delete all comm log entries for a customer. Returns count deleted.""" db = await mqtt_db.get_db() cursor = await db.execute( "DELETE FROM crm_comms_log WHERE customer_id = ?", (customer_id,) ) await db.commit() return cursor.rowcount async def delete_customer_media_entries(customer_id: str) -> int: """Delete all media DB entries for a customer. Returns count deleted.""" db = await mqtt_db.get_db() cursor = await db.execute( "DELETE FROM crm_media WHERE customer_id = ?", (customer_id,) ) await db.commit() return cursor.rowcount # ── Orders (subcollection under customers/{id}/orders) ──────────────────────── def _doc_to_order(doc) -> OrderInDB: data = doc.to_dict() return OrderInDB(id=doc.id, **data) def _order_collection(customer_id: str): db = get_db() return db.collection(CUSTOMERS_COLLECTION).document(customer_id).collection("orders") def _generate_order_number(customer_id: str) -> str: """Generate next ORD-DDMMYY-NNN across all customers using collection group query.""" db = get_db() now = datetime.utcnow() prefix = f"ORD-{now.strftime('%d%m%y')}-" max_n = 0 for doc in db.collection_group("orders").stream(): data = doc.to_dict() num = data.get("order_number", "") if num and num.startswith(prefix): try: n = int(num[len(prefix):]) if n > max_n: max_n = n except ValueError: pass return f"{prefix}{max_n + 1:03d}" def _default_payment_status() -> dict: return { "required_amount": 0, "received_amount": 0, "balance_due": 0, "advance_required": False, "advance_amount": None, "payment_complete": False, } def _recalculate_order_payment_status(customer_id: str, order_id: str) -> None: """Recompute an order's payment_status from transaction_history on the customer.""" db = get_db() cust_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) cust_data = (cust_ref.get().to_dict()) or {} txns = cust_data.get("transaction_history") or [] required = sum(float(t.get("amount") or 0) for t in txns if t.get("order_ref") == order_id and t.get("flow") == "invoice") received = sum(float(t.get("amount") or 0) for t in txns if t.get("order_ref") == order_id and t.get("flow") == "payment") balance_due = required - received payment_complete = (required > 0 and balance_due <= 0) order_ref = _order_collection(customer_id).document(order_id) if not order_ref.get().exists: return order_ref.update({ "payment_status": { "required_amount": required, "received_amount": received, "balance_due": balance_due, "advance_required": False, "advance_amount": None, "payment_complete": payment_complete, }, "updated_at": datetime.utcnow().isoformat(), }) def _update_crm_summary(customer_id: str) -> None: """Recompute and store the crm_summary field on the customer document.""" db = get_db() customer_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) # Load customer for issue/support arrays customer_doc = customer_ref.get() if not customer_doc.exists: return customer_data = customer_doc.to_dict() or {} # Active issues issues = customer_data.get("technical_issues") or [] active_issues = [i for i in issues if i.get("active")] active_issues_count = len(active_issues) latest_issue_date = None if active_issues: latest_issue_date = max((i.get("opened_date") or "") for i in active_issues) or None # Active support support = customer_data.get("install_support") or [] active_support = [s for s in support if s.get("active")] active_support_count = len(active_support) latest_support_date = None if active_support: latest_support_date = max((s.get("opened_date") or "") for s in active_support) or None # Active order (most recent non-terminal status) TERMINAL_STATUSES = {"declined", "complete"} active_order_status = None active_order_status_date = None active_order_title = None active_order_number = None latest_order_date = "" all_order_statuses = [] for doc in _order_collection(customer_id).stream(): data = doc.to_dict() or {} status = data.get("status", "") all_order_statuses.append(status) if status not in TERMINAL_STATUSES: upd = data.get("status_updated_date") or data.get("created_at") or "" if upd > latest_order_date: latest_order_date = upd active_order_status = status active_order_status_date = upd active_order_title = data.get("title") active_order_number = data.get("order_number") summary = { "active_order_status": active_order_status, "active_order_status_date": active_order_status_date, "active_order_title": active_order_title, "active_order_number": active_order_number, "all_orders_statuses": all_order_statuses, "active_issues_count": active_issues_count, "latest_issue_date": latest_issue_date, "active_support_count": active_support_count, "latest_support_date": latest_support_date, } customer_ref.update({"crm_summary": summary, "updated_at": datetime.utcnow().isoformat()}) def list_orders(customer_id: str) -> list[OrderInDB]: return [_doc_to_order(doc) for doc in _order_collection(customer_id).stream()] def list_all_orders(status: str | None = None) -> list[OrderInDB]: """Query across all customers using Firestore collection group.""" db = get_db() query = db.collection_group("orders") if status: query = query.where("status", "==", status) return [_doc_to_order(doc) for doc in query.stream()] def get_order(customer_id: str, order_id: str) -> OrderInDB: doc = _order_collection(customer_id).document(order_id).get() if not doc.exists: raise NotFoundError("Order") return _doc_to_order(doc) def create_order(customer_id: str, data: OrderCreate) -> OrderInDB: col = _order_collection(customer_id) now = datetime.utcnow().isoformat() order_id = str(uuid.uuid4()) doc_data = data.model_dump() doc_data["customer_id"] = customer_id if not doc_data.get("order_number"): doc_data["order_number"] = _generate_order_number(customer_id) if not doc_data.get("payment_status"): doc_data["payment_status"] = _default_payment_status() if not doc_data.get("status_updated_date"): doc_data["status_updated_date"] = now doc_data["created_at"] = now doc_data["updated_at"] = now col.document(order_id).set(doc_data) _update_crm_summary(customer_id) return OrderInDB(id=order_id, **doc_data) def update_order(customer_id: str, order_id: str, data: OrderUpdate) -> OrderInDB: doc_ref = _order_collection(customer_id).document(order_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Order") update_data = data.model_dump(exclude_none=True) update_data["updated_at"] = datetime.utcnow().isoformat() doc_ref.update(update_data) _update_crm_summary(customer_id) result = _doc_to_order(doc_ref.get()) # Auto-mark customer as inactive when all orders are complete if update_data.get("status") == "complete": all_orders = list_orders(customer_id) if all_orders and all(o.status == "complete" for o in all_orders): db = get_db() db.collection(CUSTOMERS_COLLECTION).document(customer_id).update({ "relationship_status": "inactive", "updated_at": datetime.utcnow().isoformat(), }) return result def delete_order(customer_id: str, order_id: str) -> None: doc_ref = _order_collection(customer_id).document(order_id) if not doc_ref.get().exists: raise NotFoundError("Order") doc_ref.delete() _update_crm_summary(customer_id) def append_timeline_event(customer_id: str, order_id: str, event: dict) -> OrderInDB: from google.cloud.firestore_v1 import ArrayUnion doc_ref = _order_collection(customer_id).document(order_id) if not doc_ref.get().exists: raise NotFoundError("Order") now = datetime.utcnow().isoformat() doc_ref.update({ "timeline": ArrayUnion([event]), "status_updated_date": event.get("date", now), "status_updated_by": event.get("updated_by", ""), "updated_at": now, }) return _doc_to_order(doc_ref.get()) def update_timeline_event(customer_id: str, order_id: str, index: int, data: dict) -> OrderInDB: doc_ref = _order_collection(customer_id).document(order_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Order") timeline = list(doc.to_dict().get("timeline") or []) if index < 0 or index >= len(timeline): raise HTTPException(status_code=404, detail="Timeline index out of range") timeline[index] = {**timeline[index], **data} doc_ref.update({"timeline": timeline, "updated_at": datetime.utcnow().isoformat()}) return _doc_to_order(doc_ref.get()) def delete_timeline_event(customer_id: str, order_id: str, index: int) -> OrderInDB: doc_ref = _order_collection(customer_id).document(order_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Order") timeline = list(doc.to_dict().get("timeline") or []) if index < 0 or index >= len(timeline): raise HTTPException(status_code=404, detail="Timeline index out of range") timeline.pop(index) doc_ref.update({"timeline": timeline, "updated_at": datetime.utcnow().isoformat()}) return _doc_to_order(doc_ref.get()) def update_order_payment_status(customer_id: str, order_id: str, payment_data: dict) -> OrderInDB: doc_ref = _order_collection(customer_id).document(order_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Order") existing = doc.to_dict().get("payment_status") or _default_payment_status() existing.update({k: v for k, v in payment_data.items() if v is not None}) doc_ref.update({ "payment_status": existing, "updated_at": datetime.utcnow().isoformat(), }) return _doc_to_order(doc_ref.get()) def init_negotiations(customer_id: str, title: str, note: str, date: str, created_by: str) -> OrderInDB: """Create a new order with status=negotiating and bump customer relationship_status if needed.""" db = get_db() customer_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) customer_doc = customer_ref.get() if not customer_doc.exists: raise NotFoundError("Customer") now = datetime.utcnow().isoformat() order_id = str(uuid.uuid4()) timeline_event = { "date": date or now, "type": "note", "note": note or "", "updated_by": created_by, } doc_data = { "customer_id": customer_id, "order_number": _generate_order_number(customer_id), "title": title, "created_by": created_by, "status": "negotiating", "status_updated_date": date or now, "status_updated_by": created_by, "items": [], "subtotal": 0, "discount": None, "total_price": 0, "currency": "EUR", "shipping": None, "payment_status": _default_payment_status(), "invoice_path": None, "notes": note or "", "timeline": [timeline_event], "created_at": now, "updated_at": now, } _order_collection(customer_id).document(order_id).set(doc_data) # Upgrade relationship_status only if currently lead or prospect current_data = customer_doc.to_dict() or {} current_rel = current_data.get("relationship_status", "lead") if current_rel in ("lead", "prospect"): customer_ref.update({"relationship_status": "active", "updated_at": now}) _update_crm_summary(customer_id) return OrderInDB(id=order_id, **doc_data) # ── Technical Issues & Install Support ──────────────────────────────────────── def add_technical_issue(customer_id: str, note: str, opened_by: str, date: str | None = None) -> CustomerInDB: from google.cloud.firestore_v1 import ArrayUnion db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) if not doc_ref.get().exists: raise NotFoundError("Customer") now = datetime.utcnow().isoformat() issue = { "active": True, "opened_date": date or now, "resolved_date": None, "note": note, "opened_by": opened_by, "resolved_by": None, } doc_ref.update({"technical_issues": ArrayUnion([issue]), "updated_at": now}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) def resolve_technical_issue(customer_id: str, index: int, resolved_by: str) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} issues = list(data.get("technical_issues") or []) if index < 0 or index >= len(issues): raise HTTPException(status_code=404, detail="Issue index out of range") now = datetime.utcnow().isoformat() issues[index] = {**issues[index], "active": False, "resolved_date": now, "resolved_by": resolved_by} doc_ref.update({"technical_issues": issues, "updated_at": now}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) def add_install_support(customer_id: str, note: str, opened_by: str, date: str | None = None) -> CustomerInDB: from google.cloud.firestore_v1 import ArrayUnion db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) if not doc_ref.get().exists: raise NotFoundError("Customer") now = datetime.utcnow().isoformat() entry = { "active": True, "opened_date": date or now, "resolved_date": None, "note": note, "opened_by": opened_by, "resolved_by": None, } doc_ref.update({"install_support": ArrayUnion([entry]), "updated_at": now}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) def resolve_install_support(customer_id: str, index: int, resolved_by: str) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} entries = list(data.get("install_support") or []) if index < 0 or index >= len(entries): raise HTTPException(status_code=404, detail="Support index out of range") now = datetime.utcnow().isoformat() entries[index] = {**entries[index], "active": False, "resolved_date": now, "resolved_by": resolved_by} doc_ref.update({"install_support": entries, "updated_at": now}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) def edit_technical_issue(customer_id: str, index: int, note: str, opened_date: str | None = None) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} issues = list(data.get("technical_issues") or []) if index < 0 or index >= len(issues): raise HTTPException(status_code=404, detail="Issue index out of range") issues[index] = {**issues[index], "note": note} if opened_date: issues[index]["opened_date"] = opened_date doc_ref.update({"technical_issues": issues, "updated_at": datetime.utcnow().isoformat()}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) def delete_technical_issue(customer_id: str, index: int) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} issues = list(data.get("technical_issues") or []) if index < 0 or index >= len(issues): raise HTTPException(status_code=404, detail="Issue index out of range") issues.pop(index) doc_ref.update({"technical_issues": issues, "updated_at": datetime.utcnow().isoformat()}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) def edit_install_support(customer_id: str, index: int, note: str, opened_date: str | None = None) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} entries = list(data.get("install_support") or []) if index < 0 or index >= len(entries): raise HTTPException(status_code=404, detail="Support index out of range") entries[index] = {**entries[index], "note": note} if opened_date: entries[index]["opened_date"] = opened_date doc_ref.update({"install_support": entries, "updated_at": datetime.utcnow().isoformat()}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) def delete_install_support(customer_id: str, index: int) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} entries = list(data.get("install_support") or []) if index < 0 or index >= len(entries): raise HTTPException(status_code=404, detail="Support index out of range") entries.pop(index) doc_ref.update({"install_support": entries, "updated_at": datetime.utcnow().isoformat()}) _update_crm_summary(customer_id) return _doc_to_customer(doc_ref.get()) # ── Transactions ────────────────────────────────────────────────────────────── def add_transaction(customer_id: str, entry: TransactionEntry) -> CustomerInDB: from google.cloud.firestore_v1 import ArrayUnion db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) if not doc_ref.get().exists: raise NotFoundError("Customer") now = datetime.utcnow().isoformat() doc_ref.update({"transaction_history": ArrayUnion([entry.model_dump()]), "updated_at": now}) if entry.order_ref: _recalculate_order_payment_status(customer_id, entry.order_ref) return _doc_to_customer(doc_ref.get()) def update_transaction(customer_id: str, index: int, entry: TransactionEntry) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} txns = list(data.get("transaction_history") or []) if index < 0 or index >= len(txns): raise HTTPException(status_code=404, detail="Transaction index out of range") txns[index] = entry.model_dump() now = datetime.utcnow().isoformat() doc_ref.update({"transaction_history": txns, "updated_at": now}) if entry.order_ref: _recalculate_order_payment_status(customer_id, entry.order_ref) return _doc_to_customer(doc_ref.get()) def delete_transaction(customer_id: str, index: int) -> CustomerInDB: db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) doc = doc_ref.get() if not doc.exists: raise NotFoundError("Customer") data = doc.to_dict() or {} txns = list(data.get("transaction_history") or []) if index < 0 or index >= len(txns): raise HTTPException(status_code=404, detail="Transaction index out of range") deleted_order_ref = txns[index].get("order_ref") txns.pop(index) now = datetime.utcnow().isoformat() doc_ref.update({"transaction_history": txns, "updated_at": now}) if deleted_order_ref: _recalculate_order_payment_status(customer_id, deleted_order_ref) return _doc_to_customer(doc_ref.get()) # ── Relationship Status ─────────────────────────────────────────────────────── def update_relationship_status(customer_id: str, status: str) -> CustomerInDB: VALID = {"lead", "prospect", "active", "inactive", "churned"} if status not in VALID: raise HTTPException(status_code=422, detail=f"Invalid relationship_status: {status}") db = get_db() doc_ref = db.collection(CUSTOMERS_COLLECTION).document(customer_id) if not doc_ref.get().exists: raise NotFoundError("Customer") # Failsafe: cannot manually mark inactive if open (non-terminal) orders exist if status == "inactive": TERMINAL = {"declined", "complete"} open_orders = [ doc for doc in _order_collection(customer_id).stream() if (doc.to_dict() or {}).get("status", "") not in TERMINAL ] if open_orders: raise HTTPException( status_code=409, detail=( f"Cannot mark as inactive: {len(open_orders)} open order(s) still exist. " "Please resolve all orders before changing the status." ), ) doc_ref.update({"relationship_status": status, "updated_at": datetime.utcnow().isoformat()}) return _doc_to_customer(doc_ref.get()) # ── Comms Log (SQLite, async) ───────────────────────────────────────────────── def _row_to_comm(row: dict) -> CommInDB: row = dict(row) raw_attachments = json.loads(row.get("attachments") or "[]") # Normalise attachment dicts — tolerate both synced (content_type/size) and # sent (nextcloud_path) shapes so Pydantic never sees missing required fields. row["attachments"] = [ {k: v for k, v in a.items() if k in ("filename", "nextcloud_path", "content_type", "size")} for a in raw_attachments if isinstance(a, dict) and a.get("filename") ] if row.get("to_addrs") and isinstance(row["to_addrs"], str): try: row["to_addrs"] = json.loads(row["to_addrs"]) except Exception: row["to_addrs"] = [] # SQLite stores booleans as integers row["is_important"] = bool(row.get("is_important", 0)) row["is_read"] = bool(row.get("is_read", 0)) return CommInDB(**{k: v for k, v in row.items() if k in CommInDB.model_fields}) async def list_comms( customer_id: str, type: str | None = None, direction: str | None = None, limit: int = 100, ) -> list[CommInDB]: db = await mqtt_db.get_db() where = ["customer_id = ?"] params: list = [customer_id] if type: where.append("type = ?") params.append(type) if direction: where.append("direction = ?") params.append(direction) clause = " AND ".join(where) rows = await db.execute_fetchall( f"SELECT * FROM crm_comms_log WHERE {clause} ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?", params + [limit], ) entries = [_row_to_comm(dict(r)) for r in rows] # Fallback: include unlinked email rows (customer_id NULL) if addresses match this customer. # This covers historical rows created before automatic outbound customer linking. fs = get_db() doc = fs.collection("crm_customers").document(customer_id).get() if doc.exists: data = doc.to_dict() or {} customer_emails = { (c.get("value") or "").strip().lower() for c in (data.get("contacts") or []) if c.get("type") == "email" and c.get("value") } else: customer_emails = set() if customer_emails: extra_where = [ "type = 'email'", "(customer_id IS NULL OR customer_id = '')", ] extra_params: list = [] if direction: extra_where.append("direction = ?") extra_params.append(direction) extra_clause = " AND ".join(extra_where) extra_rows = await db.execute_fetchall( f"SELECT * FROM crm_comms_log WHERE {extra_clause} " "ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?", extra_params + [max(limit, 300)], ) for r in extra_rows: e = _row_to_comm(dict(r)) from_addr = (e.from_addr or "").strip().lower() to_addrs = [(a or "").strip().lower() for a in (e.to_addrs or [])] matched = (from_addr in customer_emails) or any(a in customer_emails for a in to_addrs) if matched: entries.append(e) # De-duplicate and sort consistently uniq = {e.id: e for e in entries} sorted_entries = sorted( uniq.values(), key=lambda e: ((e.occurred_at or e.created_at or ""), (e.created_at or ""), (e.id or "")), reverse=True, ) return sorted_entries[:limit] async def list_all_emails( direction: str | None = None, customers_only: bool = False, mail_accounts: list[str] | None = None, limit: int = 500, ) -> list[CommInDB]: db = await mqtt_db.get_db() where = ["type = 'email'"] params: list = [] if direction: where.append("direction = ?") params.append(direction) if customers_only: where.append("customer_id IS NOT NULL") if mail_accounts: placeholders = ",".join("?" for _ in mail_accounts) where.append(f"mail_account IN ({placeholders})") params.extend(mail_accounts) clause = f"WHERE {' AND '.join(where)}" rows = await db.execute_fetchall( f"SELECT * FROM crm_comms_log {clause} ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?", params + [limit], ) return [_row_to_comm(dict(r)) for r in rows] async def list_all_comms( type: str | None = None, direction: str | None = None, limit: int = 200, ) -> list[CommInDB]: db = await mqtt_db.get_db() where = [] params: list = [] if type: where.append("type = ?") params.append(type) if direction: where.append("direction = ?") params.append(direction) clause = f"WHERE {' AND '.join(where)}" if where else "" rows = await db.execute_fetchall( f"SELECT * FROM crm_comms_log {clause} ORDER BY COALESCE(occurred_at, created_at) DESC, created_at DESC LIMIT ?", params + [limit], ) return [_row_to_comm(dict(r)) for r in rows] async def get_comm(comm_id: str) -> CommInDB: db = await mqtt_db.get_db() rows = await db.execute_fetchall( "SELECT * FROM crm_comms_log WHERE id = ?", (comm_id,) ) if not rows: raise HTTPException(status_code=404, detail="Comm entry not found") return _row_to_comm(dict(rows[0])) async def create_comm(data: CommCreate) -> CommInDB: db = await mqtt_db.get_db() now = datetime.utcnow().isoformat() comm_id = str(uuid.uuid4()) occurred_at = data.occurred_at or now attachments_json = json.dumps([a.model_dump() for a in data.attachments]) await db.execute( """INSERT INTO crm_comms_log (id, customer_id, type, mail_account, direction, subject, body, attachments, ext_message_id, logged_by, occurred_at, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (comm_id, data.customer_id, data.type.value, data.mail_account, data.direction.value, data.subject, data.body, attachments_json, data.ext_message_id, data.logged_by, occurred_at, now), ) await db.commit() return await get_comm(comm_id) async def update_comm(comm_id: str, data: CommUpdate) -> CommInDB: db = await mqtt_db.get_db() rows = await db.execute_fetchall( "SELECT id FROM crm_comms_log WHERE id = ?", (comm_id,) ) if not rows: raise HTTPException(status_code=404, detail="Comm entry not found") updates = data.model_dump(exclude_none=True) if not updates: return await get_comm(comm_id) set_clause = ", ".join(f"{k} = ?" for k in updates) await db.execute( f"UPDATE crm_comms_log SET {set_clause} WHERE id = ?", list(updates.values()) + [comm_id], ) await db.commit() return await get_comm(comm_id) async def delete_comm(comm_id: str) -> None: db = await mqtt_db.get_db() rows = await db.execute_fetchall( "SELECT id FROM crm_comms_log WHERE id = ?", (comm_id,) ) if not rows: raise HTTPException(status_code=404, detail="Comm entry not found") await db.execute("DELETE FROM crm_comms_log WHERE id = ?", (comm_id,)) await db.commit() async def delete_comms_bulk(ids: list[str]) -> int: """Delete multiple comm entries. Returns count deleted.""" if not ids: return 0 db = await mqtt_db.get_db() placeholders = ",".join("?" for _ in ids) cursor = await db.execute( f"DELETE FROM crm_comms_log WHERE id IN ({placeholders})", ids ) await db.commit() return cursor.rowcount async def set_comm_important(comm_id: str, important: bool) -> CommInDB: db = await mqtt_db.get_db() await db.execute( "UPDATE crm_comms_log SET is_important = ? WHERE id = ?", (1 if important else 0, comm_id), ) await db.commit() return await get_comm(comm_id) async def set_comm_read(comm_id: str, read: bool) -> CommInDB: db = await mqtt_db.get_db() await db.execute( "UPDATE crm_comms_log SET is_read = ? WHERE id = ?", (1 if read else 0, comm_id), ) await db.commit() return await get_comm(comm_id) # ── Media (SQLite, async) ───────────────────────────────────────────────────── def _row_to_media(row: dict) -> MediaInDB: row = dict(row) row["tags"] = json.loads(row.get("tags") or "[]") return MediaInDB(**row) async def list_media( customer_id: str | None = None, order_id: str | None = None, ) -> list[MediaInDB]: db = await mqtt_db.get_db() where = [] params: list = [] if customer_id: where.append("customer_id = ?") params.append(customer_id) if order_id: where.append("order_id = ?") params.append(order_id) clause = f"WHERE {' AND '.join(where)}" if where else "" rows = await db.execute_fetchall( f"SELECT * FROM crm_media {clause} ORDER BY created_at DESC", params, ) return [_row_to_media(dict(r)) for r in rows] async def create_media(data: MediaCreate) -> MediaInDB: db = await mqtt_db.get_db() now = datetime.utcnow().isoformat() media_id = str(uuid.uuid4()) tags_json = json.dumps(data.tags) direction = data.direction.value if data.direction else None await db.execute( """INSERT INTO crm_media (id, customer_id, order_id, filename, nextcloud_path, mime_type, direction, tags, uploaded_by, thumbnail_path, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (media_id, data.customer_id, data.order_id, data.filename, data.nextcloud_path, data.mime_type, direction, tags_json, data.uploaded_by, data.thumbnail_path, now), ) await db.commit() rows = await db.execute_fetchall( "SELECT * FROM crm_media WHERE id = ?", (media_id,) ) return _row_to_media(dict(rows[0])) async def delete_media(media_id: str) -> None: db = await mqtt_db.get_db() rows = await db.execute_fetchall( "SELECT id FROM crm_media WHERE id = ?", (media_id,) ) if not rows: raise HTTPException(status_code=404, detail="Media entry not found") await db.execute("DELETE FROM crm_media WHERE id = ?", (media_id,)) await db.commit() # ── Background polling ──────────────────────────────────────────────────────── PRE_MFG_STATUSES = {"negotiating", "awaiting_quotation", "awaiting_customer_confirmation", "awaiting_fulfilment", "awaiting_payment"} TERMINAL_STATUSES = {"declined", "complete"} def poll_crm_customer_statuses() -> None: """ Two checks run daily: 1. Active + open pre-mfg order + 12+ months since last comm → churn. 2. Inactive + has any open (non-terminal) order → flip back to active. """ db = get_db() now = datetime.utcnow() for doc in db.collection(CUSTOMERS_COLLECTION).stream(): try: data = doc.to_dict() or {} rel_status = data.get("relationship_status", "lead") summary = data.get("crm_summary") or {} all_statuses = summary.get("all_orders_statuses") or [] # ── Check 1: active + silent 12 months on a pre-mfg order → churned ── if rel_status == "active": has_open_pre_mfg = any(s in PRE_MFG_STATUSES for s in all_statuses) if not has_open_pre_mfg: continue # Find last comm date from SQLite comms table # (comms are stored in SQLite, keyed by customer_id) # We rely on crm_summary not having this; use Firestore comms subcollection as fallback # The last_comm_date is passed from the frontend; here we use the comms subcollection comms = list(db.collection(CUSTOMERS_COLLECTION).document(doc.id).collection("comms").stream()) if not comms: continue latest_date_str = max((c.to_dict().get("date") or "") for c in comms) if not latest_date_str: continue last_contact = datetime.fromisoformat(latest_date_str.rstrip("Z").split("+")[0]) days_since = (now - last_contact).days if days_since >= 365: db.collection(CUSTOMERS_COLLECTION).document(doc.id).update({ "relationship_status": "churned", "updated_at": now.isoformat(), }) print(f"[CRM POLL] {doc.id} → churned ({days_since}d silent, open pre-mfg order)") # ── Check 2: inactive + open orders exist → flip back to active ── elif rel_status == "inactive": has_open = any(s not in TERMINAL_STATUSES for s in all_statuses) if has_open: db.collection(CUSTOMERS_COLLECTION).document(doc.id).update({ "relationship_status": "active", "updated_at": now.isoformat(), }) print(f"[CRM POLL] {doc.id} → active (inactive but has open orders)") except Exception as e: print(f"[CRM POLL] Error processing customer {doc.id}: {e}")