Case Study
Multi-Tenant Migration Engine
Zero-dependency Python 2PC orchestrator — runs SQL migrations in parallel across isolated SQLite tenant databases with fleet-wide commit or rollback.
- Python
- sqlite3
- ThreadPoolExecutor
- Two-Phase Commit
- python
- multi-tenant
- crm
- sqlite
- distributed-systems
- migration
- zero-dependency
Overview
Building a CRM like Gnomad CRM introduces one of the hardest problems in SaaS architecture: tenant isolation. When every client has their own isolated database, deploying a single structural update (like adding a new column) means running migrations across dozens or hundreds of databases simultaneously.
If tenant #42 fails due to a lock or corruption, you cannot leave the system out of sync. You must roll back tenants #1 through #41 so the entire fleet maintains identical schemas.
This project implements a Two-Phase Commit (2PC) orchestrator. It uses Python’s concurrent.futures to execute SQL migrations in parallel across an array of independent SQLite databases. If a single node reports a failure, the engine automatically triggers a fleet-wide rollback.
Part of the DIY build series: Projects 1 · 2 · 3 · 4 · 5 · 6.
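Strip away the threads and terminal output, and the protocol comes down to two steps. First, every tenant runs the migration inside an open transaction and votes "prepared" or "failed". Then one decision is applied to all of them. Below is a minimal sequential sketch using throwaway databases in a temporary directory. two_phase_migrate and columns are hypothetical names, not the orchestrator's API.

```python
import os
import sqlite3
import tempfile

MIGRATION = "ALTER TABLE users ADD COLUMN subscription_tier TEXT DEFAULT 'basic';"


def two_phase_migrate(db_paths: list, sql_script: str) -> str:
    """Prepare every tenant, then COMMIT all or ROLLBACK all (sequential sketch)."""
    conns, votes = [], []
    # Phase 1: apply the script inside an open transaction and record a vote.
    for path in db_paths:
        conn = sqlite3.connect(path, isolation_level=None)
        conns.append(conn)
        try:
            # BEGIN sits inside the script so executescript() has nothing to auto-commit.
            conn.executescript("BEGIN;\n" + sql_script)
            votes.append(True)
        except sqlite3.Error:
            votes.append(False)
    # Phase 2: one decision for the whole fleet.
    decision = "COMMIT" if all(votes) else "ROLLBACK"
    for conn in conns:
        if conn.in_transaction:  # a failed tenant may already have rolled back
            conn.execute(decision + ";")
        conn.close()
    return decision


def columns(path: str) -> list:
    """Column names of the users table in one tenant database."""
    conn = sqlite3.connect(path)
    try:
        return [row[1] for row in conn.execute("PRAGMA table_info(users)")]
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as tmp:
    paths = []
    # tenant_03 is already migrated, so its ALTER TABLE will fail in Phase 1.
    for i, already_migrated in enumerate([False, False, True], start=1):
        path = os.path.join(tmp, f"tenant_0{i}.db")
        conn = sqlite3.connect(path, isolation_level=None)
        conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);")
        if already_migrated:
            conn.execute(MIGRATION)
        conn.close()
        paths.append(path)
    print(two_phase_migrate(paths, MIGRATION))  # ROLLBACK
    print(columns(paths[0]))                    # ['id', 'name']
```

tenant_01 succeeded in Phase 1, but its transaction was still open when the vote was counted. That is why it can be undone. SQLite has no durable "prepared" state, though. This pattern guards against a tenant failing its migration. It does not guard against a crash in the middle of Phase 2.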
What it implements
- Fleet discovery: scans a tenants/ directory for .db files
- Phase 1 (Prepare): concurrent BEGIN + migration script per tenant, no commit
- Phase 2 (Commit/Rollback): fleet-wide COMMIT if every tenant prepared, fleet-wide ROLLBACK if any tenant failed
- Manual transaction control: isolation_level=None for an explicit SQLite transaction lifecycle
- Thread-safe I/O: ThreadPoolExecutor with connection cleanup on error paths
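Manual transaction control in Python's sqlite3 module has one trap worth checking. By default, executescript() first issues a COMMIT if a transaction is already open. So calling execute("BEGIN") and then running the migration through executescript() commits an empty transaction. The migration then runs in autocommit mode and cannot be rolled back. Putting BEGIN at the top of the script avoids this. Below is a minimal in-memory sketch of both orderings. column_survives_rollback is a hypothetical helper name.

```python
import sqlite3

MIGRATION = "ALTER TABLE users ADD COLUMN subscription_tier TEXT DEFAULT 'basic';"


def column_survives_rollback(begin_inside_script: bool) -> bool:
    """Apply MIGRATION, attempt a ROLLBACK, and report whether the column remains."""
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);")
    if begin_inside_script:
        # Nothing is pending when executescript() starts, so BEGIN really opens
        # a transaction around the ALTER TABLE.
        conn.executescript("BEGIN;\n" + MIGRATION)
    else:
        conn.execute("BEGIN;")
        # executescript() COMMITs the open (empty) transaction first, then runs
        # the ALTER TABLE in autocommit mode.
        conn.executescript(MIGRATION)
    if conn.in_transaction:
        conn.execute("ROLLBACK;")
    names = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
    conn.close()
    return "subscription_tier" in names


print(column_survives_rollback(begin_inside_script=False))  # True: nothing left to roll back
print(column_survives_rollback(begin_inside_script=True))   # False: the ALTER TABLE was undone
```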
Project setup
1. Initialize the environment
mkdir fleet-migrator && cd fleet-migrator
mkdir tenants
touch migrate.py 001_add_subscription_tier.sql
2. Mock tenant databases
Create three identical databases representing three Gnomad CRM clients:
for i in {1..3}; do sqlite3 tenants/tenant_0$i.db "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);"; done
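The loop above needs the sqlite3 command-line shell, which is a separate install on some systems. To keep the setup standard-library only as well, a minimal Python equivalent is sketched below. It assumes the same tenants/ directory and tenant_0N.db naming. create_mock_tenants is a hypothetical helper name.

```python
import os
import sqlite3
from contextlib import closing


def create_mock_tenants(directory: str = "tenants", count: int = 3) -> list:
    """Create tenant_01.db ... tenant_NN.db, each with the base users table."""
    os.makedirs(directory, exist_ok=True)
    paths = []
    for i in range(1, count + 1):
        path = os.path.join(directory, f"tenant_{i:02d}.db")
        # A sqlite3 connection's own `with` block only commits; closing() closes the file.
        with closing(sqlite3.connect(path)) as conn:
            # IF NOT EXISTS makes reruns safe, unlike the plain CREATE TABLE in the loop.
            conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT);")
            conn.commit()
        paths.append(path)
    return paths
```

Save it under any name, for example a hypothetical make_tenants.py, and call create_mock_tenants() from the fleet-migrator directory.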
3. Migration SQL
Paste into 001_add_subscription_tier.sql:
-- Standard deployment: Add a subscription tier to the users table
ALTER TABLE users ADD COLUMN subscription_tier TEXT DEFAULT 'basic';
The code (migrate.py)
Standard library only. The script opens one connection per tenant on worker threads, drives each transaction manually, and folds the per-tenant results into a single fleet-wide decision:
#!/usr/bin/env python3
import argparse
import os
import sqlite3
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- ANSI Terminal Colors ---
CLR_RESET = "\033[0m"
CLR_RED = "\033[91m"
CLR_GREEN = "\033[92m"
CLR_YELLOW = "\033[93m"
CLR_CYAN = "\033[96m"
CLR_BOLD = "\033[1m"


def get_tenant_databases(directory: str) -> list:
    """Discovers all SQLite databases in the target directory."""
    if not os.path.isdir(directory):
        print(f"{CLR_RED}Error: Directory '{directory}' not found.{CLR_RESET}")
        sys.exit(1)
    dbs = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.db')]
    if not dbs:
        print(f"{CLR_YELLOW}No .db files found in '{directory}'.{CLR_RESET}")
        sys.exit(1)
    return sorted(dbs)


def read_migration_file(filepath: str) -> str:
    """Reads the SQL migration payload."""
    if not os.path.isfile(filepath):
        print(f"{CLR_RED}Error: Migration file '{filepath}' not found.{CLR_RESET}")
        sys.exit(1)
    with open(filepath, 'r') as f:
        return f.read()


def execute_phase_1(db_path: str, sql_script: str) -> dict:
    """
    PHASE 1 (Prepare): Opens a connection, begins a manual transaction,
    and attempts to execute the SQL. It does NOT commit.
    """
    conn = None
    try:
        # isolation_level=None: the sqlite3 module issues no implicit BEGIN/COMMIT.
        # check_same_thread=False: Phase 2 resolves this connection on another thread.
        conn = sqlite3.connect(db_path, isolation_level=None, timeout=5.0,
                               check_same_thread=False)
        # BEGIN goes inside the script: executescript() COMMITs any transaction
        # that is already open, which would make the migration permanent here.
        # The migration file itself must not contain BEGIN/COMMIT.
        conn.executescript("BEGIN TRANSACTION;\n" + sql_script)
        return {"db": db_path, "conn": conn, "status": "prepared", "error": None}
    except Exception as e:
        return {"db": db_path, "conn": conn, "status": "failed", "error": str(e)}


def finalize_phase_2(node: dict, action: str) -> bool:
    """
    PHASE 2 (Commit/Rollback): Resolves the holding transaction and closes the connection.
    Returns False if the COMMIT or ROLLBACK itself failed.
    """
    conn = node.get("conn")
    if not conn:
        return True
    try:
        if action == "COMMIT":
            conn.execute("COMMIT;")
        elif action == "ROLLBACK" and conn.in_transaction:
            # Some errors (e.g. disk full, I/O errors) may already have rolled back.
            conn.execute("ROLLBACK;")
        return True
    except Exception as e:
        print(f"{CLR_RED}CRITICAL: Failed to {action} on {node['db']}: {e}{CLR_RESET}")
        return False
    finally:
        conn.close()


def main():
    parser = argparse.ArgumentParser(description="Multi-Tenant 2PC Migration Orchestrator")
    parser.add_argument("sql_file", help="Path to the .sql migration file")
    parser.add_argument("--dir", default="tenants", help="Directory containing tenant .db files")
    args = parser.parse_args()

    sql_payload = read_migration_file(args.sql_file)
    tenant_dbs = get_tenant_databases(args.dir)

    print(f"{CLR_CYAN}{CLR_BOLD}🚀 Initializing Fleet Migration Orchestrator...{CLR_RESET}")
    print(f"Targeting {len(tenant_dbs)} tenant databases.\n")

    nodes = []
    has_failures = False

    print(f"{CLR_YELLOW}▶ PHASE 1: Applying pending transactions across fleet...{CLR_RESET}")
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(execute_phase_1, db, sql_payload): db for db in tenant_dbs}
        for future in as_completed(futures):
            result = future.result()
            nodes.append(result)
            if result["status"] == "failed":
                has_failures = True
                print(f" {CLR_RED}[❌ FAILED ]{CLR_RESET} {os.path.basename(result['db'])} - {result['error']}")
            else:
                print(f" {CLR_GREEN}[✅ PREPARED]{CLR_RESET} {os.path.basename(result['db'])}")

    print("-" * 50)

    if has_failures:
        print(f"{CLR_RED}{CLR_BOLD}🚨 Failure detected in fleet! Triggering global ROLLBACK...{CLR_RESET}")
        action = "ROLLBACK"
    else:
        print(f"{CLR_GREEN}{CLR_BOLD}✅ All nodes prepared successfully. Triggering global COMMIT...{CLR_RESET}")
        action = "COMMIT"

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(finalize_phase_2, node, action) for node in nodes]
        # .result() re-raises anything that escaped finalize_phase_2.
        results = [future.result() for future in as_completed(futures)]

    if not all(results):
        print(f"\n{CLR_RED}{CLR_BOLD}🚨 Phase 2 did not complete on every tenant; the fleet may be out of sync.{CLR_RESET}")
        sys.exit(2)

    if action == "COMMIT":
        print(f"\n{CLR_CYAN}Migration completed successfully across all tenants.{CLR_RESET}")
    else:
        print(f"\n{CLR_RED}Migration aborted. All tenants safely reverted to previous state.{CLR_RESET}")
        sys.exit(1)


if __name__ == "__main__":
    main()
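Phase 1 opens each connection on a worker from one ThreadPoolExecutor. Phase 2 resolves it on a worker from a second pool. The same sqlite3.Connection therefore crosses threads. By default, Python's sqlite3 module rejects that with ProgrammingError unless the connection was opened with check_same_thread=False. That setting is safe here because each connection is only ever used by one thread at a time. Below is a small standalone demonstration. use_from_other_thread is a hypothetical helper name.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def use_from_other_thread(check_same_thread: bool) -> str:
    """Open a connection on a worker thread, then use it from the calling thread."""
    # max_workers=1 keeps a single worker thread, so both submits run on it.
    with ThreadPoolExecutor(max_workers=1) as worker:
        conn = worker.submit(
            sqlite3.connect, ":memory:",
            isolation_level=None, check_same_thread=check_same_thread,
        ).result()
        try:
            conn.execute("SELECT 1;")  # calling thread != creating thread
            outcome = "ok"
        except sqlite3.ProgrammingError:
            outcome = "ProgrammingError"
        # Close on the creating thread so the default (checked) case can close too.
        worker.submit(conn.close).result()
    return outcome


print(use_from_other_thread(check_same_thread=True))   # ProgrammingError
print(use_from_other_thread(check_same_thread=False))  # ok
```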
Execution & testing
chmod +x migrate.py
Test 1: Happy path (successful migration)
Deploy the new column to all three databases:
./migrate.py 001_add_subscription_tier.sql
The orchestrator prepares all three databases concurrently, then issues a global COMMIT.
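You can confirm that the COMMIT reached every tenant, and that the fleet shares one schema, by reading each database's column list back with SQLite's PRAGMA table_info. Below is a minimal sketch. It assumes the tenants/ directory and users table from the setup steps. fleet_schema and schema_drift are hypothetical helper names.

```python
import os
import sqlite3
from contextlib import closing


def fleet_schema(directory: str = "tenants", table: str = "users") -> dict:
    """Map each tenant .db file name to the ordered column names of `table`."""
    schemas = {}
    for name in sorted(os.listdir(directory)):
        if not name.endswith(".db"):
            continue
        with closing(sqlite3.connect(os.path.join(directory, name))) as conn:
            # `table` is interpolated directly, so pass only trusted names.
            rows = conn.execute(f"PRAGMA table_info({table})")
            schemas[name] = tuple(row[1] for row in rows)
    return schemas


def schema_drift(schemas: dict) -> bool:
    """True if the tenants do not all share the same column list."""
    return len(set(schemas.values())) > 1
```

After Test 1, fleet_schema() should map each of the three tenants to ('id', 'name', 'subscription_tier'), and schema_drift(fleet_schema()) should be False.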
Test 2: Rollback simulation
Run the same script a second time:
./migrate.py 001_add_subscription_tier.sql
Because subscription_tier now exists in every tenant, each ALTER TABLE fails with a "duplicate column name" error. Phase 1 still collects a result from every tenant before deciding. A single failure is enough to switch Phase 2 to a fleet-wide ROLLBACK, and the script exits with status 1.
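On this rerun every tenant fails, so the ROLLBACK has no prepared work to undo. A sharper check uses a mixed fleet, where at least one tenant prepares successfully and must then be reverted. Below is a minimal sketch that assumes the tenants/ layout from the setup steps. add_unmigrated_tenant, has_column, and the tenant_04.db name are hypothetical, chosen for illustration.

```python
import os
import sqlite3
from contextlib import closing


def add_unmigrated_tenant(directory: str = "tenants", name: str = "tenant_04.db") -> str:
    """Hypothetical helper: add a tenant that still has the pre-migration users schema."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, name)
    with closing(sqlite3.connect(path)) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT);")
        conn.commit()
    return path


def has_column(db_path: str, table: str, column: str) -> bool:
    """Hypothetical helper: True if `table` in `db_path` has a column named `column`."""
    with closing(sqlite3.connect(db_path)) as conn:
        # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk) rows.
        return any(row[1] == column for row in conn.execute(f"PRAGMA table_info({table})"))
```

Call add_unmigrated_tenant() once, then run ./migrate.py 001_add_subscription_tier.sql again. tenant_04.db should report PREPARED while the other three fail. After the global ROLLBACK, has_column("tenants/tenant_04.db", "users", "subscription_tier") should return False. If it returns True, the ALTER TABLE was committed during Phase 1 instead of being held open.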
Why this shines on a portfolio
- Solves a hard distributed-systems problem: a single database manages its own transactions, but keeping one schema change all-or-nothing across many independent database files or servers requires Two-Phase Commit thinking.
- Concurrent safety: ThreadPoolExecutor runs the I/O-bound per-tenant work in parallel, and Phase 2 closes every connection opened in Phase 1, including those from tenants that failed.
Related work
- Gnomad CRM — the multi-tenant architecture this engine protects
- Project 4: Recursive Hierarchy Builder — database-side hierarchy compilation
- Project 6: Local Context Server — agent tooling on the same homelab stack