Data Pipeline Idempotency
Build data pipelines that produce correct results even when retried or run out of order. Covers idempotent writes, deduplication, exactly-once processing, partition-based reprocessing, and the patterns that make pipelines resilient to failures.
Every data pipeline will fail and need to be retried. The question is: does retrying produce the correct result, or does it produce duplicates, corrupt data, or missing records? Idempotent pipelines produce the same result whether they run once, twice, or ten times. This is the foundation of reliable data engineering.
What Is Idempotency?
Non-Idempotent Pipeline:
Run 1: INSERT INTO orders VALUES (1, 'widget', 100) → 1 row ✓
Run 2: INSERT INTO orders VALUES (1, 'widget', 100) → 2 rows ✗ (duplicate!)
Run 3: INSERT INTO orders VALUES (1, 'widget', 100) → 3 rows ✗ (more duplicates!)
Idempotent Pipeline:
Run 1: INSERT INTO orders VALUES (1, 'widget', 100)
ON CONFLICT (id) DO UPDATE SET ... → 1 row ✓
Run 2: Same query → 1 row ✓ (upsert, no duplicate)
Run 3: Same query → 1 row ✓ (still 1 row)
Idempotent = same input → same output, always
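The upsert behavior above can be demonstrated end to end. A minimal sketch using SQLite (which supports `ON CONFLICT` upserts since version 3.24); the table and column names are illustrative, not from any particular system:

```python
import sqlite3

# In-memory database for demonstration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT, amount INTEGER)")

def write_order(order_id, item, amount):
    """Idempotent write: running this any number of times yields one row."""
    conn.execute(
        """
        INSERT INTO orders (id, item, amount) VALUES (?, ?, ?)
        ON CONFLICT (id) DO UPDATE SET item = excluded.item, amount = excluded.amount
        """,
        (order_id, item, amount),
    )

for _ in range(3):  # simulate three retries of the same run
    write_order(1, "widget", 100)

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # → 1
```

Three runs, one row: the retry converges on the same state instead of accumulating duplicates.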
Patterns
# Pattern 1: UPSERT (Merge)
class UpsertWriter:
    def write(self, records, target_table):
        """Write records idempotently using upsert."""
        for record in records:
            self.db.execute(f"""
                INSERT INTO {target_table} (id, data, updated_at)
                VALUES (%s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    data = EXCLUDED.data,
                    updated_at = EXCLUDED.updated_at
            """, (record.id, record.data, record.updated_at))
# Pattern 2: Partition Overwrite
class PartitionOverwriter:
    def write(self, records, target_table, partition_key):
        """Replace entire partition — always produces same result."""
        partition_value = records[0][partition_key]
        # DELETE + INSERT must run in one transaction, or a crash
        # between them leaves the partition empty (see anti-patterns).
        with self.db.transaction():
            self.db.execute(f"""
                DELETE FROM {target_table}
                WHERE {partition_key} = %s
            """, (partition_value,))
            self.db.execute_batch(f"""
                INSERT INTO {target_table} VALUES (%s, %s, %s, %s)
            """, records)
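Partition overwrite can be sketched concretely with SQLite. This is a minimal illustration, assuming a date-partitioned `events` table; the key point is that the DELETE and INSERT commit together, so a rerun always converges on the same rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_date TEXT, user_id INTEGER, action TEXT)")

def overwrite_partition(rows, partition_value):
    """Replace the whole partition atomically; reruns always converge."""
    with conn:  # one transaction: DELETE and INSERT commit together or not at all
        conn.execute("DELETE FROM events WHERE event_date = ?", (partition_value,))
        conn.executemany("INSERT INTO events VALUES (?, ?, ?)", rows)

rows = [("2024-06-01", 1, "click"), ("2024-06-01", 2, "view")]
overwrite_partition(rows, "2024-06-01")
overwrite_partition(rows, "2024-06-01")  # retry: same result, no duplicates

total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(total)  # → 2
```

Because the unit of replacement is the whole partition, this pattern also makes reprocessing a bad day's data trivial: rerun the job for that partition and the old rows are gone.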
# Pattern 3: Deduplication Key
class DeduplicatingWriter:
    def write(self, records, target_table):
        """Use a deduplication key to prevent double-processing."""
        for record in records:
            dedup_key = f"{record.source}:{record.id}:{record.version}"
            # Key insert and data insert must commit together; a separate
            # SELECT-then-INSERT would race with concurrent writers.
            with self.db.transaction():
                result = self.db.execute(
                    "INSERT INTO processed_keys VALUES (%s) ON CONFLICT DO NOTHING",
                    (dedup_key,)
                )
                if result.rowcount == 1:  # key was new — first time seen
                    self.db.execute(f"INSERT INTO {target_table} VALUES ...", record)
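A runnable sketch of the dedup-key pattern, again using SQLite for illustration (the `payments` table and key format are hypothetical). `INSERT OR IGNORE` plus a rowcount check replaces the racy SELECT-then-INSERT:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_keys (key TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE payments (id TEXT, amount INTEGER)")

def process_once(source, record_id, version, amount):
    """Insert the record only if its dedup key has not been seen before."""
    dedup_key = f"{source}:{record_id}:{version}"
    with conn:  # dedup-key insert and data insert commit atomically
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_keys VALUES (?)", (dedup_key,)
        )
        if cur.rowcount == 1:  # key was new: safe to write the record
            conn.execute("INSERT INTO payments VALUES (?, ?)", (record_id, amount))

for _ in range(3):  # three deliveries of the same message
    process_once("stripe", "pay_42", 1, 100)

n = conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0]
print(n)  # → 1
```

Note the key includes the version: a genuinely new version of the same record gets a new key and is processed, while a retry of the same version is skipped.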
Exactly-Once Processing
At-Most-Once:
Acknowledge message, then process
If crash after ack, before processing → message lost
At-Least-Once:
Acknowledge after processing
If crash between process and ack → message reprocessed
Duplicates possible
Exactly-Once:
At-least-once delivery + idempotent processing
Message arrives → Check dedup key → Process → Record dedup key → Ack
If crash and retry:
Message arrives again → Check dedup key → ALREADY PROCESSED → Skip → Ack
Result: Each message processed exactly once (effectively)
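The flow above can be simulated in a few lines. A hedged sketch, with an in-memory set standing in for a durable dedup store and a list of deliveries standing in for an at-least-once broker that redelivers after a crash-before-ack:

```python
processed = set()  # stands in for a durable dedup store
results = []       # stands in for the target table

def handle(message):
    """At-least-once delivery + dedup check = effectively exactly-once."""
    key = message["id"]
    if key in processed:  # already processed on a previous delivery
        return            # skip the work, just (re)acknowledge
    results.append(message["value"])
    processed.add(key)    # record the key before acking

# Message 1 is delivered twice (simulated redelivery after crash-before-ack):
deliveries = [{"id": 1, "value": 10}, {"id": 1, "value": 10}, {"id": 2, "value": 20}]
for msg in deliveries:
    handle(msg)

print(results)  # → [10, 20]
```

In production the dedup store must survive restarts (a database table, as in Pattern 3), and the check-and-record must be atomic with the write; the set here only illustrates the control flow.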
Anti-Patterns
| Anti-Pattern | Consequence | Fix |
|---|---|---|
| INSERT without dedup | Duplicates on retry | UPSERT or dedup key |
| Append-only aggregations | Counts doubled on retry | Partition overwrite for aggregation tables |
| Non-deterministic transforms | Different results on retry | Deterministic functions, fixed timestamps |
| DELETE + INSERT without transaction | Partial state on crash | Atomic partition replacement |
| Autoincrement as dedup key | New ID on retry = duplicate with different ID | Use natural key or hash-based dedup key |
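The non-deterministic transform row deserves a concrete illustration. A minimal sketch, with hypothetical function names: calling the clock inside the transform makes every retry produce different rows, while passing one pinned timestamp per logical run keeps retries identical:

```python
from datetime import datetime, timezone

# Anti-pattern: reading the clock inside the transform means every
# retry emits a different row for the same input record.
def transform_bad(record):
    return {**record, "processed_at": datetime.now(timezone.utc).isoformat()}

# Fix: the caller (e.g. the scheduler) pins one timestamp per logical
# run and passes it in, so reruns reproduce identical output.
def transform_good(record, run_ts):
    return {**record, "processed_at": run_ts}

run_ts = "2024-06-01T00:00:00+00:00"  # fixed per logical run
a = transform_good({"id": 1}, run_ts)
b = transform_good({"id": 1}, run_ts)  # retry of the same run
print(a == b)  # → True
```

The same principle applies to random IDs, unordered iteration, and anything else that varies between runs: make it an explicit input, not something the transform conjures.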
Idempotency is not a nice-to-have — it is a requirement. Every pipeline fails eventually. The only question is whether retry produces a correct result or a corrupted one.