""" Periodic cloud check-in. Runs every 6 hours as an asyncio background task. If cloud is unreachable, falls back to last known state + grace period. """ import asyncio import json import logging import os from datetime import datetime, timedelta, timezone from pathlib import Path import httpx from config import settings from middleware.license_check import license_state logger = logging.getLogger(__name__) SYNC_INTERVAL_SECONDS = 6 * 60 * 60 # 6 hours STATE_FILE = Path(__file__).parent.parent / "license_state.json" def _load_persisted_state(): if STATE_FILE.exists(): try: data = json.loads(STATE_FILE.read_text()) license_state.update(data) logger.info("Loaded persisted license state: %s", data) except Exception as e: logger.warning("Could not load license state file: %s", e) def _persist_state(): try: STATE_FILE.write_text(json.dumps(license_state)) except Exception as e: logger.warning("Could not persist license state: %s", e) async def _sync_once(): if not settings.SITE_ID or not settings.CLOUD_URL: logger.debug("No SITE_ID/CLOUD_URL configured — skipping cloud sync") return try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( f"{settings.CLOUD_URL}/api/sites/heartbeat", json={"site_id": settings.SITE_ID}, ) resp.raise_for_status() data = resp.json() license_state["licensed"] = data.get("licensed", True) license_state["locked"] = data.get("locked", False) license_state["expires_at"] = data.get("expires_at") license_state["last_sync"] = datetime.now(timezone.utc).isoformat() license_state["sync_failed"] = False _persist_state() logger.info("Cloud sync OK: %s", data) except Exception as e: logger.warning("Cloud sync failed: %s", e) license_state["sync_failed"] = True # Check grace period last_sync_str = license_state.get("last_sync") if last_sync_str: last_sync = datetime.fromisoformat(last_sync_str) grace_expires = last_sync + timedelta(hours=settings.LICENSE_GRACE_HOURS) if datetime.now(timezone.utc) > grace_expires: logger.error("License grace period expired — marking as unlicensed") license_state["licensed"] = False async def _sync_loop(): _load_persisted_state() while True: await _sync_once() await asyncio.sleep(SYNC_INTERVAL_SECONDS) async def start_cloud_sync() -> asyncio.Task: task = asyncio.create_task(_sync_loop()) return task