Files
bellsystems-cp/backend/crm/email_sync.py

838 lines
32 KiB
Python

"""
IMAP email sync and SMTP email send for CRM.
Uses only stdlib imaplib/smtplib — no extra dependencies.
Sync is run in an executor to avoid blocking the event loop.
"""
import asyncio
import base64
import email
import email.header
import email.utils
import html.parser
import imaplib
import json
import logging
import re
import smtplib
import uuid
from datetime import datetime, timezone
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email import encoders
from typing import List, Optional, Tuple
from config import settings
from mqtt import database as mqtt_db
from crm.mail_accounts import get_mail_accounts, account_by_key, account_by_email
logger = logging.getLogger("crm.email_sync")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _decode_header(raw: str) -> str:
"""Decode an RFC2047-encoded email header value."""
if not raw:
return ""
parts = email.header.decode_header(raw)
decoded = []
for part, enc in parts:
if isinstance(part, bytes):
decoded.append(part.decode(enc or "utf-8", errors="replace"))
else:
decoded.append(part)
return " ".join(decoded)
class _HTMLStripper(html.parser.HTMLParser):
def __init__(self):
super().__init__()
self._text = []
def handle_data(self, data):
self._text.append(data)
def get_text(self):
return " ".join(self._text)
def _strip_html(html_str: str) -> str:
s = _HTMLStripper()
s.feed(html_str)
return s.get_text()
def _extract_inline_data_images(html_body: str) -> tuple[str, list[tuple[str, bytes, str]]]:
"""Replace data-URI images in HTML with cid: references and return inline parts.
Returns: (new_html, [(cid, image_bytes, mime_type), ...])
"""
if not html_body:
return "", []
inline_parts: list[tuple[str, bytes, str]] = []
seen: dict[str, str] = {} # data-uri -> cid
src_pattern = re.compile(r"""src=(['"])(data:image/[^'"]+)\1""", re.IGNORECASE)
data_pattern = re.compile(r"^data:(image/[a-zA-Z0-9.+-]+);base64,(.+)$", re.IGNORECASE | re.DOTALL)
def _replace(match: re.Match) -> str:
quote = match.group(1)
data_uri = match.group(2)
if data_uri in seen:
cid = seen[data_uri]
return f"src={quote}cid:{cid}{quote}"
parsed = data_pattern.match(data_uri)
if not parsed:
return match.group(0)
mime_type = parsed.group(1).lower()
b64_data = parsed.group(2).strip()
try:
payload = base64.b64decode(b64_data, validate=False)
except Exception:
return match.group(0)
cid = f"inline-{uuid.uuid4().hex}"
seen[data_uri] = cid
inline_parts.append((cid, payload, mime_type))
return f"src={quote}cid:{cid}{quote}"
return src_pattern.sub(_replace, html_body), inline_parts
def _load_customer_email_map() -> dict[str, str]:
    """Return a lookup of customer email address -> customer document id.

    Streams every document of the ``crm_customers`` Firestore collection and
    indexes each contact entry whose ``type`` is ``"email"`` by its stripped,
    lower-cased ``value``. When the same address appears on several
    customers, the document streamed last wins.
    """
    from shared.firebase import get_db as get_firestore

    customers = get_firestore().collection("crm_customers")
    email_index: dict[str, str] = {}
    for customer_doc in customers.stream():
        contacts = (customer_doc.to_dict() or {}).get("contacts") or []
        for entry in contacts:
            if entry.get("type") != "email":
                continue
            value = entry.get("value")
            if value:
                email_index[str(value).strip().lower()] = customer_doc.id
    return email_index
