
How to Build a Data Migration Pipeline: ETL Patterns and Validation

Step-by-step guide to migrating data between systems. Covers schema mapping, ETL pipeline construction, data validation, and zero-downtime cutover strategies.

Data migration is where good intentions meet messy reality. The source system never has the data you expected, the mapping is never 1:1, and there’s always a column called misc_field_3 that everyone depends on but nobody can explain. Industry analyses consistently find that a majority of data migrations run over schedule and over budget — not because of technical complexity, but because of insufficient profiling and validation. This guide gives you a structured approach that catches problems early.

The golden rule: profile first, migrate later. Every hour spent profiling source data saves 10 hours of debugging during migration.


Step 1: Profile Your Source Data

Never trust the documentation. Profile the actual data.

1.1 SQL-Based Profiling

-- Column-level profiling: nulls, cardinality, min/max.
-- Run this per column (or generate one query per column from
-- information_schema.columns); a single static query cannot profile
-- every column of a table at once.
SELECT
    COUNT(*) AS total_rows,
    COUNT(phone_1) AS non_null,
    COUNT(*) - COUNT(phone_1) AS null_count,
    ROUND(100.0 * (COUNT(*) - COUNT(phone_1)) / COUNT(*), 2) AS null_pct,
    COUNT(DISTINCT phone_1) AS distinct_values,
    MIN(phone_1) AS min_value,
    MAX(phone_1) AS max_value
FROM cust_master;

1.2 Python Profiling

import pandas as pd

# conn: an open database connection (e.g. a SQLAlchemy engine)
df = pd.read_sql("SELECT * FROM customers", conn)

# Generate profiling report
profile = pd.DataFrame({
    'dtype': df.dtypes,
    'non_null': df.count(),
    'null_pct': (df.isnull().sum() / len(df) * 100).round(2),
    'unique': df.nunique(),
    'sample': df.iloc[0]
})
print(profile.to_markdown())

1.3 Data Quality Red Flags

| Red Flag | What It Tells You | Action |
|---|---|---|
| > 50% null in a column | Column might be unused or optional | Verify with business: is it needed in target? |
| Cardinality = 1 | Same value for every row | Probably a default — consider dropping |
| Dates before 1970 or after 2030 | Invalid data entry | Define a valid date range rule |
| Phone numbers with letters | Free-text entry without validation | Add cleansing transform |
| Duplicate primary keys | Source integrity issue | Deduplicate before migration |
| Encoding issues (mojibake) | Character set mismatch | Convert encoding in transform step |
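The first three red flags can be detected automatically during profiling. A minimal sketch (the function name and thresholds are illustrative, not from a library):

```python
import pandas as pd

def find_red_flags(df, key=None):
    """Scan a DataFrame for common red flags; returns (column, flag) pairs."""
    flags = []
    for col in df.columns:
        null_pct = df[col].isnull().mean() * 100
        if null_pct > 50:
            flags.append((col, f"{null_pct:.0f}% null"))
        if df[col].nunique(dropna=True) == 1:
            flags.append((col, "constant value"))
    # Duplicate primary keys: check the designated key column, if any
    if key is not None and df[key].duplicated().any():
        flags.append((key, "duplicate primary keys"))
    return flags
```

Run it against the profiling sample from 1.2 and review every flag with the business before writing a single transform.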

Step 2: Build Your Schema Mapping Document

The mapping document is the contract between source and target. Every stakeholder must sign off.

| Source Table | Source Column | Type | Target Table | Target Column | Type | Transform | Notes |
|---|---|---|---|---|---|---|---|
| cust_master | cust_id | INT | customers | id | UUID | Generate UUID | Keep source_id for reconciliation |
| cust_master | cust_nm | VARCHAR(50) | customers | full_name | VARCHAR(100) | Trim + Title Case | |
| cust_master | phone_1 | VARCHAR(20) | customers | phone | VARCHAR(15) | Strip non-digits | May contain letters |
| cust_master | crt_dt | DATETIME | customers | created_at | TIMESTAMPTZ | Convert to UTC | Source is EST |
| cust_master | misc_field_3 | VARCHAR(100) | customers | legacy_ref | VARCHAR(100) | Pass-through | Business says “don’t delete” |
| cust_master | status | CHAR(1) | customers | is_active | BOOLEAN | Map A→true, else→false | |
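One way to keep the mapping document and the code from drifting apart is to express the mapping as data. A sketch using the columns from the table above (the dict-of-tuples shape is one convention, not a standard):

```python
# Mapping doc as data: each target column names its source column
# and the transform applied to it
MAPPING = {
    'full_name':  ('cust_nm',      lambda s: s.strip().title()),
    'phone':      ('phone_1',      lambda s: ''.join(c for c in s if c.isdigit())),
    'legacy_ref': ('misc_field_3', lambda s: s),          # pass-through
    'is_active':  ('status',       lambda s: s == 'A'),   # A→true, else→false
}

def apply_mapping(row):
    """Apply the mapping to one source row represented as a dict."""
    return {tgt: fn(row[src]) for tgt, (src, fn) in MAPPING.items()}
```

Reviewers can then diff the mapping itself, and the same structure drives both the transform step and the validation step.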

:::tip[Golden Rule]
If a column exists in the source but you can’t map it, don’t delete it — park it in a _legacy or _unmapped table. You will need it during reconciliation.
:::

Mapping Complexity Categories

| Category | Description | Example | Risk |
|---|---|---|---|
| 1:1 Direct | Same field, same type | email → email | Low |
| Type conversion | Same field, different type | DATETIME → TIMESTAMPTZ | Low |
| Transform | Business logic applied | cust_nm → full_name (title case) | Medium |
| Split/Merge | One field becomes many (or vice versa) | full_address → street, city, zip | High |
| Derived | New field calculated from source | total_revenue from sum of orders | High |
| Lookup/Enrich | Join with reference data | country_code → country_name | Medium |
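Split mappings are high risk because free-text fields rarely follow their supposed format. A hedged sketch of the full_address example (the column names and regex are illustrative; a real address parser needs far more cases):

```python
import pandas as pd

# Expected shape: "street, city, zip" with a 5-digit zip
ADDRESS_RE = r'^(?P<street>[^,]+),\s*(?P<city>[^,]+),\s*(?P<zip>\d{5})$'

def split_address(df):
    """Split full_address into street/city/zip; rows that don't match
    the expected shape are flagged for manual review, not dropped."""
    parts = df['full_address'].str.extract(ADDRESS_RE)
    parts['needs_review'] = parts['street'].isnull()
    return df.join(parts)
```

The key design choice is the needs_review flag: a split transform should never silently discard rows it cannot parse.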

Step 3: Build the ETL Pipeline

3.1 Python + SQLAlchemy Pattern

from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime
import uuid
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Connection setup
source_engine = create_engine("mssql+pyodbc://...")
target_engine = create_engine("postgresql://...")

BATCH_SIZE = 5000

def extract(offset, limit):
    """Extract a batch from source"""
    query = text("""
        SELECT cust_id, cust_nm, phone_1, crt_dt, status, misc_field_3
        FROM cust_master
        ORDER BY cust_id
        OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY
    """)
    return pd.read_sql(query, source_engine,
                       params={"offset": offset, "limit": limit})

def transform(df):
    """Apply business rules and data cleansing"""
    transformed = pd.DataFrame()
    transformed['id'] = [str(uuid.uuid4()) for _ in range(len(df))]
    transformed['full_name'] = df['cust_nm'].str.strip().str.title()
    transformed['phone'] = df['phone_1'].str.replace(r'[^\d]', '', regex=True)
    # Source datetimes are naive EST (see mapping doc): localize, then convert
    transformed['created_at'] = (pd.to_datetime(df['crt_dt'])
                                 .dt.tz_localize('America/New_York')
                                 .dt.tz_convert('UTC'))
    transformed['is_active'] = df['status'].eq('A')  # Map A→true, else→false
    transformed['legacy_ref'] = df['misc_field_3']
    transformed['migrated_at'] = datetime.utcnow()
    transformed['source_id'] = df['cust_id']  # Keep for reconciliation
    return transformed

def load(df):
    """Load batch into target"""
    df.to_sql('customers', target_engine,
              if_exists='append', index=False,
              method='multi', chunksize=1000)

def run_migration():
    """Execute full migration in batches"""
    offset = 0
    total = 0
    errors = []

    while True:
        batch = extract(offset, BATCH_SIZE)
        if batch.empty:
            break

        try:
            transformed = transform(batch)
            load(transformed)
            total += len(batch)
            logger.info(f"Migrated {total} records...")
        except Exception as e:
            logger.error(f"Error at offset {offset}: {e}")
            errors.append({"offset": offset, "error": str(e)})

        offset += BATCH_SIZE

    logger.info(f"Migration complete: {total} records, {len(errors)} errors")
    return {"total": total, "errors": errors}

if __name__ == "__main__":
    result = run_migration()

3.2 Error Handling Strategy

| Error Type | Strategy | Example |
|---|---|---|
| Data type mismatch | Log + skip row + continue | Phone number can’t convert |
| Constraint violation | Log + quarantine table + continue | Duplicate key in target |
| Connection failure | Retry 3x with backoff, then fail | Database timeout |
| Transform exception | Log + use default value + continue | Date parsing error → NULL |
| Full batch failure | Log + save to dead letter table + continue | Entire batch write fails |

# Dead letter table for failed records
def quarantine(df, error, table="migration_quarantine"):
    """Store failed records for later review"""
    df['error_message'] = str(error)
    df['quarantined_at'] = datetime.utcnow()
    df.to_sql(table, target_engine, if_exists='append', index=False)
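The "retry 3x with backoff" strategy from the table can be sketched as a decorator (a minimal version; the attempt count and delays are illustrative and should be tuned, and in production you would catch only transient error types, not bare Exception):

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)

def with_retry(attempts=3, base_delay=1.0):
    """Retry transient failures with exponential backoff, then re-raise."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    if attempt == attempts - 1:
                        raise  # out of retries: surface the error
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed ({e}); "
                                   f"retrying in {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator
```

Wrapping the load step from 3.1 is then a one-liner: `load = with_retry()(load)`.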

Step 4: Validate the Migration

Validation is not optional. Every migration needs, at minimum, the four checks below.

4.1 Row Count Reconciliation

-- Source count
SELECT COUNT(*) AS source_count FROM cust_master;

-- Target count (including quarantined)
SELECT COUNT(*) AS target_count FROM customers;
SELECT COUNT(*) AS quarantine_count FROM migration_quarantine;

-- source_count = target_count + quarantine_count
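The reconciliation rule in the comment can be wired into the pipeline as a gate (a trivial sketch; the function name is illustrative):

```python
def reconcile_counts(source_count, target_count, quarantine_count):
    """Every source row must be accounted for: migrated or quarantined.
    Returns whether the counts balance and how many rows are missing."""
    missing = source_count - (target_count + quarantine_count)
    return {"balanced": missing == 0, "missing": missing}
```

Fail the migration run if `balanced` is False; a nonzero `missing` means rows were silently dropped somewhere between extract and load.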

4.2 Checksum Validation

-- Caution: CHECKSUM_AGG (SQL Server) and md5 (PostgreSQL) use different
-- algorithms, so the two results below are not comparable to each other.
-- Each detects drift within its own system; to compare across systems,
-- hash identically formatted, identically ordered strings on both sides,
-- and only over pass-through columns (transformed columns can never
-- match byte-for-byte -- use spot checks for those).

-- Source checksum (SQL Server)
SELECT CHECKSUM_AGG(CHECKSUM(cust_id, misc_field_3))
FROM cust_master;

-- Target checksum (PostgreSQL)
SELECT md5(string_agg(
    source_id::text || coalesce(legacy_ref, ''), ''
    ORDER BY source_id
)) FROM customers;

4.3 Sample-Based Spot Checks

# Random sample comparison
import random

from sqlalchemy import text

source_ids = pd.read_sql(
    "SELECT cust_id FROM cust_master", source_engine
)['cust_id'].tolist()

sample = random.sample(source_ids, min(100, len(source_ids)))
failures = []

for sid in sample:
    source_row = pd.read_sql(
        text("SELECT * FROM cust_master WHERE cust_id = :sid"),
        source_engine, params={"sid": sid}
    ).iloc[0]

    target = pd.read_sql(
        text("SELECT * FROM customers WHERE source_id = :sid"),
        target_engine, params={"sid": sid}
    )

    # A missing target row is a validation failure, not a crash
    if (target.empty or
            target.iloc[0]['full_name'] != source_row['cust_nm'].strip().title()):
        failures.append(sid)
        print(f"✘ Record {sid} FAILED")
    else:
        print(f"✔ Record {sid} validated")

print(f"\nValidation: {len(sample) - len(failures)}/{len(sample)} passed")

4.4 Business Logic Validation

| Validation | Query | Expected |
|---|---|---|
| Total revenue matches | SUM(amount) source vs target | Exact match |
| Customer count by region | GROUP BY region source vs target | Exact match |
| Date range preserved | MIN(date), MAX(date) | Same range |
| No orphaned foreign keys | LEFT JOIN target FK tables | 0 orphans |
| Enum values mapped correctly | DISTINCT status in target | Only valid values |
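The aggregate checks in the table can be generalized: pull the same grouped aggregate from both systems and report the groups that differ. A sketch operating on DataFrames (the function name is illustrative):

```python
import pandas as pd

def compare_aggregates(source_df, target_df, group_col, value_col):
    """Return the groups whose summed values differ between source and
    target; an empty result means the aggregates reconcile."""
    s = source_df.groupby(group_col)[value_col].sum()
    t = target_df.groupby(group_col)[value_col].sum()
    # fill_value=0 so groups present on only one side show as differences
    diff = s.subtract(t, fill_value=0)
    return diff[diff != 0]
```

Run it for each row of the table (revenue by region, counts by status, and so on); any nonempty result is a stop-the-cutover signal.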

Step 5: Plan the Cutover

Zero-Downtime Strategy

  1. T-7 days: Run full historical migration
  2. T-1 day: Run incremental sync (changed records only)
  3. T-4 hours: Set source to read-only mode
  4. T-2 hours: Final incremental sync
  5. T-0: Switch application to target database
  6. T+1 hour: Validate in production
  7. T+24 hours: Confirm or rollback

-- Change tracking for incremental sync (SQL Server)
ALTER DATABASE SourceDB SET CHANGE_TRACKING = ON
    (CHANGE_RETENTION = 7 DAYS, AUTO_CLEANUP = ON);

-- Change tracking must also be enabled on each tracked table
ALTER TABLE dbo.cust_master ENABLE CHANGE_TRACKING;

-- Get changes since the last sync; @last_sync_version is the value of
-- CHANGE_TRACKING_CURRENT_VERSION() captured at the end of the prior sync
SELECT ct.cust_id, ct.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES dbo.cust_master, @last_sync_version) AS ct;
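On the pipeline side, the change-tracking rows (key plus an I/U/D operation code) are easiest to replay in bulk per operation. A hypothetical sketch of that grouping step (the function and shapes are illustrative, not part of any library):

```python
def plan_sync(changes):
    """Group changed keys by operation so each set can be replayed in
    bulk: inserts/updates re-run extract-transform-load for those keys,
    deletes are removed from the target.

    `changes` is a list of (cust_id, op) tuples, op in {'I', 'U', 'D'}."""
    plan = {'I': [], 'U': [], 'D': []}
    for cust_id, op in changes:
        plan[op].append(cust_id)
    return plan
```

Applying updates as delete-then-reinsert through the existing transform keeps the incremental path and the full-load path on the same code.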

Rollback Plan

| Scenario | Trigger | Action | RTO |
|---|---|---|---|
| Target data incorrect | Validation failure post-cutover | Switch DNS back to source | 15 minutes |
| Target performance unacceptable | Latency > 3x baseline | Switch DNS back to source | 15 minutes |
| Application compatibility issue | Error rate > 5% | Deploy previous app version + switch DNS | 30 minutes |
| Full rollback | Executive decision within 24 hours | Restore source from backup + switch DNS | 2 hours |

Common Migration Failures

| Failure | Frequency | Root Cause | Prevention |
|---|---|---|---|
| Data mismatch in production | 40% of migrations | Insufficient validation | 3-tier validation (count, checksum, sample) |
| Extended downtime | 35% of migrations | No incremental sync | Implement CDC/change tracking |
| Performance degradation | 25% of migrations | Missing indexes on target | Benchmark target with production-like load |
| Character encoding issues | 20% of migrations | Source/target collation mismatch | Test with international characters early |
| Timezone errors | 15% of migrations | Naive datetime handling | Standardize on UTC throughout |

Migration Checklist

  • Profile source data (nulls, types, cardinality, quality red flags)
  • Build and sign off schema mapping document (all stakeholders)
  • Classify every mapping (direct, transform, split, derived)
  • Implement ETL pipeline with batch processing and error handling
  • Dead letter / quarantine table for failed records
  • Row count reconciliation (source = target + quarantine)
  • Checksum validation on key columns
  • Sample-based spot checks (100+ records, automated comparison)
  • Business logic validation (totals, distributions, constraints)
  • Incremental sync mechanism tested (CDC or change tracking)
  • Cutover runbook with rollback steps and RTO targets
  • Performance benchmarks on target match or exceed source
  • Stakeholder sign-off on validation results before cutover

:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For a managed data migration engagement, visit garnetgrid.com.
:::

Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.
