Files
bellsystems-cp/backend/migration/migrate_device_logs.py

94 lines
2.8 KiB
Python

"""
Phase 1 — Step 1.12: device_logs (SQLite → Postgres)

Largest table — migrated in batches of 10,000 rows to avoid memory issues.
device_logs is a partitioned table; rows route automatically to the correct
monthly partition based on received_at.

Run on VPS:
    docker compose exec backend python -m migration.migrate_device_logs
"""
import asyncio
import sys
from sqlalchemy import text
from migration.utils import open_sqlite, AsyncPgSession, parse_dt, log_run, pg_count
# Name passed to log_run() to identify this migration step.
SCRIPT: str = "migrate_device_logs"

# Number of rows read from SQLite and inserted into Postgres per batch.
BATCH_SIZE: int = 10_000
async def run() -> None:
    """Copy every device_logs row from SQLite into Postgres in batches.

    Rows are read BATCH_SIZE at a time with LIMIT/OFFSET paging and each
    batch is inserted inside its own ``session.begin()`` block. After the
    copy, the Postgres row count is compared with the SQLite source count
    and the outcome is recorded through ``log_run``.

    The SQLite connection is always closed, even when a batch fails.

    Raises:
        SystemExit: with status 1 when Postgres holds fewer rows than the
            SQLite source after the copy.
    """
    sqlite = await open_sqlite()
    total_inserted = 0
    try:
        # Total count first
        count_row = await sqlite.execute_fetchall(
            "SELECT COUNT(*) FROM device_logs"
        )
        source_count = count_row[0][0]
        print(f"Source (SQLite): {source_count} device_logs rows")

        offset = 0
        while offset < source_count:
            # received_at alone is not a total order: rows sharing a timestamp
            # may come back in a different order on each query, so OFFSET
            # pages could skip or repeat them. rowid breaks the ties.
            # Assumes device_logs is an ordinary rowid table (not declared
            # WITHOUT ROWID) — TODO confirm against the SQLite schema.
            rows = await sqlite.execute_fetchall(
                "SELECT * FROM device_logs "
                "ORDER BY received_at, rowid LIMIT ? OFFSET ?",
                (BATCH_SIZE, offset),
            )
            if not rows:
                break

            records = [
                {
                    "device_serial": r["device_serial"],
                    "level": r["level"],
                    "message": r["message"],
                    "device_timestamp": r["device_timestamp"],
                    "received_at": parse_dt(r["received_at"]),
                }
                for r in rows
            ]

            # One session and one begin() block per batch, so a failure does
            # not affect batches that already completed.
            async with AsyncPgSession() as session:
                async with session.begin():
                    await session.execute(
                        text("""
                            INSERT INTO device_logs
                                (device_serial, level, message, device_timestamp, received_at)
                            VALUES
                                (:device_serial, :level, :message, :device_timestamp, :received_at)
                        """),
                        records,
                    )

            total_inserted += len(records)
            offset += BATCH_SIZE
            pct = min(100, int(total_inserted / source_count * 100))
            print(f"  {total_inserted}/{source_count} rows inserted ({pct}%)")
    finally:
        # Release the SQLite handle on success and on failure alike.
        await sqlite.close()

    if source_count == 0:
        print("Nothing to migrate.")
        await log_run(SCRIPT, 0, 0, notes="source empty")
        return

    # Final count verify
    # NOTE(review): this only detects missing rows. If Postgres already held
    # device_logs rows before this run (e.g. a re-run after a partial
    # failure), duplicates would not be detected here — confirm the table is
    # empty before running.
    async with AsyncPgSession() as session:
        dest_count = await pg_count(session, "device_logs")

    if dest_count < source_count:
        msg = f"Count mismatch: source={source_count} postgres={dest_count}"
        print(f"ERROR: {msg}", file=sys.stderr)
        await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
        sys.exit(1)

    print(f"Postgres: {dest_count} rows ✓")
    await log_run(SCRIPT, source_count, dest_count)
# Script entry point: drive the async migration to completion when the module
# is executed directly (e.g. via ``python -m migration.migrate_device_logs``).
if __name__ == "__main__":
    asyncio.run(run())