def _get_body(msg: email.message.Message) -> tuple[str, str]:
"""Extract (plain_text, html_body) from an email message.
Inline images (cid: references) are substituted with data-URIs so they
render correctly in a sandboxed iframe without external requests.
"""
import base64 as _b64
plain = None
html_body = None
# Map Content-ID → data-URI for inline images
cid_map: dict[str, str] = {}
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
cd = str(part.get("Content-Disposition", ""))
cid = part.get("Content-ID", "").strip().strip("<>")
if "attachment" in cd:
continue
if ct == "text/plain" and plain is None:
raw = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
plain = raw.decode(charset, errors="replace")
elif ct == "text/html" and html_body is None:
raw = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
html_body = raw.decode(charset, errors="replace")
elif ct.startswith("image/") and cid:
raw = part.get_payload(decode=True)
if raw:
b64 = _b64.b64encode(raw).decode("ascii")
cid_map[cid] = f"data:{ct};base64,{b64}"
else:
ct = msg.get_content_type()
payload = msg.get_payload(decode=True)
charset = msg.get_content_charset() or "utf-8"
if payload:
text = payload.decode(charset, errors="replace")
if ct == "text/plain":
plain = text
elif ct == "text/html":
html_body = text
# Substitute cid: references with data-URIs
if html_body and cid_map:
for cid, data_uri in cid_map.items():
html_body = html_body.replace(f"cid:{cid}", data_uri)
plain_text = (plain or (html_body and _strip_html(html_body)) or "").strip()
return plain_text, (html_body or "").strip()
def _get_attachments(msg: email.message.Message) -> list[dict]:
"""Extract attachment info (filename, content_type, size) without storing content."""
attachments = []
if msg.is_multipart():
for part in msg.walk():
cd = str(part.get("Content-Disposition", ""))
if "attachment" in cd:
filename = part.get_filename() or "attachment"
filename = _decode_header(filename)
ct = part.get_content_type() or "application/octet-stream"
payload = part.get_payload(decode=True)
size = len(payload) if payload else 0
attachments.append({"filename": filename, "content_type": ct, "size": size})
return attachments
# ---------------------------------------------------------------------------
# IMAP sync (synchronous — called via run_in_executor)
# ---------------------------------------------------------------------------
def _sync_account_emails_sync(account: dict) -> tuple[list[dict], bool]:
if not account.get("imap_host") or not account.get("imap_username") or not account.get("imap_password"):
return [], False
if account.get("imap_use_ssl"):
imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"]))
else:
imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"]))
imap.login(account["imap_username"], account["imap_password"])
# readonly=True prevents marking messages as \Seen while syncing.
imap.select(account.get("imap_inbox", "INBOX"), readonly=True)
_, data = imap.search(None, "ALL")
uids = data[0].split() if data[0] else []
results = []
complete = True
for uid in uids:
try:
_, msg_data = imap.fetch(uid, "(FLAGS RFC822)")
meta = msg_data[0][0] if msg_data and isinstance(msg_data[0], tuple) else b""
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
message_id = msg.get("Message-ID", "").strip()
from_addr = email.utils.parseaddr(msg.get("From", ""))[1]
to_addrs_raw = msg.get("To", "")
to_addrs = [a for _, a in email.utils.getaddresses([to_addrs_raw])]
subject = _decode_header(msg.get("Subject", ""))
date_str = msg.get("Date", "")
try:
occurred_at = email.utils.parsedate_to_datetime(date_str).isoformat()
except Exception:
occurred_at = datetime.now(timezone.utc).isoformat()
is_read = b"\\Seen" in (meta or b"")
try:
body, body_html = _get_body(msg)
except Exception:
body, body_html = "", ""
try:
file_attachments = _get_attachments(msg)
except Exception:
file_attachments = []
results.append({
"mail_account": account["key"],
"message_id": message_id,
"from_addr": from_addr,
"to_addrs": to_addrs,
"subject": subject,
"body": body,
"body_html": body_html,
"attachments": file_attachments,
"occurred_at": occurred_at,
"is_read": bool(is_read),
})
except Exception as e:
complete = False
logger.warning(f"[EMAIL SYNC] Failed to parse message uid={uid} account={account['key']}: {e}")
imap.logout()
return results, complete
def _sync_emails_sync() -> tuple[list[dict], bool]:
    """Fetch messages from every inbound-synced account, once per mailbox.

    Returns ``(messages, complete)`` where ``complete`` is True only if every
    visited account reported a complete fetch.
    """
    collected: list[dict] = []
    everything_fetched = True
    # Aliases often share one physical mailbox; identify each mailbox by
    # (host, port, username, inbox) so it is fetched only once.
    visited: set[tuple] = set()

    for account in get_mail_accounts():
        if not account.get("sync_inbound"):
            continue
        mailbox_id = (
            (account.get("imap_host") or "").lower(),
            int(account.get("imap_port") or 0),
            (account.get("imap_username") or "").lower(),
            (account.get("imap_inbox") or "INBOX").upper(),
        )
        if mailbox_id in visited:
            continue
        visited.add(mailbox_id)

        fetched, ok = _sync_account_emails_sync(account)
        collected.extend(fetched)
        everything_fetched = everything_fetched and ok

    return collected, everything_fetched
