Phase 1 of Migration. Running Scripts
This commit is contained in:
93
backend/migration/migrate_device_logs.py
Normal file
93
backend/migration/migrate_device_logs.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""
|
||||
Phase 1 — Step 1.12: device_logs (SQLite → Postgres)
|
||||
|
||||
Largest table — migrated in batches of 10,000 rows to avoid memory issues.
|
||||
device_logs is a partitioned table; rows route automatically to the correct
|
||||
monthly partition based on received_at.
|
||||
|
||||
Run on VPS:
|
||||
docker compose exec backend python -m migration.migrate_device_logs
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from migration.utils import open_sqlite, AsyncPgSession, parse_dt, log_run, pg_count
|
||||
|
||||
SCRIPT = "migrate_device_logs"
|
||||
BATCH_SIZE = 10_000
|
||||
|
||||
|
||||
async def run() -> None:
|
||||
sqlite = await open_sqlite()
|
||||
|
||||
# Total count first
|
||||
count_row = await sqlite.execute_fetchall("SELECT COUNT(*) FROM device_logs")
|
||||
source_count = count_row[0][0]
|
||||
print(f"Source (SQLite): {source_count} device_logs rows")
|
||||
|
||||
if source_count == 0:
|
||||
await sqlite.close()
|
||||
print("Nothing to migrate.")
|
||||
await log_run(SCRIPT, 0, 0, notes="source empty")
|
||||
return
|
||||
|
||||
offset = 0
|
||||
total_inserted = 0
|
||||
|
||||
while offset < source_count:
|
||||
rows = await sqlite.execute_fetchall(
|
||||
"SELECT * FROM device_logs ORDER BY received_at LIMIT ? OFFSET ?",
|
||||
(BATCH_SIZE, offset),
|
||||
)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
records = [
|
||||
{
|
||||
"device_serial": r["device_serial"],
|
||||
"level": r["level"],
|
||||
"message": r["message"],
|
||||
"device_timestamp": r["device_timestamp"],
|
||||
"received_at": parse_dt(r["received_at"]),
|
||||
}
|
||||
for r in rows
|
||||
]
|
||||
|
||||
async with AsyncPgSession() as session:
|
||||
async with session.begin():
|
||||
await session.execute(
|
||||
text("""
|
||||
INSERT INTO device_logs
|
||||
(device_serial, level, message, device_timestamp, received_at)
|
||||
VALUES
|
||||
(:device_serial, :level, :message, :device_timestamp, :received_at)
|
||||
"""),
|
||||
records,
|
||||
)
|
||||
|
||||
total_inserted += len(records)
|
||||
offset += BATCH_SIZE
|
||||
pct = min(100, int(total_inserted / source_count * 100))
|
||||
print(f" {total_inserted}/{source_count} rows inserted ({pct}%)")
|
||||
|
||||
await sqlite.close()
|
||||
|
||||
# Final count verify
|
||||
async with AsyncPgSession() as session:
|
||||
dest_count = await pg_count(session, "device_logs")
|
||||
|
||||
if dest_count < source_count:
|
||||
msg = f"Count mismatch: source={source_count} postgres={dest_count}"
|
||||
print(f"ERROR: {msg}", file=sys.stderr)
|
||||
await log_run(SCRIPT, source_count, dest_count, success=False, notes=msg)
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Postgres: {dest_count} rows ✓")
|
||||
await log_run(SCRIPT, source_count, dest_count)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(run())
|
||||
Reference in New Issue
Block a user