How to Build a Data Migration Pipeline: ETL Patterns and Validation
Step-by-step guide to migrating data between systems. Covers schema mapping, ETL pipeline construction, data validation, and zero-downtime cutover strategies.
Data migration is where good intentions meet messy reality. The source system never has the data you expected, the mapping is never 1:1, and there's always a column called misc_field_3 that everyone depends on but nobody can explain. Industry surveys consistently find that a majority of data migrations run over schedule and over budget, not because of technical complexity but because of insufficient profiling and validation. This guide gives you a structured approach that catches problems early.
The golden rule: profile first, migrate later. Every hour spent profiling source data saves 10 hours of debugging during migration.
Step 1: Profile Your Source Data
Never trust the documentation. Profile the actual data.
1.1 SQL-Based Profiling
-- Column-level profiling: nulls, cardinality, min/max.
-- Standard SQL can't iterate over columns in a single query, so profile
-- each column explicitly (or generate one query per column from
-- information_schema.columns with dynamic SQL).
SELECT
    'phone_1'                                   AS column_name,
    COUNT(*)                                    AS total_rows,
    COUNT(phone_1)                              AS non_null,
    COUNT(*) - COUNT(phone_1)                   AS null_count,
    ROUND(100.0 * (COUNT(*) - COUNT(phone_1)) / COUNT(*), 2) AS null_pct,
    COUNT(DISTINCT phone_1)                     AS distinct_values,
    MIN(phone_1)                                AS min_value,
    MAX(phone_1)                                AS max_value
FROM cust_master;
1.2 Python Profiling
import pandas as pd
df = pd.read_sql("SELECT * FROM customers", conn)
# Generate profiling report
profile = pd.DataFrame({
    'dtype': df.dtypes,
    'non_null': df.count(),
    'null_pct': (df.isnull().sum() / len(df) * 100).round(2),
    'unique': df.nunique(),
    'sample': df.iloc[0]  # first row's value for each column
})
print(profile.to_markdown())
1.3 Data Quality Red Flags
| Red Flag | What It Tells You | Action |
|---|---|---|
| > 50% null in a column | Column might be unused or optional | Verify with business: is it needed in target? |
| Cardinality = 1 | Same value for every row | Probably a default — consider dropping |
| Dates before 1970 or after 2030 | Invalid data entry | Define a valid date range rule |
| Phone numbers with letters | Free-text entry without validation | Add cleansing transform |
| Duplicate primary keys | Source integrity issue | Deduplicate before migration |
| Encoding issues (mojibake) | Character set mismatch | Convert encoding in transform step |
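Most of these red flags can be checked mechanically before anyone reads a profiling report. A minimal pandas sketch (the `cust_id` key column is an assumption taken from the examples later in this guide):

```python
import pandas as pd

def red_flags(df: pd.DataFrame) -> list[str]:
    """Scan a DataFrame for the data-quality red flags in the table above."""
    flags = []
    for col in df.columns:
        null_pct = df[col].isnull().mean() * 100
        if null_pct > 50:
            flags.append(f"{col}: {null_pct:.0f}% null; verify it is needed in target")
        if df[col].nunique(dropna=True) == 1:
            flags.append(f"{col}: single distinct value; probably a default")
    # Duplicate primary keys (assumes 'cust_id' is the source PK)
    if 'cust_id' in df.columns and df['cust_id'].duplicated().any():
        flags.append("cust_id: duplicate primary keys; deduplicate before migration")
    return flags
```

Date-range and free-text checks follow the same pattern; the point is that every red flag becomes an automated assertion you can re-run after each profiling pass.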
Step 2: Build Your Schema Mapping Document
The mapping document is the contract between source and target. Every stakeholder must sign off.
| Source Table | Source Column | Type | Target Table | Target Column | Type | Transform | Notes |
|---|---|---|---|---|---|---|---|
| cust_master | cust_id | INT | customers | id | UUID | Generate UUID | Keep source_id for reconciliation |
| cust_master | cust_nm | VARCHAR(50) | customers | full_name | VARCHAR(100) | Trim + Title Case | |
| cust_master | phone_1 | VARCHAR(20) | customers | phone | VARCHAR(15) | Strip non-digits | May contain letters |
| cust_master | crt_dt | DATETIME | customers | created_at | TIMESTAMPTZ | Convert to UTC | Source is EST |
| cust_master | misc_field_3 | VARCHAR(100) | customers | legacy_ref | VARCHAR(100) | Pass-through | Business says “don’t delete” |
| cust_master | status | CHAR(1) | customers | is_active | BOOLEAN | Map A→true, else→false | |
:::tip[Golden Rule]
If a column exists in the source but you can’t map it, don’t delete it — park it in a _legacy or _unmapped table. You will need it during reconciliation.
:::
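Parking unmapped columns is a one-liner with pandas. A sketch using a local SQLite staging database (the table and column names are illustrative):

```python
import sqlite3
import pandas as pd

def park_unmapped(df, cols, conn, table="cust_master_unmapped"):
    """Copy unmapped source columns (plus the source key) to a side table."""
    df[cols].to_sql(table, conn, if_exists="append", index=False)

# Usage: always include the source key so reconciliation can join back
# conn = sqlite3.connect("staging.db")   # or your target engine
# park_unmapped(source_df, ["cust_id", "misc_field_3"], conn)
```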
Mapping Complexity Categories
| Category | Description | Example | Risk |
|---|---|---|---|
| 1:1 Direct | Same field, same type | email → email | Low |
| Type conversion | Same field, different type | DATETIME → TIMESTAMPTZ | Low |
| Transform | Business logic applied | cust_nm → full_name (title case) | Medium |
| Split/Merge | One field becomes many (or vice versa) | full_address → street, city, zip | High |
| Derived | New field calculated from source | total_revenue from sum of orders | High |
| Lookup/Enrich | Join with reference data | country_code → country_name | Medium |
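The high-risk split category deserves special care: rows that don't match the expected shape should be flagged, not silently mangled. A sketch for the `full_address` example above (column names hypothetical; assumes at least one row has all three comma-separated parts):

```python
import pandas as pd

def split_address(df: pd.DataFrame) -> pd.DataFrame:
    """Split 'street, city, zip' into three columns and flag rows
    that don't match the expected shape for quarantine."""
    parts = df['full_address'].str.split(',', n=2, expand=True)
    out = df.copy()
    out['street'] = parts[0].str.strip()
    out['city'] = parts[1].str.strip()
    out['zip'] = parts[2].str.strip()
    out['split_ok'] = parts.notna().all(axis=1)  # False -> quarantine
    return out
```

Rows where `split_ok` is False go to the quarantine table from Step 3 rather than into the target with NULL cities.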
Step 3: Build the ETL Pipeline
3.1 Python + SQLAlchemy Pattern
from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime
import uuid
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Connection setup
source_engine = create_engine("mssql+pyodbc://...")
target_engine = create_engine("postgresql://...")
BATCH_SIZE = 5000
def extract(offset, limit):
    """Extract a batch from source"""
    query = text("""
        SELECT cust_id, cust_nm, phone_1, crt_dt, misc_field_3
        FROM cust_master
        ORDER BY cust_id
        OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY
    """)
    return pd.read_sql(query, source_engine,
                       params={"offset": offset, "limit": limit})

def transform(df):
    """Apply business rules and data cleansing"""
    transformed = pd.DataFrame()
    transformed['id'] = [str(uuid.uuid4()) for _ in range(len(df))]
    transformed['full_name'] = df['cust_nm'].str.strip().str.title()
    transformed['phone'] = df['phone_1'].str.replace(r'[^\d]', '', regex=True)
    # Source is US Eastern (per the mapping doc): localize, then convert.
    # Passing utc=True here would wrongly treat naive values as already-UTC.
    transformed['created_at'] = (pd.to_datetime(df['crt_dt'])
                                 .dt.tz_localize('America/New_York')
                                 .dt.tz_convert('UTC'))
    transformed['legacy_ref'] = df['misc_field_3']
    transformed['migrated_at'] = datetime.utcnow()
    transformed['source_id'] = df['cust_id']  # Keep for reconciliation
    return transformed

def load(df):
    """Load batch into target"""
    df.to_sql('customers', target_engine,
              if_exists='append', index=False,
              method='multi', chunksize=1000)

def run_migration():
    """Execute full migration in batches"""
    offset = 0
    total = 0
    errors = []
    while True:
        batch = extract(offset, BATCH_SIZE)
        if batch.empty:
            break
        try:
            transformed = transform(batch)
            load(transformed)
            total += len(batch)
            logger.info(f"Migrated {total} records...")
        except Exception as e:
            logger.error(f"Error at offset {offset}: {e}")
            errors.append({"offset": offset, "error": str(e)})
        offset += BATCH_SIZE
    logger.info(f"Migration complete: {total} records, {len(errors)} errors")
    return {"total": total, "errors": errors}

if __name__ == "__main__":
    result = run_migration()
3.2 Error Handling Strategy
| Error Type | Strategy | Example |
|---|---|---|
| Data type mismatch | Log + skip row + continue | Phone number can’t convert |
| Constraint violation | Log + quarantine table + continue | Duplicate key in target |
| Connection failure | Retry 3x with backoff, then fail | Database timeout |
| Transform exception | Log + use default value + continue | Date parsing error → NULL |
| Full batch failure | Log + save to dead letter table + continue | Entire batch write fails |
# Dead letter table for failed records
def quarantine(df, error, table="migration_quarantine"):
    """Store failed records for later review"""
    df = df.copy()  # don't mutate the caller's batch
    df['error_message'] = str(error)
    df['quarantined_at'] = datetime.utcnow()
    df.to_sql(table, target_engine, if_exists='append', index=False)
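The connection-failure row in the table above ("retry 3x with backoff") can be a small wrapper around any flaky call; a minimal sketch:

```python
import time
import logging

logger = logging.getLogger(__name__)

def with_retry(fn, retries=3, base_delay=1.0):
    """Retry a flaky operation (e.g. a batch load) with exponential backoff."""
    for attempt in range(retries):
        try:
            return fn()
        except Exception as e:
            if attempt == retries - 1:
                raise  # out of retries: surface the error
            delay = base_delay * (2 ** attempt)
            logger.warning(f"Attempt {attempt + 1} failed ({e}); retrying in {delay}s")
            time.sleep(delay)

# Usage: with_retry(lambda: load(transformed))
```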
Step 4: Validate the Migration
Validation is not optional. At minimum, every migration needs the four checks below: row counts, checksums, spot checks, and business logic validation.
4.1 Row Count Reconciliation
-- Source count
SELECT COUNT(*) AS source_count FROM cust_master;
-- Target count (including quarantined)
SELECT COUNT(*) AS target_count FROM customers;
SELECT COUNT(*) AS quarantine_count FROM migration_quarantine;
-- source_count = target_count + quarantine_count
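The same reconciliation can be scripted so it runs automatically after every migration pass; a sketch using the table names from this guide:

```python
import pandas as pd

def reconcile_counts(source_conn, target_conn):
    """Check that source rows == target rows + quarantined rows."""
    src = int(pd.read_sql(
        "SELECT COUNT(*) AS n FROM cust_master", source_conn)['n'][0])
    tgt = int(pd.read_sql(
        "SELECT COUNT(*) AS n FROM customers", target_conn)['n'][0])
    quar = int(pd.read_sql(
        "SELECT COUNT(*) AS n FROM migration_quarantine", target_conn)['n'][0])
    assert src == tgt + quar, f"Count mismatch: {src} != {tgt} + {quar}"
    return src, tgt, quar
```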
4.2 Checksum Validation
-- Source checksum (SQL Server)
SELECT CHECKSUM_AGG(CHECKSUM(cust_id, cust_nm, phone_1))
FROM cust_master;
-- Target checksum (PostgreSQL equivalent)
SELECT md5(string_agg(
id::text || full_name || phone, ''
ORDER BY source_id
)) FROM customers;
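An engine-independent alternative is to fetch rows from each side, canonicalize, and hash in Python. This only works on pass-through columns, or after applying the same transform to both sides; query and column names are illustrative:

```python
import hashlib
import pandas as pd

def table_checksum(conn, query, key_cols):
    """Engine-independent checksum: sort by key, canonicalize, hash."""
    df = pd.read_sql(query, conn).sort_values(key_cols)
    canonical = df.astype(str).apply('|'.join, axis=1).str.cat(sep='\n')
    return hashlib.md5(canonical.encode()).hexdigest()

# Usage: compare the two hex digests
# src = table_checksum(source_conn, "SELECT cust_id, misc_field_3 FROM cust_master", ["cust_id"])
# tgt = table_checksum(target_conn, "SELECT source_id, legacy_ref FROM customers", ["source_id"])
```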
4.3 Sample-Based Spot Checks
# Random sample comparison
import random
source_ids = pd.read_sql(
"SELECT cust_id FROM cust_master", source_engine
)['cust_id'].tolist()
sample = random.sample(source_ids, min(100, len(source_ids)))
failures = []
for sid in sample:
source_row = pd.read_sql(
f"SELECT * FROM cust_master WHERE cust_id = {sid}",
source_engine
).iloc[0]
target_row = pd.read_sql(
f"SELECT * FROM customers WHERE source_id = {sid}",
target_engine
).iloc[0]
# Compare transformed values
try:
assert target_row['full_name'] == source_row['cust_nm'].strip().title()
print(f"✔ Record {sid} validated")
except AssertionError:
failures.append(sid)
print(f"✘ Record {sid} FAILED")
print(f"\nValidation: {len(sample) - len(failures)}/{len(sample)} passed")
4.4 Business Logic Validation
| Validation | Query | Expected |
|---|---|---|
| Total revenue matches | SUM(amount) source vs target | Exact match |
| Customer count by region | GROUP BY region source vs target | Exact match |
| Date range preserved | MIN(date), MAX(date) | Same range |
| No orphaned foreign keys | LEFT JOIN target FK tables | 0 orphans |
| Enum values mapped correctly | DISTINCT status in target | Only valid values |
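Each row of the table above reduces to running the same aggregate on both sides and comparing the results; a sketch for the scalar cases:

```python
import pandas as pd

def scalar_check(name, source_conn, source_sql, target_conn, target_sql):
    """Run the same scalar aggregate on both sides and compare."""
    src = pd.read_sql(source_sql, source_conn).iloc[0, 0]
    tgt = pd.read_sql(target_sql, target_conn).iloc[0, 0]
    ok = src == tgt
    print(f"{'PASS' if ok else 'FAIL'} {name}: source={src} target={tgt}")
    return bool(ok)

# Usage:
# scalar_check("total revenue",
#              source_conn, "SELECT SUM(amount) FROM orders",
#              target_conn, "SELECT SUM(amount) FROM orders")
```

Grouped checks (counts by region, distinct enum values) follow the same pattern with a sorted DataFrame comparison instead of a scalar one.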
Step 5: Plan the Cutover
Zero-Downtime Strategy
- T-7 days: Run full historical migration
- T-1 day: Run incremental sync (changed records only)
- T-4 hours: Set source to read-only mode
- T-2 hours: Final incremental sync
- T-0: Switch application to target database
- T+1 hour: Validate in production
- T+24 hours: Confirm or rollback
-- Change tracking for incremental sync (SQL Server)
ALTER DATABASE SourceDB SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 7 DAYS, AUTO_CLEANUP = ON);
-- Change tracking is opt-in per table as well
ALTER TABLE cust_master ENABLE CHANGE_TRACKING;
-- Get changes since last sync
SELECT ct.cust_id, ct.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES cust_master, @last_sync_version) AS ct;
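Applying the change set is a dispatch on SYS_CHANGE_OPERATION (I = insert, U = update, D = delete). A sketch of the apply step, with the upsert/delete callbacks left to the target layer (the callback names are hypothetical):

```python
import pandas as pd

def apply_changes(changes, upsert, delete):
    """Dispatch change-tracking rows to the target: I/U upsert, D delete."""
    counts = {"upserts": 0, "deletes": 0}
    for _, row in changes.iterrows():
        if row["op"] == "D":
            delete(row["cust_id"])
            counts["deletes"] += 1
        else:  # 'I' or 'U': re-extract the current row and upsert it
            upsert(row["cust_id"])
            counts["upserts"] += 1
    return counts
```

Inserts and updates share a branch because the safest incremental pattern is "re-extract the current source row and upsert", which is idempotent across repeated sync passes.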
Rollback Plan
| Scenario | Trigger | Action | RTO |
|---|---|---|---|
| Target data incorrect | Validation failure post-cutover | Switch DNS back to source | 15 minutes |
| Target performance unacceptable | Latency > 3x baseline | Switch DNS back to source | 15 minutes |
| Application compatibility issue | Error rate > 5% | Deploy previous app version + switch DNS | 30 minutes |
| Full rollback | Executive decision within 24 hours | Restore source from backup + switch DNS | 2 hours |
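The first three triggers in the table can be encoded as an automated gate that pages a human rather than flipping DNS itself; thresholds are the ones stated above:

```python
def should_rollback(error_rate, latency_ms, baseline_latency_ms,
                    validation_passed=True):
    """Evaluate the rollback triggers from the table above.
    Returns (rollback?, reason)."""
    if not validation_passed:
        return True, "validation failure post-cutover"
    if latency_ms > 3 * baseline_latency_ms:
        return True, "latency above 3x baseline"
    if error_rate > 0.05:
        return True, "error rate above 5%"
    return False, ""
```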
Common Migration Failures
| Failure | Frequency | Root Cause | Prevention |
|---|---|---|---|
| Data mismatch in production | 40% of migrations | Insufficient validation | 3-tier validation (count, checksum, sample) |
| Extended downtime | 35% of migrations | No incremental sync | Implement CDC/change tracking |
| Performance degradation | 25% of migrations | Missing indexes on target | Benchmark target with production-like load |
| Character encoding issues | 20% of migrations | Source/target collation mismatch | Test with international characters early |
| Timezone errors | 15% of migrations | Naive datetime handling | Standardize on UTC throughout |
Migration Checklist
- Profile source data (nulls, types, cardinality, quality red flags)
- Build and sign off schema mapping document (all stakeholders)
- Classify every mapping (direct, transform, split, derived)
- Implement ETL pipeline with batch processing and error handling
- Dead letter / quarantine table for failed records
- Row count reconciliation (source = target + quarantine)
- Checksum validation on key columns
- Sample-based spot checks (100+ records, automated comparison)
- Business logic validation (totals, distributions, constraints)
- Incremental sync mechanism tested (CDC or change tracking)
- Cutover runbook with rollback steps and RTO targets
- Performance benchmarks on target match or exceed source
- Stakeholder sign-off on validation results before cutover
:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For a managed data migration engagement, visit garnetgrid.com.
:::