async def sync_emails() -> int:
    """
    Pull emails from IMAP, match against CRM customers, store new ones.

    For messages already stored, the customer link is backfilled and (for
    inbound mail) the read flag is refreshed from the server. Inbound rows
    whose Message-ID is missing from the server on two consecutive complete
    syncs are deleted locally.

    Returns count of new entries created.

    Raises:
        Re-raises any exception from the IMAP fetch after logging it.
    """
    if not get_mail_accounts():
        return 0

    loop = asyncio.get_running_loop()
    try:
        messages, fetch_complete = await loop.run_in_executor(None, _sync_emails_sync)
    except Exception as e:
        logger.error(f"[EMAIL SYNC] IMAP connect/fetch failed: {e}")
        raise

    db = await mqtt_db.get_db()

    # Load all customer email contacts into a flat lookup: email -> customer_id.
    # The Firestore stream is blocking I/O, so keep it off the event loop.
    addr_to_customer = await loop.run_in_executor(None, _load_customer_email_map)

    # Load already-synced message-ids from DB
    rows = await db.execute_fetchall(
        "SELECT id, ext_message_id, COALESCE(mail_account, '') as mail_account, direction, is_read, customer_id "
        "FROM crm_comms_log WHERE type='email' AND ext_message_id IS NOT NULL"
    )
    known_map = {
        (r[1], r[2] or ""): {
            "id": r[0],
            "direction": r[3],
            "is_read": int(r[4] or 0),
            "customer_id": r[5],
        }
        for r in rows
    }

    new_count = 0
    now = datetime.now(timezone.utc).isoformat()
    server_ids_by_account: dict[str, set[str]] = {}
    # Global inbound IDs from server snapshot, used to avoid account-classification delete oscillation.
    inbound_server_ids: set[str] = set()
    accounts = get_mail_accounts()
    accounts_by_email = {a["email"].lower(): a for a in accounts}

    # Initialize tracked inbound accounts even if inbox is empty.
    for a in accounts:
        if a.get("sync_inbound"):
            server_ids_by_account[a["key"]] = set()

    for msg in messages:
        mid = msg["message_id"]
        fetch_account_key = (msg.get("mail_account") or "").strip().lower()
        from_addr = msg["from_addr"].lower()
        to_addrs = [a.lower() for a in msg["to_addrs"]]

        sender_acc = accounts_by_email.get(from_addr)
        if sender_acc:
            # Sent by one of our own accounts: the customer is a recipient.
            direction = "outbound"
            resolved_account_key = sender_acc["key"]
            customer_addrs = to_addrs
        else:
            direction = "inbound"
            target_acc = None
            for addr in to_addrs:
                if addr in accounts_by_email:
                    target_acc = accounts_by_email[addr]
                    break
            resolved_account_key = (target_acc["key"] if target_acc else fetch_account_key)
            customer_addrs = [from_addr]
            if target_acc and not target_acc.get("sync_inbound"):
                # Ignore inbound for non-synced aliases (e.g. info/news).
                continue

        if direction == "inbound" and mid and resolved_account_key in server_ids_by_account:
            server_ids_by_account[resolved_account_key].add(mid)
            inbound_server_ids.add(mid)

        # Find matching customer (may be None - we still store the email)
        customer_id = None
        for addr in customer_addrs:
            if addr in addr_to_customer:
                customer_id = addr_to_customer[addr]
                break

        if mid and (mid, resolved_account_key) in known_map:
            existing = known_map[(mid, resolved_account_key)]
            # Backfill customer linkage for rows created without customer_id.
            if customer_id and not existing.get("customer_id"):
                await db.execute(
                    "UPDATE crm_comms_log SET customer_id=? WHERE id=?",
                    (customer_id, existing["id"]),
                )
            # Existing inbound message: sync read/unread state from server.
            if direction == "inbound":
                server_read = 1 if msg.get("is_read") else 0
                await db.execute(
                    "UPDATE crm_comms_log SET is_read=? "
                    "WHERE type='email' AND direction='inbound' AND ext_message_id=? AND mail_account=?",
                    (server_read, mid, resolved_account_key),
                )
            continue  # already stored

        # NOTE(review): a message without a Message-ID never matches
        # known_map, so it is inserted again on every sync.
        attachments_json = json.dumps(msg.get("attachments") or [])
        to_addrs_json = json.dumps(to_addrs)
        is_read_flag = 1 if msg.get("is_read") else 0

        entry_id = str(uuid.uuid4())
        await db.execute(
            """INSERT INTO crm_comms_log
            (id, customer_id, type, mail_account, direction, subject, body, body_html, attachments,
            ext_message_id, from_addr, to_addrs, logged_by, occurred_at, created_at, is_read)
            VALUES (?, ?, 'email', ?, ?, ?, ?, ?, ?, ?, ?, ?, 'system', ?, ?, ?)""",
            (entry_id, customer_id, resolved_account_key, direction, msg["subject"], msg["body"],
             msg.get("body_html", ""), attachments_json,
             mid, from_addr, to_addrs_json, msg["occurred_at"], now, is_read_flag),
        )
        new_count += 1
        if mid:
            # Remember the row so a second copy of the same message in this
            # batch (e.g. delivered to two fetched mailboxes) is not inserted twice.
            known_map[(mid, resolved_account_key)] = {
                "id": entry_id,
                "direction": direction,
                "is_read": is_read_flag,
                "customer_id": customer_id,
            }

    # Mirror remote deletes based on global inbound message-id snapshot.
    # To avoid transient IMAP inconsistency causing add/remove oscillation,
    # require two consecutive "missing" syncs before local deletion.
    sync_keys = [a["key"] for a in accounts if a.get("sync_inbound")]
    if sync_keys and fetch_complete:
        placeholders = ",".join("?" for _ in sync_keys)
        local_rows = await db.execute_fetchall(
            f"SELECT id, ext_message_id, mail_account FROM crm_comms_log "
            f"WHERE type='email' AND direction='inbound' AND mail_account IN ({placeholders}) "
            "AND ext_message_id IS NOT NULL",
            sync_keys,
        )
        to_delete: list[str] = []
        for row in local_rows:
            row_id, ext_id, acc_key = row[0], row[1], row[2]
            if not ext_id:
                continue
            state_key = f"missing_email::{acc_key}::{ext_id}"
            if ext_id in inbound_server_ids:
                # Still on the server: reset any pending "missing" counter.
                await db.execute("DELETE FROM crm_sync_state WHERE key = ?", (state_key,))
                continue
            prev = await db.execute_fetchall("SELECT value FROM crm_sync_state WHERE key = ?", (state_key,))
            prev_count = int(prev[0][0]) if prev and (prev[0][0] or "").isdigit() else 0
            # Kept separate from new_count, which is this function's return
            # value and must not be overwritten by the per-row counter.
            missing_count = prev_count + 1
            await db.execute(
                "INSERT INTO crm_sync_state (key, value) VALUES (?, ?) "
                "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
                (state_key, str(missing_count)),
            )
            if missing_count >= 2:
                to_delete.append(row_id)
                await db.execute("DELETE FROM crm_sync_state WHERE key = ?", (state_key,))
        if to_delete:
            del_ph = ",".join("?" for _ in to_delete)
            await db.execute(f"DELETE FROM crm_comms_log WHERE id IN ({del_ph})", to_delete)

    if new_count or server_ids_by_account:
        await db.commit()

    # Update last sync time
    await db.execute(
        "INSERT INTO crm_sync_state (key, value) VALUES ('last_email_sync', ?) "
        "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
        (now,),
    )
    await db.commit()

    logger.info(f"[EMAIL SYNC] Done — {new_count} new emails stored")
    return new_count
