""" IMAP email sync and SMTP email send for CRM. Uses only stdlib imaplib/smtplib — no extra dependencies. Sync is run in an executor to avoid blocking the event loop. """ import asyncio import base64 import email import email.header import email.utils import html.parser import imaplib import json import logging import re import smtplib import uuid from datetime import datetime, timezone from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email import encoders from typing import List, Optional, Tuple from config import settings import database as mqtt_db from crm.mail_accounts import get_mail_accounts, account_by_key, account_by_email logger = logging.getLogger("crm.email_sync") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _decode_header(raw: str) -> str: """Decode an RFC2047-encoded email header value.""" if not raw: return "" parts = email.header.decode_header(raw) decoded = [] for part, enc in parts: if isinstance(part, bytes): decoded.append(part.decode(enc or "utf-8", errors="replace")) else: decoded.append(part) return " ".join(decoded) class _HTMLStripper(html.parser.HTMLParser): def __init__(self): super().__init__() self._text = [] def handle_data(self, data): self._text.append(data) def get_text(self): return " ".join(self._text) def _strip_html(html_str: str) -> str: s = _HTMLStripper() s.feed(html_str) return s.get_text() def _extract_inline_data_images(html_body: str) -> tuple[str, list[tuple[str, bytes, str]]]: """Replace data-URI images in HTML with cid: references and return inline parts. Returns: (new_html, [(cid, image_bytes, mime_type), ...]) """ if not html_body: return "", [] inline_parts: list[tuple[str, bytes, str]] = [] seen: dict[str, str] = {} # data-uri -> cid src_pattern = re.compile(r"""src=(['"])(data:image/[^'"]+)\1""", re.IGNORECASE) data_pattern = re.compile(r"^data:(image/[a-zA-Z0-9.+-]+);base64,(.+)$", re.IGNORECASE | re.DOTALL) def _replace(match: re.Match) -> str: quote = match.group(1) data_uri = match.group(2) if data_uri in seen: cid = seen[data_uri] return f"src={quote}cid:{cid}{quote}" parsed = data_pattern.match(data_uri) if not parsed: return match.group(0) mime_type = parsed.group(1).lower() b64_data = parsed.group(2).strip() try: payload = base64.b64decode(b64_data, validate=False) except Exception: return match.group(0) cid = f"inline-{uuid.uuid4().hex}" seen[data_uri] = cid inline_parts.append((cid, payload, mime_type)) return f"src={quote}cid:{cid}{quote}" return src_pattern.sub(_replace, html_body), inline_parts def _load_customer_email_map() -> dict[str, str]: """Build a lookup of customer email -> customer_id from Firestore.""" from shared.firebase import get_db as get_firestore firestore_db = get_firestore() addr_to_customer: dict[str, str] = {} for doc in firestore_db.collection("crm_customers").stream(): data = doc.to_dict() or {} for contact in (data.get("contacts") or []): if contact.get("type") == "email" and contact.get("value"): addr_to_customer[str(contact["value"]).strip().lower()] = doc.id return addr_to_customer def _get_body(msg: email.message.Message) -> tuple[str, str]: """Extract (plain_text, html_body) from an email message. Inline images (cid: references) are substituted with data-URIs so they render correctly in a sandboxed iframe without external requests. """ import base64 as _b64 plain = None html_body = None # Map Content-ID → data-URI for inline images cid_map: dict[str, str] = {} if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() cd = str(part.get("Content-Disposition", "")) cid = part.get("Content-ID", "").strip().strip("<>") if "attachment" in cd: continue if ct == "text/plain" and plain is None: raw = part.get_payload(decode=True) charset = part.get_content_charset() or "utf-8" plain = raw.decode(charset, errors="replace") elif ct == "text/html" and html_body is None: raw = part.get_payload(decode=True) charset = part.get_content_charset() or "utf-8" html_body = raw.decode(charset, errors="replace") elif ct.startswith("image/") and cid: raw = part.get_payload(decode=True) if raw: b64 = _b64.b64encode(raw).decode("ascii") cid_map[cid] = f"data:{ct};base64,{b64}" else: ct = msg.get_content_type() payload = msg.get_payload(decode=True) charset = msg.get_content_charset() or "utf-8" if payload: text = payload.decode(charset, errors="replace") if ct == "text/plain": plain = text elif ct == "text/html": html_body = text # Substitute cid: references with data-URIs if html_body and cid_map: for cid, data_uri in cid_map.items(): html_body = html_body.replace(f"cid:{cid}", data_uri) plain_text = (plain or (html_body and _strip_html(html_body)) or "").strip() return plain_text, (html_body or "").strip() def _get_attachments(msg: email.message.Message) -> list[dict]: """Extract attachment info (filename, content_type, size) without storing content.""" attachments = [] if msg.is_multipart(): for part in msg.walk(): cd = str(part.get("Content-Disposition", "")) if "attachment" in cd: filename = part.get_filename() or "attachment" filename = _decode_header(filename) ct = part.get_content_type() or "application/octet-stream" payload = part.get_payload(decode=True) size = len(payload) if payload else 0 attachments.append({"filename": filename, "content_type": ct, "size": size}) return attachments # --------------------------------------------------------------------------- # IMAP sync (synchronous — called via run_in_executor) # --------------------------------------------------------------------------- def _sync_account_emails_sync(account: dict) -> tuple[list[dict], bool]: if not account.get("imap_host") or not account.get("imap_username") or not account.get("imap_password"): return [], False if account.get("imap_use_ssl"): imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"])) else: imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"])) imap.login(account["imap_username"], account["imap_password"]) # readonly=True prevents marking messages as \Seen while syncing. imap.select(account.get("imap_inbox", "INBOX"), readonly=True) _, data = imap.search(None, "ALL") uids = data[0].split() if data[0] else [] results = [] complete = True for uid in uids: try: _, msg_data = imap.fetch(uid, "(FLAGS RFC822)") meta = msg_data[0][0] if msg_data and isinstance(msg_data[0], tuple) else b"" raw = msg_data[0][1] msg = email.message_from_bytes(raw) message_id = msg.get("Message-ID", "").strip() from_addr = email.utils.parseaddr(msg.get("From", ""))[1] to_addrs_raw = msg.get("To", "") to_addrs = [a for _, a in email.utils.getaddresses([to_addrs_raw])] subject = _decode_header(msg.get("Subject", "")) date_str = msg.get("Date", "") try: occurred_at = email.utils.parsedate_to_datetime(date_str).isoformat() except Exception: occurred_at = datetime.now(timezone.utc).isoformat() is_read = b"\\Seen" in (meta or b"") try: body, body_html = _get_body(msg) except Exception: body, body_html = "", "" try: file_attachments = _get_attachments(msg) except Exception: file_attachments = [] results.append({ "mail_account": account["key"], "message_id": message_id, "from_addr": from_addr, "to_addrs": to_addrs, "subject": subject, "body": body, "body_html": body_html, "attachments": file_attachments, "occurred_at": occurred_at, "is_read": bool(is_read), }) except Exception as e: complete = False logger.warning(f"[EMAIL SYNC] Failed to parse message uid={uid} account={account['key']}: {e}") imap.logout() return results, complete def _sync_emails_sync() -> tuple[list[dict], bool]: all_msgs: list[dict] = [] all_complete = True # Deduplicate by physical inbox source. Aliases often share the same mailbox. seen_sources: set[tuple] = set() for acc in get_mail_accounts(): if not acc.get("sync_inbound"): continue source = ( (acc.get("imap_host") or "").lower(), int(acc.get("imap_port") or 0), (acc.get("imap_username") or "").lower(), (acc.get("imap_inbox") or "INBOX").upper(), ) if source in seen_sources: continue seen_sources.add(source) msgs, complete = _sync_account_emails_sync(acc) all_msgs.extend(msgs) all_complete = all_complete and complete return all_msgs, all_complete async def sync_emails() -> int: """ Pull emails from IMAP, match against CRM customers, store new ones. Returns count of new entries created. """ if not get_mail_accounts(): return 0 loop = asyncio.get_event_loop() try: messages, fetch_complete = await loop.run_in_executor(None, _sync_emails_sync) except Exception as e: logger.error(f"[EMAIL SYNC] IMAP connect/fetch failed: {e}") raise db = await mqtt_db.get_db() # Load all customer email contacts into a flat lookup: email -> customer_id addr_to_customer = _load_customer_email_map() # Load already-synced message-ids from DB rows = await db.execute_fetchall( "SELECT id, ext_message_id, COALESCE(mail_account, '') as mail_account, direction, is_read, customer_id " "FROM crm_comms_log WHERE type='email' AND ext_message_id IS NOT NULL" ) known_map = { (r[1], r[2] or ""): { "id": r[0], "direction": r[3], "is_read": int(r[4] or 0), "customer_id": r[5], } for r in rows } new_count = 0 now = datetime.now(timezone.utc).isoformat() server_ids_by_account: dict[str, set[str]] = {} # Global inbound IDs from server snapshot, used to avoid account-classification delete oscillation. inbound_server_ids: set[str] = set() accounts = get_mail_accounts() accounts_by_email = {a["email"].lower(): a for a in accounts} # Initialize tracked inbound accounts even if inbox is empty. for a in accounts: if a.get("sync_inbound"): server_ids_by_account[a["key"]] = set() for msg in messages: mid = msg["message_id"] fetch_account_key = (msg.get("mail_account") or "").strip().lower() from_addr = msg["from_addr"].lower() to_addrs = [a.lower() for a in msg["to_addrs"]] sender_acc = accounts_by_email.get(from_addr) if sender_acc: direction = "outbound" resolved_account_key = sender_acc["key"] customer_addrs = to_addrs else: direction = "inbound" target_acc = None for addr in to_addrs: if addr in accounts_by_email: target_acc = accounts_by_email[addr] break resolved_account_key = (target_acc["key"] if target_acc else fetch_account_key) customer_addrs = [from_addr] if target_acc and not target_acc.get("sync_inbound"): # Ignore inbound for non-synced aliases (e.g. info/news). continue if direction == "inbound" and mid and resolved_account_key in server_ids_by_account: server_ids_by_account[resolved_account_key].add(mid) inbound_server_ids.add(mid) # Find matching customer (may be None - we still store the email) customer_id = None for addr in customer_addrs: if addr in addr_to_customer: customer_id = addr_to_customer[addr] break if mid and (mid, resolved_account_key) in known_map: existing = known_map[(mid, resolved_account_key)] # Backfill customer linkage for rows created without customer_id. if customer_id and not existing.get("customer_id"): await db.execute( "UPDATE crm_comms_log SET customer_id=? WHERE id=?", (customer_id, existing["id"]), ) # Existing inbound message: sync read/unread state from server. if direction == "inbound": server_read = 1 if msg.get("is_read") else 0 await db.execute( "UPDATE crm_comms_log SET is_read=? " "WHERE type='email' AND direction='inbound' AND ext_message_id=? AND mail_account=?", (server_read, mid, resolved_account_key), ) continue # already stored attachments_json = json.dumps(msg.get("attachments") or []) to_addrs_json = json.dumps(to_addrs) entry_id = str(uuid.uuid4()) await db.execute( """INSERT INTO crm_comms_log (id, customer_id, type, mail_account, direction, subject, body, body_html, attachments, ext_message_id, from_addr, to_addrs, logged_by, occurred_at, created_at, is_read) VALUES (?, ?, 'email', ?, ?, ?, ?, ?, ?, ?, ?, ?, 'system', ?, ?, ?)""", (entry_id, customer_id, resolved_account_key, direction, msg["subject"], msg["body"], msg.get("body_html", ""), attachments_json, mid, from_addr, to_addrs_json, msg["occurred_at"], now, 1 if msg.get("is_read") else 0), ) new_count += 1 # Mirror remote deletes based on global inbound message-id snapshot. # To avoid transient IMAP inconsistency causing add/remove oscillation, # require two consecutive "missing" syncs before local deletion. sync_keys = [a["key"] for a in accounts if a.get("sync_inbound")] if sync_keys and fetch_complete: placeholders = ",".join("?" for _ in sync_keys) local_rows = await db.execute_fetchall( f"SELECT id, ext_message_id, mail_account FROM crm_comms_log " f"WHERE type='email' AND direction='inbound' AND mail_account IN ({placeholders}) " "AND ext_message_id IS NOT NULL", sync_keys, ) to_delete: list[str] = [] for row in local_rows: row_id, ext_id, acc_key = row[0], row[1], row[2] if not ext_id: continue state_key = f"missing_email::{acc_key}::{ext_id}" if ext_id in inbound_server_ids: await db.execute("DELETE FROM crm_sync_state WHERE key = ?", (state_key,)) continue prev = await db.execute_fetchall("SELECT value FROM crm_sync_state WHERE key = ?", (state_key,)) prev_count = int(prev[0][0]) if prev and (prev[0][0] or "").isdigit() else 0 new_count = prev_count + 1 await db.execute( "INSERT INTO crm_sync_state (key, value) VALUES (?, ?) " "ON CONFLICT(key) DO UPDATE SET value=excluded.value", (state_key, str(new_count)), ) if new_count >= 2: to_delete.append(row_id) await db.execute("DELETE FROM crm_sync_state WHERE key = ?", (state_key,)) if to_delete: del_ph = ",".join("?" for _ in to_delete) await db.execute(f"DELETE FROM crm_comms_log WHERE id IN ({del_ph})", to_delete) if new_count or server_ids_by_account: await db.commit() # Update last sync time await db.execute( "INSERT INTO crm_sync_state (key, value) VALUES ('last_email_sync', ?) " "ON CONFLICT(key) DO UPDATE SET value=excluded.value", (now,), ) await db.commit() logger.info(f"[EMAIL SYNC] Done — {new_count} new emails stored") return new_count # --------------------------------------------------------------------------- # Lightweight new-mail check (synchronous — called via run_in_executor) # --------------------------------------------------------------------------- def _check_server_count_sync() -> int: # Keep this for backward compatibility; no longer used by check_new_emails(). total = 0 seen_sources: set[tuple] = set() for acc in get_mail_accounts(): if not acc.get("sync_inbound"): continue source = ( (acc.get("imap_host") or "").lower(), int(acc.get("imap_port") or 0), (acc.get("imap_username") or "").lower(), (acc.get("imap_inbox") or "INBOX").upper(), ) if source in seen_sources: continue seen_sources.add(source) if acc.get("imap_use_ssl"): imap = imaplib.IMAP4_SSL(acc["imap_host"], int(acc["imap_port"])) else: imap = imaplib.IMAP4(acc["imap_host"], int(acc["imap_port"])) imap.login(acc["imap_username"], acc["imap_password"]) imap.select(acc.get("imap_inbox", "INBOX"), readonly=True) _, data = imap.search(None, "ALL") total += len(data[0].split()) if data[0] else 0 imap.logout() return total async def check_new_emails() -> dict: """ Compare server message count vs. locally stored count. Returns {"new_count": int} — does NOT download or store anything. """ if not get_mail_accounts(): return {"new_count": 0} loop = asyncio.get_event_loop() try: # Reuse same account-resolution logic as sync to avoid false positives. messages, _ = await loop.run_in_executor(None, _sync_emails_sync) except Exception as e: logger.warning(f"[EMAIL CHECK] IMAP check failed: {e}") raise accounts = get_mail_accounts() accounts_by_email = {a["email"].lower(): a for a in accounts} db = await mqtt_db.get_db() rows = await db.execute_fetchall( "SELECT ext_message_id, COALESCE(mail_account, '') as mail_account FROM crm_comms_log " "WHERE type='email' AND ext_message_id IS NOT NULL" ) known_ids = {(r[0], r[1] or "") for r in rows} new_count = 0 for msg in messages: mid = (msg.get("message_id") or "").strip() if not mid: continue fetch_account_key = (msg.get("mail_account") or "").strip().lower() from_addr = (msg.get("from_addr") or "").lower() to_addrs = [(a or "").lower() for a in (msg.get("to_addrs") or [])] sender_acc = accounts_by_email.get(from_addr) if sender_acc: # Outbound copy in mailbox; not part of "new inbound mail" banner. continue target_acc = None for addr in to_addrs: if addr in accounts_by_email: target_acc = accounts_by_email[addr] break resolved_account_key = (target_acc["key"] if target_acc else fetch_account_key) if target_acc and not target_acc.get("sync_inbound"): continue if (mid, resolved_account_key) not in known_ids: new_count += 1 return {"new_count": new_count} # --------------------------------------------------------------------------- # SMTP send (synchronous — called via run_in_executor) # --------------------------------------------------------------------------- def _append_to_sent_sync(account: dict, raw_message: bytes) -> None: """Best-effort append of sent MIME message to IMAP Sent folder.""" if not raw_message: return try: if account.get("imap_use_ssl"): imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"])) else: imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"])) imap.login(account["imap_username"], account["imap_password"]) preferred = str(account.get("imap_sent") or "Sent").strip() or "Sent" candidates = [preferred, "Sent", "INBOX.Sent", "Sent Items", "INBOX.Sent Items"] seen = set() ordered_candidates = [] for name in candidates: key = name.lower() if key not in seen: seen.add(key) ordered_candidates.append(name) appended = False for mailbox in ordered_candidates: try: status, _ = imap.append(mailbox, "\\Seen", None, raw_message) if status == "OK": appended = True break except Exception: continue if not appended: logger.warning("[EMAIL SEND] Sent copy append failed for account=%s", account.get("key")) imap.logout() except Exception as e: logger.warning("[EMAIL SEND] IMAP append to Sent failed for account=%s: %s", account.get("key"), e) def _send_email_sync( account: dict, to: str, subject: str, body: str, body_html: str, cc: List[str], file_attachments: Optional[List[Tuple[str, bytes, str]]] = None, ) -> str: """Send via SMTP. Returns the Message-ID header. file_attachments: list of (filename, content_bytes, mime_type) """ html_with_cids, inline_images = _extract_inline_data_images(body_html or "") # Build body tree: # - with inline images: related(alternative(text/plain, text/html), image parts) # - without inline images: alternative(text/plain, text/html) if inline_images: body_part = MIMEMultipart("related") alt_part = MIMEMultipart("alternative") alt_part.attach(MIMEText(body, "plain", "utf-8")) if html_with_cids: alt_part.attach(MIMEText(html_with_cids, "html", "utf-8")) body_part.attach(alt_part) for idx, (cid, content, mime_type) in enumerate(inline_images, start=1): maintype, _, subtype = mime_type.partition("/") img_part = MIMEBase(maintype or "image", subtype or "png") img_part.set_payload(content) encoders.encode_base64(img_part) img_part.add_header("Content-ID", f"<{cid}>") img_part.add_header("Content-Disposition", "inline", filename=f"inline-{idx}.{subtype or 'png'}") body_part.attach(img_part) else: body_part = MIMEMultipart("alternative") body_part.attach(MIMEText(body, "plain", "utf-8")) if body_html: body_part.attach(MIMEText(body_html, "html", "utf-8")) # Wrap with mixed only when classic file attachments exist. if file_attachments: msg = MIMEMultipart("mixed") msg.attach(body_part) else: msg = body_part from_addr = account["email"] msg["From"] = from_addr msg["To"] = to msg["Subject"] = subject if cc: msg["Cc"] = ", ".join(cc) msg_id = f"<{uuid.uuid4()}@bellsystems>" msg["Message-ID"] = msg_id # Attach files for filename, content, mime_type in (file_attachments or []): maintype, _, subtype = mime_type.partition("/") part = MIMEBase(maintype or "application", subtype or "octet-stream") part.set_payload(content) encoders.encode_base64(part) part.add_header("Content-Disposition", "attachment", filename=filename) msg.attach(part) recipients = [to] + cc raw_for_append = msg.as_bytes() if account.get("smtp_use_tls"): server = smtplib.SMTP(account["smtp_host"], int(account["smtp_port"])) server.starttls() else: server = smtplib.SMTP_SSL(account["smtp_host"], int(account["smtp_port"])) server.login(account["smtp_username"], account["smtp_password"]) server.sendmail(from_addr, recipients, msg.as_string()) server.quit() _append_to_sent_sync(account, raw_for_append) return msg_id async def send_email( customer_id: str | None, from_account: str | None, to: str, subject: str, body: str, body_html: str, cc: List[str], sent_by: str, file_attachments: Optional[List[Tuple[str, bytes, str]]] = None, ) -> dict: """Send an email and record it in crm_comms_log. Returns the new log entry. file_attachments: list of (filename, content_bytes, mime_type) """ accounts = get_mail_accounts() if not accounts: raise RuntimeError("SMTP not configured") account = account_by_key(from_account) if from_account else None if not account: raise RuntimeError("Please select a valid sender account") if not account.get("allow_send"): raise RuntimeError("Selected account is not allowed to send") if not account.get("smtp_host") or not account.get("smtp_username") or not account.get("smtp_password"): raise RuntimeError("SMTP not configured for selected account") # If the caller did not provide a customer_id (e.g. compose from Mail page), # auto-link by matching recipient addresses against CRM customer emails. resolved_customer_id = customer_id if not resolved_customer_id: addr_to_customer = _load_customer_email_map() rcpts = [to, *cc] parsed_rcpts = [addr for _, addr in email.utils.getaddresses(rcpts) if addr] for addr in parsed_rcpts: key = (addr or "").strip().lower() if key in addr_to_customer: resolved_customer_id = addr_to_customer[key] break loop = asyncio.get_event_loop() import functools msg_id = await loop.run_in_executor( None, functools.partial(_send_email_sync, account, to, subject, body, body_html, cc, file_attachments or []), ) # Upload attachments to Nextcloud and register in crm_media comm_attachments = [] if file_attachments and resolved_customer_id: from crm import nextcloud, service from crm.models import MediaCreate, MediaDirection from shared.firebase import get_db as get_firestore firestore_db = get_firestore() doc = firestore_db.collection("crm_customers").document(resolved_customer_id).get() if doc.exists: data = doc.to_dict() # Build a minimal CustomerInDB-like object for get_customer_nc_path folder_id = data.get("folder_id") or resolved_customer_id nc_path = folder_id for filename, content, mime_type in file_attachments: # images/video → sent_media, everything else → documents if mime_type.startswith("image/") or mime_type.startswith("video/"): subfolder = "sent_media" else: subfolder = "documents" target_folder = f"customers/{nc_path}/{subfolder}" file_path = f"{target_folder}/{filename}" try: await nextcloud.ensure_folder(target_folder) await nextcloud.upload_file(file_path, content, mime_type) await service.create_media(MediaCreate( customer_id=resolved_customer_id, filename=filename, nextcloud_path=file_path, mime_type=mime_type, direction=MediaDirection.sent, tags=["email-attachment"], uploaded_by=sent_by, )) comm_attachments.append({"filename": filename, "nextcloud_path": file_path}) except Exception as e: logger.warning(f"[EMAIL SEND] Failed to upload attachment {filename}: {e}") now = datetime.now(timezone.utc).isoformat() entry_id = str(uuid.uuid4()) db = await mqtt_db.get_db() our_addr = account["email"].lower() to_addrs_json = json.dumps([to] + cc) attachments_json = json.dumps(comm_attachments) await db.execute( """INSERT INTO crm_comms_log (id, customer_id, type, mail_account, direction, subject, body, body_html, attachments, ext_message_id, from_addr, to_addrs, logged_by, occurred_at, created_at) VALUES (?, ?, 'email', ?, 'outbound', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (entry_id, resolved_customer_id, account["key"], subject, body, body_html, attachments_json, msg_id, our_addr, to_addrs_json, sent_by, now, now), ) await db.commit() return { "id": entry_id, "customer_id": resolved_customer_id, "type": "email", "mail_account": account["key"], "direction": "outbound", "subject": subject, "body": body, "body_html": body_html, "attachments": comm_attachments, "ext_message_id": msg_id, "from_addr": our_addr, "to_addrs": [to] + cc, "logged_by": sent_by, "occurred_at": now, "created_at": now, } def _delete_remote_email_sync(account: dict, ext_message_id: str) -> bool: if not ext_message_id: return False if account.get("imap_use_ssl"): imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"])) else: imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"])) imap.login(account["imap_username"], account["imap_password"]) imap.select(account.get("imap_inbox", "INBOX")) _, data = imap.search(None, f'HEADER Message-ID "{ext_message_id}"') uids = data[0].split() if data and data[0] else [] if not uids: imap.logout() return False for uid in uids: imap.store(uid, "+FLAGS", "\\Deleted") imap.expunge() imap.logout() return True async def delete_remote_email(ext_message_id: str, mail_account: str | None, from_addr: str | None = None) -> bool: account = account_by_key(mail_account) if mail_account else None if not account: account = account_by_email(from_addr) if not account or not account.get("imap_host"): return False loop = asyncio.get_event_loop() try: return await loop.run_in_executor(None, lambda: _delete_remote_email_sync(account, ext_message_id)) except Exception as e: logger.warning(f"[EMAIL DELETE] Failed remote delete for {ext_message_id}: {e}") return False def _set_remote_read_sync(account: dict, ext_message_id: str, read: bool) -> bool: if not ext_message_id: return False if account.get("imap_use_ssl"): imap = imaplib.IMAP4_SSL(account["imap_host"], int(account["imap_port"])) else: imap = imaplib.IMAP4(account["imap_host"], int(account["imap_port"])) imap.login(account["imap_username"], account["imap_password"]) imap.select(account.get("imap_inbox", "INBOX")) _, data = imap.search(None, f'HEADER Message-ID "{ext_message_id}"') uids = data[0].split() if data and data[0] else [] if not uids: imap.logout() return False flag_op = "+FLAGS" if read else "-FLAGS" for uid in uids: imap.store(uid, flag_op, "\\Seen") imap.logout() return True async def set_remote_read(ext_message_id: str, mail_account: str | None, from_addr: str | None, read: bool) -> bool: account = account_by_key(mail_account) if mail_account else None if not account: account = account_by_email(from_addr) if not account or not account.get("imap_host"): return False loop = asyncio.get_event_loop() try: return await loop.run_in_executor(None, lambda: _set_remote_read_sync(account, ext_message_id, read)) except Exception as e: logger.warning(f"[EMAIL READ] Failed remote read update for {ext_message_id}: {e}") return False