
Data Pipeline Idempotency

Build data pipelines that produce correct results even when retried or run out of order. Covers idempotent writes, deduplication, exactly-once processing, partition-based reprocessing, and the patterns that make pipelines resilient to failures.

Every data pipeline will fail and need to be retried. The question is: does retrying produce the correct result, or does it produce duplicates, corrupt data, or missing records? Idempotent pipelines produce the same result whether they run once, twice, or ten times. This is the foundation of reliable data engineering.


What Is Idempotency

Non-Idempotent Pipeline:
  Run 1: INSERT INTO orders VALUES (1, 'widget', 100)  → 1 row  ✓
  Run 2: INSERT INTO orders VALUES (1, 'widget', 100)  → 2 rows ✗ (duplicate!)
  Run 3: INSERT INTO orders VALUES (1, 'widget', 100)  → 3 rows ✗ (more duplicates!)

Idempotent Pipeline:
  Run 1: INSERT INTO orders VALUES (1, 'widget', 100)
         ON CONFLICT (id) DO UPDATE SET ...            → 1 row  ✓
  Run 2: Same query                                    → 1 row  ✓ (upsert, no duplicate)
  Run 3: Same query                                    → 1 row  ✓ (still 1 row)

Idempotent = same input → same output, always
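The upsert above can be run end to end. This is a minimal sketch using an in-memory SQLite database (SQLite has supported `ON CONFLICT ... DO UPDATE` since version 3.24); the table and column names are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT, amount INTEGER)")

def load_order(order_id, item, amount):
    """Idempotent write: retrying never creates a duplicate row."""
    conn.execute(
        """
        INSERT INTO orders (id, item, amount) VALUES (?, ?, ?)
        ON CONFLICT (id) DO UPDATE SET item = excluded.item, amount = excluded.amount
        """,
        (order_id, item, amount),
    )
    conn.commit()

for _ in range(3):  # simulate three retries of the same run
    load_order(1, "widget", 100)

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # 1
```

Run it once or run it ten times: the table ends up in the same state, which is exactly the property defined above.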

Patterns

# Pattern 1: UPSERT (Merge)
class UpsertWriter:
    def write(self, records, target_table):
        """Write records idempotently using upsert."""
        for record in records:
            self.db.execute(f"""
                INSERT INTO {target_table} (id, data, updated_at)
                VALUES (%s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    data = EXCLUDED.data,
                    updated_at = EXCLUDED.updated_at
            """, (record.id, record.data, record.updated_at))

# Pattern 2: Partition Overwrite
class PartitionOverwriter:
    def write(self, records, target_table, partition_key):
        """Replace the entire partition; reruns always produce the same result."""
        partition_value = records[0][partition_key]

        # Delete and re-insert inside one transaction so a crash between the
        # two statements cannot leave the partition half-written.
        with self.db.transaction():
            self.db.execute(f"""
                DELETE FROM {target_table}
                WHERE {partition_key} = %s
            """, (partition_value,))

            self.db.execute_batch(f"""
                INSERT INTO {target_table} VALUES (%s, %s, %s, %s)
            """, records)
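Here is a runnable sketch of partition overwrite against an in-memory SQLite database (table and partition names are illustrative). In Python's sqlite3, `with conn:` opens a transaction that commits on success and rolls back on exception, giving the atomic delete-and-reload:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (day TEXT, sku TEXT, qty INTEGER)")

def overwrite_partition(day, rows):
    # One transaction: either the old partition is fully replaced by the
    # new rows, or nothing changes. No half-written state on crash.
    with conn:
        conn.execute("DELETE FROM daily_sales WHERE day = ?", (day,))
        conn.executemany("INSERT INTO daily_sales VALUES (?, ?, ?)", rows)

# First run writes one row; the rerun replaces the whole partition, so the
# final state depends only on the latest input, not on run history.
overwrite_partition("2024-01-01", [("2024-01-01", "widget", 5)])
overwrite_partition("2024-01-01", [("2024-01-01", "widget", 5),
                                   ("2024-01-01", "gadget", 2)])

count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(count)  # 2
```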

# Pattern 3: Deduplication Key
class DeduplicatingWriter:
    def write(self, records, target_table):
        """Use a deduplication key to prevent double-processing."""
        for record in records:
            dedup_key = f"{record.source}:{record.id}:{record.version}"

            # Claim the key and write the record in one transaction: a
            # separate SELECT-then-INSERT leaves a race window, and a crash
            # between recording the key and writing the row loses the record.
            with self.db.transaction():
                claimed = self.db.execute(
                    "INSERT INTO processed_keys VALUES (%s) ON CONFLICT DO NOTHING",
                    (dedup_key,)
                )
                if claimed.rowcount == 1:
                    self.db.execute(f"INSERT INTO {target_table} VALUES ...", record)
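A concrete version of the dedup-key pattern, sketched with SQLite (table names are illustrative): the PRIMARY KEY on `processed_keys` makes the claim atomic, and `rowcount` tells us whether this run won the claim:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_keys (key TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE events (id TEXT, payload TEXT)")

def process_once(source, event_id, version, payload):
    dedup_key = f"{source}:{event_id}:{version}"
    with conn:
        # INSERT OR IGNORE claims the key atomically: rowcount is 1 on the
        # first attempt and 0 when the key already exists.
        cur = conn.execute("INSERT OR IGNORE INTO processed_keys VALUES (?)",
                           (dedup_key,))
        if cur.rowcount == 1:
            conn.execute("INSERT INTO events VALUES (?, ?)", (event_id, payload))

process_once("shop", "e1", 1, "hello")
process_once("shop", "e1", 1, "hello")  # retry: key already claimed, skipped

count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(count)  # 1
```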

Exactly-Once Processing

At-Most-Once:
  Process message, then acknowledge
  If crash after process, before ack → message lost
  
At-Least-Once:
  Acknowledge after processing
  If crash between process and ack → message reprocessed
  Duplicates possible
  
Exactly-Once:
  At-least-once delivery + idempotent processing
  
  Message arrives → Check dedup key → Process → Record dedup key → Ack
  
  If crash and retry:
  Message arrives again → Check dedup key → ALREADY PROCESSED → Skip → Ack
  
  Result: Each message processed exactly once (effectively)
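The flow above can be sketched as a toy consumer: at-least-once delivery plus a dedup-key check yields effectively-exactly-once processing. The queue, handler, and "crash" here are simulated, and in production the dedup key would be recorded durably and atomically with the side effect:

```python
processed_keys = set()  # in production: durable storage, written with the side effect
results = []            # stands in for the pipeline's real side effect

def handle(message):
    """Process a message idempotently; return True to acknowledge."""
    if message["key"] in processed_keys:
        return True                          # already processed: just ack
    results.append(message["body"].upper())  # the actual side effect
    processed_keys.add(message["key"])       # record the dedup key
    return True                              # ack

# At-least-once delivery: the broker redelivers after a crash-before-ack.
msg = {"key": "orders:42:v1", "body": "widget"}
handle(msg)     # first delivery: processed
handle(msg)     # redelivery: dedup key found, side effect skipped
print(results)  # ['WIDGET']
```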

Anti-Patterns

Anti-Pattern                         | Consequence                                   | Fix
INSERT without dedup                 | Duplicates on retry                           | UPSERT or dedup key
Append-only aggregations             | Counts doubled on retry                       | Partition overwrite for aggregation tables
Non-deterministic transforms         | Different results on retry                    | Deterministic functions, fixed timestamps
DELETE + INSERT without transaction  | Partial state on crash                        | Atomic partition replacement
Autoincrement as dedup key           | New ID on retry = duplicate with different ID | Use natural key or hash-based dedup key
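On the last row: one way to build a hash-based dedup key is to hash the record's natural identifying fields, so a retried record always yields the same key no matter when it is reprocessed. The field names here are illustrative:

```python
import hashlib

def dedup_key(record: dict) -> str:
    # Hash only the fields that identify the record, never load-time
    # metadata like timestamps or autoincrement IDs.
    natural = f"{record['source']}|{record['order_id']}|{record['version']}"
    return hashlib.sha256(natural.encode()).hexdigest()

a = dedup_key({"source": "shop", "order_id": 42, "version": 1, "loaded_at": "10:00"})
b = dedup_key({"source": "shop", "order_id": 42, "version": 1, "loaded_at": "10:05"})
print(a == b)  # True
```

A retry five minutes later produces the same key, so the dedup check catches it; an autoincrement ID assigned at load time would not.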

Idempotency is not a nice-to-have — it is a requirement. Every pipeline fails eventually. The only question is whether retry produces a correct result or a corrupted one.

Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.