# ---------------------------------------------------------------------------
# Lightweight new-mail check (synchronous — called via run_in_executor)
# ---------------------------------------------------------------------------
def _check_server_count_sync() -> int:
    """Return the total number of messages across all inbound-synced inboxes.

    Each physical mailbox (host, port, username, inbox) is counted once.
    Accounts without IMAP host/username/password are skipped, matching the
    guard in ``_sync_account_emails_sync``.

    Raises:
        Whatever imaplib raises for connect/login/select/search failures.
    """
    # Keep this for backward compatibility; no longer used by check_new_emails().
    total = 0
    seen_sources: set[tuple] = set()
    for acc in get_mail_accounts():
        if not acc.get("sync_inbound"):
            continue
        if not acc.get("imap_host") or not acc.get("imap_username") or not acc.get("imap_password"):
            continue
        source = (
            (acc.get("imap_host") or "").lower(),
            int(acc.get("imap_port") or 0),
            (acc.get("imap_username") or "").lower(),
            (acc.get("imap_inbox") or "INBOX").upper(),
        )
        if source in seen_sources:
            continue
        seen_sources.add(source)

        if acc.get("imap_use_ssl"):
            imap = imaplib.IMAP4_SSL(acc["imap_host"], int(acc["imap_port"]))
        else:
            imap = imaplib.IMAP4(acc["imap_host"], int(acc["imap_port"]))
        try:
            imap.login(acc["imap_username"], acc["imap_password"])
            imap.select(acc.get("imap_inbox", "INBOX"), readonly=True)
            _, data = imap.search(None, "ALL")
            total += len(data[0].split()) if data[0] else 0
        finally:
            # Release the connection even when login/select/search raised.
            try:
                imap.logout()
            except Exception as e:
                logger.debug("[EMAIL CHECK] IMAP logout failed for account=%s: %s", acc.get("key"), e)
    return total
async def check_new_emails() -> dict:
    """
    Count inbound server messages that are not yet stored locally.

    The mailboxes are fetched with the same routine ``sync_emails`` uses, so
    messages ARE downloaded, but nothing is written to the database.
    Messages without a Message-ID and outbound copies are ignored.

    Returns {"new_count": int}.

    Raises:
        Re-raises any exception from the IMAP fetch after logging it.
    """
    if not get_mail_accounts():
        return {"new_count": 0}

    loop = asyncio.get_running_loop()
    try:
        # Reuse same account-resolution logic as sync to avoid false positives.
        messages, _ = await loop.run_in_executor(None, _sync_emails_sync)
    except Exception as e:
        logger.warning(f"[EMAIL CHECK] IMAP check failed: {e}")
        raise

    accounts = get_mail_accounts()
    accounts_by_email = {a["email"].lower(): a for a in accounts}

    db = await mqtt_db.get_db()
    rows = await db.execute_fetchall(
        "SELECT ext_message_id, COALESCE(mail_account, '') as mail_account FROM crm_comms_log "
        "WHERE type='email' AND ext_message_id IS NOT NULL"
    )
    known_ids = {(r[0], r[1] or "") for r in rows}

    new_count = 0
    for msg in messages:
        mid = (msg.get("message_id") or "").strip()
        if not mid:
            continue
        fetch_account_key = (msg.get("mail_account") or "").strip().lower()
        from_addr = (msg.get("from_addr") or "").lower()
        to_addrs = [(a or "").lower() for a in (msg.get("to_addrs") or [])]

        sender_acc = accounts_by_email.get(from_addr)
        if sender_acc:
            # Outbound copy in mailbox; not part of "new inbound mail" banner.
            continue

        target_acc = None
        for addr in to_addrs:
            if addr in accounts_by_email:
                target_acc = accounts_by_email[addr]
                break
        resolved_account_key = (target_acc["key"] if target_acc else fetch_account_key)
        if target_acc and not target_acc.get("sync_inbound"):
            continue

        identity = (mid, resolved_account_key)
        if identity not in known_ids:
            new_count += 1
            # Count each message once even if it was fetched more than once.
            known_ids.add(identity)

    return {"new_count": new_count}
# ---------------------------------------------------------------------------
# SMTP send (synchronous — called via run_in_executor)
# ---------------------------------------------------------------------------
def _append_to_sent_sync(account: dict, raw_message: bytes) -> None:
"""Best-effort append of sent MIME message to IMAP Sent folder."""
if not raw_message:
return
try:
if account.get("imap_use_ssl"):
imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"]))
else:
imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"]))
imap.login(account["imap_username"], account["imap_password"])
preferred = str(account.get("imap_sent") or "Sent").strip() or "Sent"
candidates = [preferred, "Sent", "INBOX.Sent", "Sent Items", "INBOX.Sent Items"]
seen = set()
ordered_candidates = []
for name in candidates:
key = name.lower()
if key not in seen:
seen.add(key)
ordered_candidates.append(name)
appended = False
for mailbox in ordered_candidates:
try:
status, _ = imap.append(mailbox, "\\Seen", None, raw_message)
if status == "OK":
appended = True
break
except Exception:
continue
if not appended:
logger.warning("[EMAIL SEND] Sent copy append failed for account=%s", account.get("key"))
imap.logout()
except Exception as e:
logger.warning("[EMAIL SEND] IMAP append to Sent failed for account=%s: %s", account.get("key"), e)
def _send_email_sync(
    account: dict,
    to: str,
    subject: str,
    body: str,
    body_html: str,
    cc: List[str],
    file_attachments: Optional[List[Tuple[str, bytes, str]]] = None,
) -> str:
    """Send via SMTP. Returns the Message-ID header.

    After a successful send, a copy of the message is appended to the
    account's IMAP Sent folder (best-effort).

    Args:
        account: Mail account settings; reads ``email`` and the ``smtp_*`` keys.
        to: Recipient header value (may hold several comma-separated addresses).
        subject: Subject line.
        body: Plain-text body.
        body_html: HTML body; base64 data-URI images are converted to inline
            ``cid:`` parts.
        cc: Additional recipients (``None`` is treated as no CC).
        file_attachments: list of (filename, content_bytes, mime_type)

    Raises:
        smtplib/socket exceptions when connecting, authenticating or sending fails.
    """
    cc = list(cc or [])
    html_with_cids, inline_images = _extract_inline_data_images(body_html or "")

    # Build body tree:
    # - with inline images: related(alternative(text/plain, text/html), image parts)
    # - without inline images: alternative(text/plain, text/html)
    if inline_images:
        body_part = MIMEMultipart("related")
        alt_part = MIMEMultipart("alternative")
        alt_part.attach(MIMEText(body, "plain", "utf-8"))
        if html_with_cids:
            alt_part.attach(MIMEText(html_with_cids, "html", "utf-8"))
        body_part.attach(alt_part)

        for idx, (cid, content, mime_type) in enumerate(inline_images, start=1):
            maintype, _, subtype = mime_type.partition("/")
            img_part = MIMEBase(maintype or "image", subtype or "png")
            img_part.set_payload(content)
            encoders.encode_base64(img_part)
            img_part.add_header("Content-ID", f"<{cid}>")
            img_part.add_header("Content-Disposition", "inline", filename=f"inline-{idx}.{subtype or 'png'}")
            body_part.attach(img_part)
    else:
        body_part = MIMEMultipart("alternative")
        body_part.attach(MIMEText(body, "plain", "utf-8"))
        if body_html:
            body_part.attach(MIMEText(body_html, "html", "utf-8"))

    # Wrap with mixed only when classic file attachments exist.
    if file_attachments:
        msg = MIMEMultipart("mixed")
        msg.attach(body_part)
    else:
        msg = body_part

    from_addr = account["email"]
    msg["From"] = from_addr
    msg["To"] = to
    msg["Subject"] = subject
    if cc:
        msg["Cc"] = ", ".join(cc)
    # RFC 5322 requires a Date header; without it the Sent-folder copy has no
    # timestamp of its own.
    msg["Date"] = email.utils.formatdate(localtime=True)
    msg_id = f"<{uuid.uuid4()}@bellsystems>"
    msg["Message-ID"] = msg_id

    # Attach files
    for filename, content, mime_type in (file_attachments or []):
        maintype, _, subtype = mime_type.partition("/")
        part = MIMEBase(maintype or "application", subtype or "octet-stream")
        part.set_payload(content)
        encoders.encode_base64(part)
        part.add_header("Content-Disposition", "attachment", filename=filename)
        msg.attach(part)

    # Envelope recipients must be individual bare addresses, so split
    # "a@x, b@y" / "Name <a@x>" header values; fall back to the raw values if
    # nothing could be parsed.
    header_rcpts = [to, *cc]
    recipients = [addr for _, addr in email.utils.getaddresses(header_rcpts) if addr] or header_rcpts
    raw_for_append = msg.as_bytes()

    use_starttls = bool(account.get("smtp_use_tls"))
    if use_starttls:
        server = smtplib.SMTP(account["smtp_host"], int(account["smtp_port"]))
    else:
        server = smtplib.SMTP_SSL(account["smtp_host"], int(account["smtp_port"]))
    # The context manager issues QUIT and closes the socket on failure too.
    with server:
        if use_starttls:
            server.starttls()
        server.login(account["smtp_username"], account["smtp_password"])
        server.sendmail(from_addr, recipients, msg.as_string())

    _append_to_sent_sync(account, raw_for_append)
    return msg_id
async def send_email(
    customer_id: str | None,
    from_account: str | None,
    to: str,
    subject: str,
    body: str,
    body_html: str,
    cc: List[str],
    sent_by: str,
    file_attachments: Optional[List[Tuple[str, bytes, str]]] = None,
) -> dict:
    """Send an email and record it in crm_comms_log. Returns the new log entry.

    When a customer can be resolved, file attachments are also uploaded to
    Nextcloud and registered as media; upload failures are logged and do not
    fail the send.

    Args:
        customer_id: CRM customer to link; when falsy, the recipients are
            matched against known customer email addresses.
        from_account: Key of the sending mail account.
        to: Recipient header value.
        subject, body, body_html: Message content.
        cc: Additional recipients (``None`` is treated as no CC).
        sent_by: Stored as ``logged_by`` / media uploader.
        file_attachments: list of (filename, content_bytes, mime_type)

    Raises:
        RuntimeError: when no accounts exist, the account is unknown, may not
            send, or has no SMTP settings.
        Any exception raised by the SMTP send itself.
    """
    accounts = get_mail_accounts()
    if not accounts:
        raise RuntimeError("SMTP not configured")
    account = account_by_key(from_account) if from_account else None
    if not account:
        raise RuntimeError("Please select a valid sender account")
    if not account.get("allow_send"):
        raise RuntimeError("Selected account is not allowed to send")
    if not account.get("smtp_host") or not account.get("smtp_username") or not account.get("smtp_password"):
        raise RuntimeError("SMTP not configured for selected account")

    cc = list(cc or [])
    loop = asyncio.get_running_loop()

    # If the caller did not provide a customer_id (e.g. compose from Mail page),
    # auto-link by matching recipient addresses against CRM customer emails.
    resolved_customer_id = customer_id
    if not resolved_customer_id:
        # Blocking Firestore stream: run it off the event loop.
        addr_to_customer = await loop.run_in_executor(None, _load_customer_email_map)
        rcpts = [to, *cc]
        parsed_rcpts = [addr for _, addr in email.utils.getaddresses(rcpts) if addr]
        for addr in parsed_rcpts:
            key = (addr or "").strip().lower()
            if key in addr_to_customer:
                resolved_customer_id = addr_to_customer[key]
                break

    import functools
    msg_id = await loop.run_in_executor(
        None,
        functools.partial(_send_email_sync, account, to, subject, body, body_html, cc, file_attachments or []),
    )

    # Upload attachments to Nextcloud and register in crm_media
    comm_attachments = []
    if file_attachments and resolved_customer_id:
        from crm import nextcloud, service
        from crm.models import MediaCreate, MediaDirection
        from shared.firebase import get_db as get_firestore

        firestore_db = get_firestore()
        doc_ref = firestore_db.collection("crm_customers").document(resolved_customer_id)
        # Blocking Firestore read: run it off the event loop.
        doc = await loop.run_in_executor(None, doc_ref.get)
        if doc.exists:
            data = doc.to_dict() or {}
            # Customer folder name inside Nextcloud; defaults to the customer id.
            nc_path = data.get("folder_id") or resolved_customer_id
            for filename, content, mime_type in file_attachments:
                # images/video → sent_media, everything else → documents
                if mime_type.startswith(("image/", "video/")):
                    subfolder = "sent_media"
                else:
                    subfolder = "documents"
                target_folder = f"customers/{nc_path}/{subfolder}"
                file_path = f"{target_folder}/{filename}"
                try:
                    await nextcloud.ensure_folder(target_folder)
                    await nextcloud.upload_file(file_path, content, mime_type)
                    await service.create_media(MediaCreate(
                        customer_id=resolved_customer_id,
                        filename=filename,
                        nextcloud_path=file_path,
                        mime_type=mime_type,
                        direction=MediaDirection.sent,
                        tags=["email-attachment"],
                        uploaded_by=sent_by,
                    ))
                    comm_attachments.append({"filename": filename, "nextcloud_path": file_path})
                except Exception as e:
                    logger.warning(f"[EMAIL SEND] Failed to upload attachment {filename}: {e}")

    now = datetime.now(timezone.utc).isoformat()
    entry_id = str(uuid.uuid4())
    db = await mqtt_db.get_db()
    our_addr = account["email"].lower()
    to_addrs_json = json.dumps([to] + cc)
    attachments_json = json.dumps(comm_attachments)
    await db.execute(
        """INSERT INTO crm_comms_log
        (id, customer_id, type, mail_account, direction, subject, body, body_html, attachments,
        ext_message_id, from_addr, to_addrs, logged_by, occurred_at, created_at)
        VALUES (?, ?, 'email', ?, 'outbound', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (entry_id, resolved_customer_id, account["key"], subject, body, body_html, attachments_json, msg_id,
         our_addr, to_addrs_json, sent_by, now, now),
    )
    await db.commit()

    return {
        "id": entry_id,
        "customer_id": resolved_customer_id,
        "type": "email",
        "mail_account": account["key"],
        "direction": "outbound",
        "subject": subject,
        "body": body,
        "body_html": body_html,
        "attachments": comm_attachments,
        "ext_message_id": msg_id,
        "from_addr": our_addr,
        "to_addrs": [to] + cc,
        "logged_by": sent_by,
        "occurred_at": now,
        "created_at": now,
    }
def _delete_remote_email_sync(account: dict, ext_message_id: str) -> bool:
if not ext_message_id:
return False
if account.get("imap_use_ssl"):
imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"]))
else:
imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"]))
imap.login(account["imap_username"], account["imap_password"])
imap.select(account.get("imap_inbox", "INBOX"))
_, data = imap.search(None, f'HEADER Message-ID "{ext_message_id}"')
uids = data[0].split() if data and data[0] else []
if not uids:
imap.logout()
return False
for uid in uids:
imap.store(uid, "+FLAGS", "\\Deleted")
imap.expunge()
imap.logout()
return True
async def delete_remote_email(ext_message_id: str, mail_account: str | None, from_addr: str | None = None) -> bool:
    """Delete a message on the IMAP server without blocking the event loop.

    The account is looked up by ``mail_account`` key first and by
    ``from_addr`` otherwise. Returns False when no account with an IMAP host
    is found or when the remote delete fails (the failure is logged).
    """
    account = None
    if mail_account:
        account = account_by_key(mail_account)
    if not account:
        account = account_by_email(from_addr)
    if not (account and account.get("imap_host")):
        return False

    def _run_delete() -> bool:
        return _delete_remote_email_sync(account, ext_message_id)

    loop = asyncio.get_event_loop()
    try:
        deleted = await loop.run_in_executor(None, _run_delete)
    except Exception as e:
        logger.warning(f"[EMAIL DELETE] Failed remote delete for {ext_message_id}: {e}")
        return False
    return deleted
def _set_remote_read_sync(account: dict, ext_message_id: str, read: bool) -> bool:
if not ext_message_id:
return False
if account.get("imap_use_ssl"):
imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"]))
else:
imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"]))
imap.login(account["imap_username"], account["imap_password"])
imap.select(account.get("imap_inbox", "INBOX"))
_, data = imap.search(None, f'HEADER Message-ID "{ext_message_id}"')
uids = data[0].split() if data and data[0] else []
if not uids:
imap.logout()
return False
flag_op = "+FLAGS" if read else "-FLAGS"
for uid in uids:
imap.store(uid, flag_op, "\\Seen")
imap.logout()
return True
async def set_remote_read(ext_message_id: str, mail_account: str | None, from_addr: str | None, read: bool) -> bool:
    """Update a message's read flag on the IMAP server without blocking the event loop.

    The account is looked up by ``mail_account`` key first and by
    ``from_addr`` otherwise. Returns False when no account with an IMAP host
    is found or when the remote update fails (the failure is logged).
    """
    account = None
    if mail_account:
        account = account_by_key(mail_account)
    if not account:
        account = account_by_email(from_addr)
    if not (account and account.get("imap_host")):
        return False

    def _run_update() -> bool:
        return _set_remote_read_sync(account, ext_message_id, read)

    loop = asyncio.get_event_loop()
    try:
        updated = await loop.run_in_executor(None, _run_update)
    except Exception as e:
        logger.warning(f"[EMAIL READ] Failed remote read update for {ext_message_id}: {e}")
        return False
    return updated