Idempotency, Atomicity, and Pipeline Restartability
The three properties that separate reliable pipelines from fragile ones — precise definitions, implementation at every layer, and automatic failure recovery.
Why These Three Properties Define the Difference Between a Pipeline and a Liability
A pipeline that works is not the same as a pipeline that is reliable. A pipeline that runs successfully 95% of the time is not a pipeline — it is a source of data corruption and operational anxiety. The 5% of runs that fail are not just an inconvenience; they produce partial or duplicated data that analysts act on and from which decisions are made.
Three properties distinguish a reliable pipeline from a fragile one. Idempotency means running the pipeline multiple times with the same input always produces the same correct output — no duplicates, no data loss. Atomicity means each unit of work either completes fully or not at all — no partial states that leave the destination in an inconsistent condition. Restartability means a pipeline that fails at any point can resume from exactly where it stopped — no reprocessing data already written, no skipping data not yet written.
Idempotency — Every Form It Takes in Data Engineering
In mathematics, a function f is idempotent if f(f(x)) = f(x) — applying it twice gives the same result as applying it once. In data engineering, an idempotent pipeline run produces the same destination state whether it executes once or twenty times for the same input parameters. This property is not optional — it is the difference between a pipeline that can be operated and one that requires a human to babysit every rerun.
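The definition can be made concrete in a few lines. A minimal sketch (illustrative names, not from any particular pipeline): an append is not idempotent, a keyed upsert is.

```python
def append(store: list, row: dict) -> None:
    store.append(row)             # running it twice leaves two copies

def upsert(store: dict, row: dict) -> None:
    store[row['order_id']] = row  # running it twice leaves the same state

rows = [{'order_id': 1, 'amount': 100.0}]

appended: list = []
upserted: dict = {}
for _ in range(2):                # simulate an accidental rerun
    for r in rows:
        append(appended, r)
        upsert(upserted, r)

print(len(appended))  # 2 (duplicated)
print(len(upserted))  # 1 (idempotent)
```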
The three forms of idempotency a pipeline needs
# ── FORM 1: WRITE-LAYER IDEMPOTENCY ─────────────────────────────────────────
# Every write operation to the destination must be idempotent.
# The mechanism: upsert (INSERT with conflict handling) + UNIQUE constraint.
# BAD: plain INSERT — NOT idempotent
INSERT INTO silver.orders (order_id, status, amount)
VALUES (9284751, 'delivered', 380.00);
-- Run this twice → two rows with order_id = 9284751
-- Run after a failure and partial write → inconsistent duplicates
# GOOD: upsert — idempotent
INSERT INTO silver.orders (order_id, status, amount, updated_at, ingested_at)
VALUES (9284751, 'delivered', 380.00, '2026-03-17 20:14:32', NOW())
ON CONFLICT (order_id)
DO UPDATE SET
status = EXCLUDED.status,
amount = EXCLUDED.amount,
updated_at = EXCLUDED.updated_at,
ingested_at = NOW()
WHERE silver.orders.updated_at < EXCLUDED.updated_at;
-- The WHERE clause prevents overwriting newer data with older data.
-- Run this N times → exactly one row, always with the latest values.
-- REQUIRES: UNIQUE constraint or PRIMARY KEY on order_id.
-- Without the UNIQUE constraint, ON CONFLICT has nothing to conflict on:
-- PostgreSQL silently inserts a duplicate instead of updating.
-- Always verify:
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'orders' AND constraint_type IN ('PRIMARY KEY', 'UNIQUE');
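The semantics of the upsert above can be exercised locally: SQLite's upsert syntax (3.24+) is close enough to PostgreSQL's for a unit test. A sketch with illustrative values, not production DDL:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""CREATE TABLE orders (
    order_id   INTEGER PRIMARY KEY,   -- the UNIQUE key that ON CONFLICT needs
    status     TEXT,
    updated_at TEXT)""")

upsert = """
INSERT INTO orders (order_id, status, updated_at) VALUES (?, ?, ?)
ON CONFLICT (order_id) DO UPDATE SET
    status = excluded.status,
    updated_at = excluded.updated_at
WHERE orders.updated_at < excluded.updated_at
"""
row = (9284751, 'delivered', '2026-03-17 20:14:32')
for _ in range(3):                          # three reruns of the same load
    conn.execute(upsert, row)
# Replaying an OLDER event must not regress the row:
conn.execute(upsert, (9284751, 'placed', '2026-03-17 18:00:00'))

count, status = conn.execute(
    "SELECT COUNT(*), MAX(status) FROM orders").fetchone()
print(count, status)  # 1 delivered
```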
# ── FORM 2: EXTRACTION-LAYER IDEMPOTENCY ──────────────────────────────────────
# The extraction query must produce the same result for the same parameters.
# Fixed time windows, not relative windows.
# BAD: relative window — NOT idempotent
SELECT * FROM orders WHERE updated_at > NOW() - INTERVAL '15 minutes';
-- A run at 06:00 extracts data from 05:45.
-- A rerun at 06:10 extracts data from 05:55 — DIFFERENT DATA.
-- Rows between 05:45 and 05:55 are missed on the rerun.
# GOOD: fixed window from checkpoint — idempotent
# checkpoint = '2026-03-17 05:45:00'
# source_now = '2026-03-17 06:00:00' (fixed at run start, stored in run record)
SELECT * FROM orders
WHERE updated_at > '2026-03-17 05:45:00'
AND updated_at <= '2026-03-17 06:00:00';
-- Run this at 06:00 or 06:10 or 06:30 — always extracts the same rows.
-- The upper bound is fixed when the run starts, not re-computed on retry.
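The run-record pattern can be sketched in a few lines of Python; `plan_run`, `extract`, and the window dict are illustrative names, not part of any real framework:

```python
from datetime import datetime, timedelta

def plan_run(checkpoint: datetime, now: datetime) -> dict:
    """Fix the extraction window at run start; persist it with the run record."""
    return {'since': checkpoint, 'until': now}

def extract(rows: list, window: dict) -> list:
    return [r for r in rows
            if window['since'] < r['updated_at'] <= window['until']]

# 15 rows updated between 05:46 and 06:00
rows = [{'id': i,
         'updated_at': datetime(2026, 3, 17, 5, 45) + timedelta(minutes=i)}
        for i in range(1, 16)]

window = plan_run(datetime(2026, 3, 17, 5, 45), datetime(2026, 3, 17, 6, 0))
first = extract(rows, window)   # run at 06:00
retry = extract(rows, window)   # retry at 06:10 reuses the SAME stored window
print(len(first), first == retry)  # 15 True
```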
# ── FORM 3: FILE-OUTPUT IDEMPOTENCY ────────────────────────────────────────────
# Writing files to S3 or a data lake must be idempotent.
# BAD: append to existing file — NOT idempotent
# with open('s3://bucket/orders/2026-03-17.csv', 'a') as f:
# f.write(new_rows) # rerun appends duplicate rows to same file
# GOOD: overwrite partition — idempotent
(df.write
    .mode('overwrite')                  # replace the partition, never append
    .partitionBy('order_date')
    .parquet('s3://freshmart-lake/silver/orders'))
# Rerunning for 2026-03-17 overwrites the date=2026-03-17 partition.
# The output is identical regardless of how many times it runs.
# GOOD: include run_id in filename — idempotent per run
# output_path = f's3://bucket/orders/date=2026-03-17/run-{run_id}.parquet'
# If run succeeds: file exists with correct data.
# If run reruns (new run_id): new file written, old cleaned up by compaction.
# Downstream reads the partition (all files in date=2026-03-17/) — correct.
Idempotency keys — the pattern for APIs and message systems
# When a pipeline calls an external API or writes to a message queue,
# the operation may be delivered more than once (at-least-once delivery).
# Idempotency keys prevent the duplicate from having side effects.
# ── SENDING TO AN API WITH IDEMPOTENCY KEY ────────────────────────────────────
import hashlib, json, requests
def create_payment_idempotency_key(payment_id: str, amount: float, ts: str) -> str:
"""
Generate a deterministic idempotency key from the operation's unique inputs.
Same inputs → same key every time → API recognises duplicate and ignores it.
"""
payload = f'{payment_id}:{amount}:{ts}'
return hashlib.sha256(payload.encode()).hexdigest()[:32]
idempotency_key = create_payment_idempotency_key('pay_xxx', 380.00, '2026-03-17T20:14:32Z')
response = requests.post(
'https://api.razorpay.com/v1/payments',
headers={
'X-Idempotency-Key': idempotency_key,
'Authorization': f'Bearer {api_key}',
},
json={'amount': 38000, 'currency': 'INR'},
)
# If this request is retried with the same idempotency key:
# Razorpay returns the SAME response as the first successful call.
# The payment is NOT created twice. ✓
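One property worth asserting in tests is that the key derivation is deterministic: the same operation always yields the same key, and any changed input yields a different one. A self-contained check (the `payment_key` helper is illustrative):

```python
import hashlib

def payment_key(payment_id: str, amount: float, ts: str) -> str:
    # Same derivation shape as above: hash of the operation's unique inputs
    payload = f'{payment_id}:{amount}:{ts}'
    return hashlib.sha256(payload.encode()).hexdigest()[:32]

k1 = payment_key('pay_xxx', 380.00, '2026-03-17T20:14:32Z')
k2 = payment_key('pay_xxx', 380.00, '2026-03-17T20:14:32Z')  # identical inputs
k3 = payment_key('pay_xxx', 380.50, '2026-03-17T20:14:32Z')  # amount changed

print(k1 == k2, k1 == k3, len(k1))  # True False 32
```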
# ── CONSUMER-SIDE IDEMPOTENCY — TRACKING PROCESSED EVENT IDs ─────────────────
# When consuming from Kafka or a webhook, the same event may arrive twice.
# Track processed event IDs to detect and skip duplicates.
# Simple in-memory set (for single-process consumers):
processed_ids: set[str] = set()
def process_event(event: dict) -> None:
event_id = event['event_id'] # or source.lsn for CDC events
if event_id in processed_ids:
log.info('Duplicate event %s — skipping', event_id)
return
# Process the event
upsert_to_silver(event)
processed_ids.add(event_id)
# Distributed in-memory (Redis SET NX — for multi-process consumers):
def is_duplicate(event_id: str, redis_client) -> bool:
"""
Returns True if this event was already processed.
SET NX: set if not exists — atomic, safe for concurrent consumers.
"""
# nx=True: only set if key does not exist
# ex=86400: expire after 24 hours (events older than 24h assumed unique)
result = redis_client.set(
f'processed:{event_id}',
'1',
nx=True,
ex=86400,
)
return result is None # None = key already existed = duplicate
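The SET NX logic can be unit-tested without a Redis server by faking the one method used. `FakeRedis` below mirrors redis-py's behaviour of returning None when nx blocks the write (a test double, not the real client):

```python
class FakeRedis:
    """Test double implementing just enough of redis-py's set()."""
    def __init__(self):
        self.store: dict = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None          # key already existed: nx blocked the write
        self.store[key] = value  # (ex/TTL ignored in this fake)
        return True

def is_duplicate(event_id: str, client) -> bool:
    return client.set(f'processed:{event_id}', '1', nx=True, ex=86400) is None

r = FakeRedis()
print(is_duplicate('evt_001', r))  # False (first time seen)
print(is_duplicate('evt_001', r))  # True  (duplicate detected)
```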
# ── DATABASE-LEVEL IDEMPOTENCY TRACKING ───────────────────────────────────────
# For pipelines that must guarantee exactly-once processing:
# Record processed event IDs in the destination database.
CREATE TABLE IF NOT EXISTS pipeline.processed_events (
event_id VARCHAR(100) PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
pipeline VARCHAR(100) NOT NULL
);
-- Before processing each event:
INSERT INTO pipeline.processed_events (event_id, pipeline)
VALUES ('evt_xxx', 'orders_cdc')
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id;
-- If returns a row: first time seeing this event → process it
-- If returns nothing: duplicate → skip it
Atomicity — No Partial States, Ever
Atomicity in a pipeline means each logical unit of work either completes fully or leaves no trace. The destination never contains partial results that represent an incomplete operation — never half a batch, never a truncated table that lost its data, never a file that was 60% written when the process was killed.
The challenge is that most pipeline operations span multiple steps. Truncating a table and reloading it are two steps. Writing a Parquet file and moving it to the final location are two steps. Updating a row and recording the event are two steps. Atomicity is about making these multi-step operations appear as a single indivisible unit.
Atomicity at the database layer
# ── TRANSACTION BATCHING ───────────────────────────────────────────────────────
# Every write to a relational database should be batched in a transaction.
# Without transactions, each row auto-commits individually.
# BAD: auto-commit per row — NOT atomic
conn.autocommit = True # the default in some drivers
for row in rows:
cur.execute("INSERT INTO silver.orders ...", row)
# If process crashes after row 23,000 of 50,000:
# 23,000 rows in destination, 27,000 missing. No way to know where to restart.
# GOOD: batch transaction — atomic
conn.autocommit = False # explicit transaction management
with conn: # context manager: commits on exit, rolls back on exception
for row in rows:
cur.execute("INSERT INTO silver.orders ...", row)
# If crash: entire batch rolled back. Destination unchanged. Rerun = correct.
# EVEN BETTER: executemany for bulk insert (10-100× faster than row loop):
with conn:
psycopg2.extras.execute_values(
cur,
"INSERT INTO silver.orders (order_id, status, amount) VALUES %s "
"ON CONFLICT (order_id) DO UPDATE SET "
"status = EXCLUDED.status, amount = EXCLUDED.amount",
[(row['order_id'], row['status'], row['amount']) for row in rows],
page_size=5000, # rows per INSERT statement
)
# SNOWFLAKE: every statement is auto-committed unless inside explicit transaction
# For multi-statement atomicity in Snowflake:
conn.execute("BEGIN")
conn.execute("INSERT INTO silver.orders_staging ...")
conn.execute("MERGE INTO silver.orders USING silver.orders_staging ...")
conn.execute("DROP TABLE silver.orders_staging")
conn.execute("COMMIT")
# On any error: conn.execute("ROLLBACK")
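The all-or-nothing behaviour is easy to demonstrate locally with SQLite, which follows the same transaction semantics. An illustrative sketch, not the production loader:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount REAL)")

rows = [(i, 100.0) for i in range(1, 51)]
try:
    with conn:                    # one transaction for the whole batch
        for order_id, amount in rows:
            conn.execute("INSERT INTO orders VALUES (?, ?)",
                         (order_id, amount))
            if order_id == 23:    # simulate the process dying mid-batch
                raise RuntimeError('process killed')
except RuntimeError:
    pass                          # context manager has rolled back

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # 0: the partial batch was rolled back, not half-written
```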
# ── STAGING TABLE SWAP — zero-downtime full reload ─────────────────────────────
# For full-load pipelines: write to new table, then atomically swap.
# The OLD table serves queries until the instant of swap.
# PostgreSQL (DDL is transactional — rare among databases):
with conn:
# Step 1: load new data into staging (queries still hit old table)
cur.execute("CREATE TABLE silver.store_master_new AS "
"SELECT * FROM source.stores")
# Step 2: atomic swap — both renames in same transaction
cur.execute("ALTER TABLE silver.store_master RENAME TO store_master_old")
cur.execute("ALTER TABLE silver.store_master_new RENAME TO store_master")
# ↑ From this line forward, ALL queries see new data.
# Zero window where the table is empty or has partial data.
# Step 3: drop old (still in same transaction — safe)
cur.execute("DROP TABLE silver.store_master_old")
# COMMIT: rename becomes permanent. Readers see clean transition.
# What readers see:
#   Before the transaction commits: store_master with the OLD data
#     (the renames are not yet visible to other sessions)
#   After the transaction commits:  store_master with the NEW data
#   NEVER: an empty table, a partial table, or two tables simultaneously
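SQLite's DDL is also transactional, so the same swap can be rehearsed locally. A sketch with illustrative table contents:

```python
import sqlite3

conn = sqlite3.connect(':memory:', isolation_level=None)  # manual transactions
conn.execute("CREATE TABLE store_master (store_id INTEGER, city TEXT)")
conn.execute("INSERT INTO store_master VALUES (1, 'Pune')")

# Load the new snapshot into staging while readers still use the old table
conn.execute("CREATE TABLE store_master_new (store_id INTEGER, city TEXT)")
conn.executemany("INSERT INTO store_master_new VALUES (?, ?)",
                 [(1, 'Pune'), (2, 'Nashik')])

# Both renames plus the drop inside ONE transaction
conn.execute("BEGIN")
conn.execute("ALTER TABLE store_master RENAME TO store_master_old")
conn.execute("ALTER TABLE store_master_new RENAME TO store_master")
conn.execute("DROP TABLE store_master_old")
conn.execute("COMMIT")

print(conn.execute("SELECT COUNT(*) FROM store_master").fetchone()[0])  # 2
```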
# Snowflake equivalent (atomic DDL):
conn.execute("CREATE TABLE silver.store_master_new AS SELECT * FROM source.stores")
conn.execute("ALTER TABLE silver.store_master SWAP WITH silver.store_master_new")
# SWAP is atomic in Snowflake — instant switch, no downtime
conn.execute("DROP TABLE silver.store_master_new")Atomicity for file operations
# Files cannot be partially written and remain consistent.
# A file being written can be observed in an incomplete state
# by other processes unless atomicity is enforced explicitly.
# ── WRITE-THEN-RENAME (the standard pattern) ──────────────────────────────────
from pathlib import Path
def write_parquet_atomically(df, final_path: str) -> None:
"""
Write a DataFrame to Parquet atomically.
Other processes never see a partial file.
"""
final = Path(final_path)
tmp = final.with_suffix('.tmp.parquet')
try:
# Write to temporary location (potentially slow)
df.to_parquet(tmp, compression='zstd', index=False)
# Rename to final location (atomic on POSIX filesystems)
# On local filesystems: guaranteed atomic if same filesystem
tmp.rename(final)
# ↑ This is atomic: readers either see the old file or the new file.
# They NEVER see a partial write.
except Exception:
# Clean up incomplete temp file on failure
if tmp.exists():
tmp.unlink()
raise
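The same write-then-rename shape works for any file format. A sketch with plain bytes (so it runs without Parquet libraries) shows the property directly:

```python
import tempfile
from pathlib import Path

def write_atomically(data: bytes, final_path: Path) -> None:
    tmp = final_path.with_suffix('.tmp')
    tmp.write_bytes(data)        # the slow part happens under the temp name
    tmp.rename(final_path)       # atomic replace on POSIX, same filesystem

d = Path(tempfile.mkdtemp())
target = d / 'orders.parquet'
write_atomically(b'old', target)
write_atomically(b'new', target)  # rename REPLACES the old file atomically

print(target.read_bytes().decode())       # new
print([p.name for p in d.iterdir()])      # ['orders.parquet'] (no temp left)
```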
# ── S3 ATOMIC WRITES ──────────────────────────────────────────────────────────
# S3 PUT operations are atomic for a single object — either the full
# object exists or it does not. S3 does not expose partial uploads.
# However, multipart uploads have a non-atomic assembly phase until completed.
# SAFE: write the complete object in a single PUT (supported up to 5 GB):
import boto3
s3 = boto3.client('s3')
s3.put_object(Bucket='freshmart-lake', Key='bronze/orders/part-001.parquet', Body=data)
# Atomic: readers see old key value or new key value, never partial content.
# SAFE: multipart upload with explicit completion:
# boto3 TransferManager handles multipart automatically and atomically.
# The CompleteMultipartUpload API call is atomic — file becomes visible
# only when ALL parts are committed.
# IMPORTANT: use a distinct temporary prefix for in-progress writes:
# Write to: s3://bucket/tmp/run-{run_id}/part-001.parquet
# Then copy: s3://bucket/bronze/orders/date=2026-03-17/part-001.parquet
# Then delete: s3://bucket/tmp/run-{run_id}/part-001.parquet
# Downstream readers only scan bronze/orders/date=2026-03-17/ — they
# never see partial in-progress files from the tmp/ prefix.
# ── DELTA LAKE ATOMICITY ───────────────────────────────────────────────────────
# Delta Lake uses a transaction log for atomic multi-file commits.
# Every Delta write is an atomic transaction at the table level.
# Writing 3 Parquet files to a Delta table:
# 1. Write all 3 Parquet files to the table directory (not yet visible)
# 2. Write a new entry to _delta_log/000000000000000042.json
# containing references to all 3 files
# 3. The log entry is written atomically (single file PUT to S3)
# Once the log entry exists: ALL 3 files become visible simultaneously.
# If step 1 completes but step 2 fails: the 3 Parquet files exist
# but are invisible to readers (not referenced in any log entry).
# A later VACUUM run removes the orphaned, unreferenced files.
# Result: no partial state ever visible to readers.
Atomicity at the pipeline level — the two-phase pattern
# A multi-step pipeline needs atomicity at the pipeline level, not just
# at individual writes. The write-validate-commit pattern achieves this.
# PATTERN: Write to staging → validate → atomically promote to production
# If validation fails: staging is deleted, production is unchanged.
# Readers always see either the old production data or the new production data.
def write_with_validation(
rows: list[dict],
dest_conn,
run_id: str,
) -> None:
"""
Write rows to production only if validation passes.
Production table is never modified if validation fails.
"""
staging_table = f'silver.orders_staging_{run_id.replace("-", "_")}'
try:
# ── Phase 1: Write to staging (can fail — production unaffected) ──────
with dest_conn:
dest_conn.execute(f'CREATE TABLE {staging_table} AS '
f'SELECT * FROM silver.orders WHERE 1=0') # empty table, same schema
psycopg2.extras.execute_values(
dest_conn.cursor(),
f'INSERT INTO {staging_table} VALUES %s',
[tuple(row.values()) for row in rows],
)
# ── Phase 2: Validate staging data ────────────────────────────────────
with dest_conn.cursor() as cur:
# Check: no negative amounts
cur.execute(f'SELECT COUNT(*) FROM {staging_table} WHERE order_amount < 0')
if cur.fetchone()[0] > 0:
raise ValueError('Staging has negative order amounts')
# Check: no NULL required fields
cur.execute(f'SELECT COUNT(*) FROM {staging_table} '
f'WHERE order_id IS NULL OR customer_id IS NULL')
if cur.fetchone()[0] > 0:
raise ValueError('Staging has NULL required fields')
# Check: row count is reasonable (within 50% of the trailing 7-day average)
cur.execute("""
SELECT AVG(daily_count) FROM (
SELECT DATE(ingested_at) AS day, COUNT(*) AS daily_count
FROM silver.orders
WHERE ingested_at > NOW() - INTERVAL '7 days'
GROUP BY 1
) counts
""")
avg_daily = cur.fetchone()[0] or 0
staging_count = len(rows)
if avg_daily > 0 and abs(staging_count - avg_daily) / avg_daily > 0.5:
raise ValueError(
f'Staging row count {staging_count} deviates >50% from '
f'7-day average {avg_daily:.0f}'
)
# ── Phase 3: Atomically promote staging to production ─────────────────
with dest_conn:
dest_conn.execute(f"""
INSERT INTO silver.orders
SELECT * FROM {staging_table}
ON CONFLICT (order_id) DO UPDATE SET
status = EXCLUDED.status,
order_amount = EXCLUDED.order_amount,
updated_at = EXCLUDED.updated_at,
ingested_at = EXCLUDED.ingested_at
WHERE silver.orders.updated_at < EXCLUDED.updated_at
""")
except Exception:
# Validation or promotion failed — staging still exists, production unchanged
raise
finally:
# Always clean up staging regardless of success or failure
try:
dest_conn.execute(f'DROP TABLE IF EXISTS {staging_table}')
dest_conn.commit()
except Exception:
pass # best effort cleanup
Restartability — Automatic Recovery From Any Failure Point
A restartable pipeline picks up exactly where it left off after any failure — at any point in the pipeline, at any time, with any reason for failure. No human involvement needed to determine what was processed, what was not, and what needs to be re-run. The pipeline figures this out automatically.
Restartability requires two things: a checkpoint that records progress accurately, and idempotent writes that make re-processing safe. Without idempotency, restartability cannot be achieved — if re-running already-written data creates duplicates, you cannot restart from a checkpoint that includes any already-written data.
Granularity of checkpointing — coarse vs fine-grained
CHECKPOINT GRANULARITY: how much work is lost on failure and restarted?
COARSE-GRAINED (one checkpoint per full run):
Save checkpoint at the END of the entire run.
If run processes 10,000 rows and fails on row 9,847:
→ Checkpoint still shows the watermark from before this run started
→ Next run re-processes all 10,000 rows from scratch
Cost: O(run_size) work lost on failure
Complexity: low — one checkpoint write per run
Use when: runs are fast (< 5 minutes), reprocessing is cheap
from checkpoint import save_watermark
# Inside run():
extract_rows()
transform_rows()
load_rows()
save_watermark(until) # save only at end — all or nothing
MEDIUM-GRAINED (one checkpoint per batch):
Save checkpoint after each batch of BATCH_SIZE rows.
If run processes 10,000 rows in 10 batches and fails on batch 8:
→ Checkpoint shows end of batch 7 (7,000 rows processed)
→ Next run re-processes only batches 8-10 (3,000 rows)
Cost: O(BATCH_SIZE) work lost on failure
Complexity: medium — N checkpoint writes per run
Use when: runs are long (> 10 minutes), batches are large
batch_watermark = since # start of this run
for batch in extract_batches(since, until):
transform_and_load(batch)
batch_watermark = batch[-1]['updated_at'] # max updated_at in this batch
save_watermark(batch_watermark) # checkpoint after each batch
# On failure during batch 8: checkpoint shows end of batch 7 watermark
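Put together, medium-grained checkpointing plus idempotent writes gives crash recovery with bounded rework. A self-contained simulation (all names illustrative):

```python
def run(rows, dest: dict, state: dict, batch_size: int = 10,
        fail_at_batch=None) -> None:
    # Resume from the checkpoint: only rows past the watermark are pending
    pending = [r for r in rows if r['id'] > state['watermark']]
    for n, i in enumerate(range(0, len(pending), batch_size), start=1):
        batch = pending[i:i + batch_size]
        if n == fail_at_batch:
            raise RuntimeError('crash mid-run')
        for r in batch:
            dest[r['id']] = r                 # idempotent upsert into dest
        state['watermark'] = batch[-1]['id']  # checkpoint AFTER the write

rows = [{'id': i} for i in range(1, 101)]     # 100 rows, 10 batches
dest, state = {}, {'watermark': 0}
try:
    run(rows, dest, state, fail_at_batch=8)   # dies during batch 8
except RuntimeError:
    pass
print(state['watermark'], len(dest))          # 70 70: batches 1-7 survived

run(rows, dest, state)                        # restart: only batches 8-10 left
print(state['watermark'], len(dest))          # 100 100: complete, no duplicates
```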
FINE-GRAINED (one checkpoint per row group or file):
Save checkpoint after writing each Parquet file or row group.
If run fails mid-file:
→ Checkpoint shows last successfully written file
→ Next run continues from that file
Cost: O(file_size) work lost on failure
Complexity: high — many checkpoint writes, must track file inventory
Use when: files are large (500 MB+), each file takes minutes to write
# Used by Spark Structured Streaming automatically
# checkpointLocation stores last committed offset after each trigger
SPARK CHECKPOINT SEMANTICS:
Spark Structured Streaming saves checkpoint state after every trigger.
If Spark crashes mid-trigger: the trigger is re-executed from its start.
The trigger is the atomic unit — writes within one trigger are either
all committed (if writeStream uses Delta with MERGE) or all rolled back.
checkpointLocation stores:
- Last committed Kafka offsets (where to resume reading from)
- Aggregation state (for stateful streaming operations)
- Metadata about the last committed batch
Designing for restartability — the checklist
- Writes are idempotent (upsert + UNIQUE constraint), so re-processing after a restart is always safe.
- The checkpoint advances only AFTER the destination write commits, never before it.
- Extraction windows are fixed at run start and stored with the run record, so a retry replays the same window.
- Checkpoint granularity matches run length: per-run for short runs, per-batch or per-file for long ones.
Non-Idempotent Patterns — Recognising and Fixing Them
Non-idempotent pipeline patterns are often not obvious — they look reasonable on first read. The test is always: what happens if this pipeline runs twice for the same input? If the answer is "different from running it once," the pattern is non-idempotent.
| Anti-pattern | What goes wrong on rerun | The fix |
|---|---|---|
| Plain INSERT without ON CONFLICT | Duplicate rows in destination. COUNT(*) doubles on every rerun. All downstream aggregations are wrong. | Add ON CONFLICT (pk) DO UPDATE. Add UNIQUE constraint on business key. Every rerun produces the same row count. |
| TRUNCATE then INSERT in separate transactions | A failure after TRUNCATE but before INSERT leaves the table empty. Queries see zero rows. Next run starts from an empty table — correct, but downstream was served empty data. | Use staging table swap: load new data into a staging table, then atomically rename staging to production in one transaction. Readers always see either old or new, never empty. |
| Relative time windows (NOW() - INTERVAL '15 min') | A rerun at a different time of day extracts a different window. Rows between the original run's window and the rerun's window are either missed or double-processed. | Store the extraction window's upper bound at run start. On retry, reuse the stored upper bound. Run parameters are immutable once set. |
| Append mode file writes | Each rerun appends new files to the partition. After N reruns, the partition has N copies of the same data. Queries return N× too many rows. | Use overwrite mode per partition. Rerun overwrites the partition entirely. The output is always exactly one copy of the data regardless of rerun count. |
| Saving checkpoint before write | If the write fails after the checkpoint advances, the next run starts from after the failed data. The unwritten rows are permanently skipped. Silent data loss. | Write to destination first, save checkpoint second. If write fails, checkpoint does not advance. Next run re-processes same data. Upsert semantics handle duplicates. |
| Side effects in transformation (email, payment, webhook) | Transformation sends an email notification per row. On rerun, every row triggers a duplicate email. Customers receive duplicate notifications. | Separate side effects from transformation. Record the intent to send (write to an outbox table) rather than sending directly. A separate idempotent consumer processes the outbox with deduplication. |
| Auto-increment sequence used as business key | On rerun, new rows get new auto-increment IDs even though they represent the same business event. Downstream joins by ID fail to match. Aggregations count same events twice. | Use the source system's business key (order_id from the source) as the UNIQUE constraint for conflict detection, not the destination's auto-increment surrogate key. |
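The outbox row in the table above deserves a sketch: the transformation records intent, and a separate consumer deduplicates on a stable key before sending. Everything here is an in-memory illustration of the pattern, not a production implementation:

```python
outbox: list[dict] = []
sent: set[str] = set()           # consumer-side dedup (e.g. a processed-keys table)
emails_sent: list[str] = []

def transform(order: dict) -> None:
    # No side effect here: just record the intent, keyed for deduplication
    outbox.append({'key': f"order-email-{order['order_id']}",
                   'to': order['customer_email']})

def drain_outbox() -> None:
    for msg in outbox:
        if msg['key'] in sent:   # duplicate intent from a rerun: skip it
            continue
        emails_sent.append(msg['to'])   # the actual side effect
        sent.add(msg['key'])

order = {'order_id': 9284751, 'customer_email': 'a@example.com'}
transform(order)
transform(order)                 # a pipeline rerun re-records the same intent
drain_outbox()
print(len(emails_sent))  # 1: the customer gets exactly one email
```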
Idempotency Across System Boundaries — The Hardest Case
Idempotency within a single database is straightforward — ON CONFLICT handles it. Idempotency across multiple systems is fundamentally harder because there is no single transaction coordinator. A pipeline step that writes to a database AND sends a Kafka message AND calls an API cannot use a single transaction — each system has its own commit protocol.
# SCENARIO: Order completion pipeline
# Must: 1) Update silver.orders (Snowflake)
# 2) Publish event to Kafka
# 3) Call delivery service API
# If ANY step fails: must be safe to retry entire sequence
# ── THE PROBLEM ───────────────────────────────────────────────────────────────
def complete_order_UNSAFE(order_id: int, conn, kafka_producer, api_client):
# Step 1: Update DB
conn.execute("UPDATE silver.orders SET status='completed' WHERE order_id=%s",
(order_id,))
conn.commit() # committed
# Step 2: Publish event (network error here?)
kafka_producer.produce('orders.completed', key=str(order_id), value={...})
kafka_producer.flush() # if this fails: DB is committed, Kafka not
# Step 3: Call API (timeout here?)
api_client.notify_delivery_service(order_id) # if this fails: both above done
# ANY STEP FAILING AND RETRYING = inconsistent state
# Step 1 retry: duplicate DB update (idempotent if using upsert — OK)
# Step 2 retry: duplicate Kafka message
# Step 3 retry: duplicate API call — may charge the merchant twice!
# ── THE FIX: idempotency at every external call ────────────────────────────────
def complete_order_SAFE(order_id: int, run_id: str, conn, kafka_producer, api_client):
# Step 1: Upsert (idempotent DB write)
conn.execute("""
INSERT INTO silver.orders (order_id, status, completed_at)
VALUES (%s, 'completed', NOW())
ON CONFLICT (order_id) DO UPDATE SET
status = 'completed',
completed_at = EXCLUDED.completed_at
WHERE silver.orders.status != 'completed'
""", (order_id,))
conn.commit()
# Step 2: Kafka publish with idempotent producer config
# enable.idempotence=True: Kafka guarantees exactly-once delivery
# within a single producer session (retries do not produce duplicates)
kafka_producer.produce(
'orders.completed',
key=str(order_id),
value={'order_id': order_id, 'idempotency_key': f'{run_id}:{order_id}'},
# Consumer-side: check idempotency_key before processing
)
# Step 3: API call with idempotency key
idempotency_key = f'order-complete-{order_id}-{run_id[:8]}'
api_client.notify_delivery_service(
order_id=order_id,
headers={'X-Idempotency-Key': idempotency_key},
# If API supports idempotency keys: second call with same key is a no-op
)
# ── SAGA PATTERN: track state across multi-step operations ────────────────────
# For long multi-step pipelines where each step calls an external system:
# Record the completion of each step, and skip already-completed steps on retry.
CREATE TABLE pipeline.order_completion_sagas (
order_id BIGINT PRIMARY KEY,
run_id VARCHAR(36) NOT NULL,
db_updated BOOLEAN NOT NULL DEFAULT FALSE,
kafka_published BOOLEAN NOT NULL DEFAULT FALSE,
api_notified BOOLEAN NOT NULL DEFAULT FALSE,
completed_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
def complete_order_with_saga(order_id: int, run_id: str, ...):
saga = load_or_create_saga(order_id, run_id)
if not saga.db_updated:
update_db(order_id)
mark_saga_step(order_id, 'db_updated')
if not saga.kafka_published:
publish_kafka(order_id)
mark_saga_step(order_id, 'kafka_published')
if not saga.api_notified:
notify_api(order_id)
mark_saga_step(order_id, 'api_notified')
mark_saga_complete(order_id)
# On retry: already-completed steps are skipped entirely
How to Test That Your Pipeline Is Actually Idempotent
Claiming a pipeline is idempotent is easy. Verifying it is idempotent requires specific tests. These tests should be part of every pipeline's integration test suite — run them before production deployment, after any significant change, and as part of the CI pipeline.
"""
tests/test_idempotency.py
Tests that the pipeline produces correct results when run multiple times.
Requires: test PostgreSQL + test Snowflake (or SQLite equivalent)
"""
import pytest
from unittest.mock import patch
from datetime import datetime, timezone
import pipeline.load
from pipeline.main import run_pipeline
class TestIdempotency:
"""
Tests that verify the pipeline is idempotent:
running it N times produces the same result as running it once.
"""
def test_double_run_produces_same_row_count(self, test_db, test_dest):
"""Running the pipeline twice must not duplicate rows."""
run_date = '2026-03-17'
# Run 1
result1 = run_pipeline(run_date, source_conn=test_db, dest_conn=test_dest)
count_after_run1 = test_dest.execute(
"SELECT COUNT(*) FROM silver.orders"
).fetchone()[0]
# Run 2 (same date, same data)
result2 = run_pipeline(run_date, source_conn=test_db, dest_conn=test_dest)
count_after_run2 = test_dest.execute(
"SELECT COUNT(*) FROM silver.orders"
).fetchone()[0]
assert count_after_run1 == count_after_run2, (
f'Row count changed on second run: '
f'{count_after_run1} → {count_after_run2} (duplicates created?)'
)
def test_rerun_after_source_update_uses_latest_values(self, test_db, test_dest):
"""If source data changes between runs, destination reflects latest."""
run_date = '2026-03-17'
# Run 1: order 9284751 has status='placed'
run_pipeline(run_date, source_conn=test_db, dest_conn=test_dest)
status_after_run1 = test_dest.execute(
"SELECT status FROM silver.orders WHERE order_id = 9284751"
).fetchone()[0]
assert status_after_run1 == 'placed'
# Source updates order status
test_db.execute(
"UPDATE orders SET status='delivered', updated_at=NOW() "
"WHERE order_id = 9284751"
)
# Reset checkpoint to before run1's window
reset_checkpoint_to_before_run1()
# Run 2: should pick up the update
run_pipeline(run_date, source_conn=test_db, dest_conn=test_dest)
status_after_run2 = test_dest.execute(
"SELECT status FROM silver.orders WHERE order_id = 9284751"
).fetchone()[0]
assert status_after_run2 == 'delivered'
def test_pipeline_recovers_correctly_after_mid_run_failure(self, test_db, test_dest):
"""
Simulates a failure after writing half the batches.
The next run should complete correctly without duplicates.
"""
# Insert 10,000 test orders
insert_test_orders(test_db, count=10_000)
# Patch the load function to fail after batch 3
call_count = 0
original_upsert = pipeline.load.upsert_batch
def upsert_that_fails_on_batch_4(rows, conn):
nonlocal call_count
call_count += 1
if call_count == 4:
raise RuntimeError('Simulated failure on batch 4')
return original_upsert(rows, conn)
with pytest.raises(RuntimeError, match='Simulated failure'):
with patch('pipeline.load.upsert_batch', side_effect=upsert_that_fails_on_batch_4):
run_pipeline('2026-03-17', source_conn=test_db, dest_conn=test_dest)
count_after_failure = test_dest.execute(
"SELECT COUNT(*) FROM silver.orders"
).fetchone()[0]
# Some batches were written before the failure
assert 0 < count_after_failure < 10_000
# Recovery run: complete the pipeline
run_pipeline('2026-03-17', source_conn=test_db, dest_conn=test_dest)
count_after_recovery = test_dest.execute(
"SELECT COUNT(*) FROM silver.orders"
).fetchone()[0]
# Should have exactly 10,000 rows — no duplicates, no gaps
assert count_after_recovery == 10_000
def test_ten_runs_same_result(self, test_db, test_dest):
"""The most direct idempotency test: run 10 times, same result."""
run_date = '2026-03-17'
results = []
for i in range(10):
reset_checkpoint_for_run(run_date)
run_pipeline(run_date, source_conn=test_db, dest_conn=test_dest)
count = test_dest.execute(
"SELECT COUNT(*) FROM silver.orders"
).fetchone()[0]
checksum = test_dest.execute(
"SELECT SUM(order_amount) FROM silver.orders"
).fetchone()[0]
results.append((count, checksum))
# All runs should produce identical results
assert len(set(results)) == 1, (
f'Pipeline is NOT idempotent — 10 runs produced {len(set(results))} '
f'different results: {results}'
)
A Non-Idempotent Pipeline, a 3 AM Incident, and the Fix
At 07:15 AM, the finance team reports that yesterday's revenue figure in the dashboard shows ₹84,23,000 — exactly double the ₹42,11,500 expected from manual bank reconciliation. The data engineering team begins investigating.
# Step 1: Check when the doubling occurred
SELECT DATE_TRUNC('hour', ingested_at) AS load_hour,
COUNT(*) AS row_count, SUM(order_amount) AS revenue
FROM silver.orders
WHERE order_date = '2026-03-17'
GROUP BY 1
ORDER BY 1;
# Output:
# 2026-03-17 06:00 → 48,234 rows → ₹42,11,500 (scheduled morning load — correct)
# 2026-03-17 18:00 → 48,234 rows → ₹42,11,500 (evening load — exact duplicates)
# Total: 96,468 rows, ₹84,23,000 — doubled
# Step 2: Check for duplicate order IDs
SELECT order_id, COUNT(*) AS copies
FROM silver.orders
WHERE order_date = '2026-03-17'
GROUP BY order_id
HAVING COUNT(*) > 1
LIMIT 10;
# Returns 48,234 rows — every single order_id has exactly 2 copies
# Step 3: Check Airflow run history
SELECT dag_run_id, start_date, end_date, state
FROM airflow.dag_run
WHERE dag_id = 'orders_pipeline_incremental'
AND start_date::DATE = '2026-03-17'
ORDER BY start_date;
# Output shows the scheduled morning run plus an extra FULL LOAD run at 18:00
# (someone had triggered a "backfill" from the Airflow UI that ran in full-load mode)
# Step 4: Check the Silver table's INSERT statement (pg_stat_statements extension)
SELECT query
FROM pg_stat_statements
WHERE query ILIKE '%INSERT INTO silver.orders%'
LIMIT 5;
# Query: "INSERT INTO silver.orders SELECT * FROM orders_staging"
# → Plain INSERT, NO ON CONFLICT — not idempotent!
# Root cause:
# 1. The pipeline used plain INSERT without ON CONFLICT
# 2. A manual backfill re-ran the pipeline for an already-loaded date
# 3. The rerun inserted every row again → 2× duplicates
# IMMEDIATE FIX: deduplicate the table
CREATE TABLE silver.orders_deduped AS
SELECT DISTINCT ON (order_id) *
FROM silver.orders
ORDER BY order_id, ingested_at DESC;

# Swap atomically — both renames in one transaction (PostgreSQL DDL is transactional)
BEGIN;
ALTER TABLE silver.orders RENAME TO orders_duplicated_backup;
ALTER TABLE silver.orders_deduped RENAME TO orders;
COMMIT;
# VERIFY:
SELECT COUNT(*), SUM(order_amount) FROM silver.orders
WHERE order_date = '2026-03-17';
# Returns: 48,234 rows, ₹42,11,500 ← correct
# PERMANENT FIX: make the pipeline idempotent
# 1. Change INSERT to INSERT ... ON CONFLICT DO UPDATE
# 2. Add UNIQUE constraint: ALTER TABLE silver.orders ADD CONSTRAINT uq_order_id UNIQUE (order_id);
# 3. Add idempotency test to CI that fails if running twice increases row count
# 4. Enable Airflow max_active_runs=1 and require code review for manual backfills
# TOTAL IMPACT:
# Duration: 07:15 AM alert → 07:52 AM fully resolved (37 minutes)
# Finance report delayed: 52 minutes past SLA
# Revenue reports for the day: correct in production by 08:00 AM
The incident happened because one failure mode — a manual trigger of the pipeline for an already-processed date — was never considered. The plain INSERT that worked fine for the first run created duplicates on the second. Adding ON CONFLICT DO UPDATE and a UNIQUE constraint took 15 minutes. The idempotency test would have caught this before the first production deployment.
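The permanent fix is the upsert-plus-UNIQUE-constraint pattern from the write layer. A minimal, self-contained sketch of the behaviour it buys you: SQLite (3.24+) is used here only so the example runs anywhere, and the `orders` table, `load_batch` helper, and sample rows are all hypothetical, not the incident's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE orders (
        order_id     TEXT PRIMARY KEY,  -- the UNIQUE constraint ON CONFLICT needs
        status       TEXT,
        order_amount REAL
    )
""")

def load_batch(conn, rows):
    # Upsert: rerunning the same batch updates rows in place instead of duplicating
    conn.executemany(
        """
        INSERT INTO orders (order_id, status, order_amount)
        VALUES (?, ?, ?)
        ON CONFLICT (order_id) DO UPDATE SET
            status       = excluded.status,
            order_amount = excluded.order_amount
        """,
        rows,
    )

batch = [("ORD-1", "paid", 1200.0), ("ORD-2", "paid", 800.0)]
load_batch(conn, batch)
load_batch(conn, batch)  # simulate the accidental second backfill run

count, total = conn.execute(
    "SELECT COUNT(*), SUM(order_amount) FROM orders"
).fetchone()
print(count, total)  # 2 2000.0: same state as a single run, no doubling
```

The same shape carries over to PostgreSQL unchanged; the point is that the second run is a no-op on already-present keys rather than a second INSERT.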
🎯 Key Takeaways
- ✓ Idempotency means running a pipeline N times produces the same result as running it once. The three mechanisms: upserts (ON CONFLICT DO UPDATE) with UNIQUE constraints for database writes, fixed extraction windows (not relative NOW() windows) for extraction, and overwrite mode (not append) for file writes.
- ✓ Atomicity means each unit of work either completes fully or leaves no trace. For databases: wrap each batch in a transaction. For table swaps: use ALTER TABLE RENAME in a single transaction (PostgreSQL DDL is transactional) or ALTER TABLE SWAP WITH (Snowflake). For files: write to temp then rename; use Delta Lake for multi-file atomicity.
- ✓ Restartability requires both idempotency and correct checkpoint ordering. Save the checkpoint after the destination write succeeds, never before. A checkpoint that advances before the write succeeds causes permanent data loss on failure. A checkpoint that stays at the pre-write position allows safe restart.
- ✓ The staging table swap pattern eliminates the empty-table window of truncate-and-reload. Load new data into a staging table completely, then atomically rename staging to production in one transaction. Readers see old data until the instant of swap, then new data — zero window of empty or partial data.
- ✓ Idempotency keys solve the duplicate-call problem for external APIs and message queues. Generate a deterministic key from the operation's inputs (hash of order_id + action). Include it in the request header. APIs that support idempotency keys treat duplicate requests with the same key as no-ops.
- ✓ The UNIQUE constraint is required for ON CONFLICT to work. Without it, INSERT ... ON CONFLICT (order_id) DO UPDATE fails outright in PostgreSQL ("no unique or exclusion constraint matching the ON CONFLICT specification"), and a bare ON CONFLICT DO NOTHING silently inserts duplicates as if the clause were not present. Always verify the constraint exists: query information_schema.table_constraints before assuming ON CONFLICT will protect against duplicates.
- ✓ Non-idempotent patterns to recognise: plain INSERT (duplicates on rerun), TRUNCATE in separate transaction from INSERT (empty-table window), relative time windows (different data on rerun), append mode file writes (duplicate files on rerun), checkpoint saved before write (data loss on failure), side effects in transformation (duplicate emails/charges on rerun).
- ✓ Idempotency across system boundaries requires tracking each step's completion. The saga pattern records which steps have been executed, and skips already-completed steps on retry. Each external call uses an idempotency key derived from the operation's unique inputs.
- ✓ Test idempotency explicitly: run the pipeline twice and assert row counts are identical, run after a simulated mid-batch failure and assert complete correct data, run ten times and assert results are unchanged. These tests belong in CI and should run before every production deployment.
- ✓ The root cause of most data quality incidents is non-idempotent pipelines combined with a trigger that causes a rerun: manual backfill, Airflow bug, infrastructure restart, or test run in production. The defence is making every pipeline idempotent by default — not as an afterthought when the incident happens.
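The write-to-temp-then-rename rule for files fits in a few lines. This is an illustrative helper under assumed names (`atomic_write_json`, the filename, the payload are all made up), not code from the pipeline above; it relies on `os.replace()` being a single atomic rename on POSIX filesystems.

```python
import json
import os
import tempfile

def atomic_write_json(path, payload):
    """Write payload to path so readers never observe a partial file."""
    dir_name = os.path.dirname(os.path.abspath(path))
    # Temp file must live in the SAME directory: rename is only atomic
    # within one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
            f.flush()
            os.fsync(f.fileno())    # ensure bytes hit disk before the rename
        os.replace(tmp_path, path)  # atomic swap: readers see old or new, never partial
    except BaseException:
        os.unlink(tmp_path)         # leave no trace on failure (atomicity)
        raise

# Rerunning the same write is also idempotent: it overwrites, never appends
atomic_write_json("orders_2026-03-17.json", {"rows": 48234, "revenue": 4211500})
atomic_write_json("orders_2026-03-17.json", {"rows": 48234, "revenue": 4211500})
```

A crash before `os.replace()` leaves the old file untouched and at worst an orphaned `.tmp` file to clean up; a crash after it leaves the complete new file. There is no in-between state.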
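The checkpoint-ordering rule (destination write first, checkpoint second) can be demonstrated end to end. Everything below is a hypothetical sketch (the `run` loop, the in-memory destination list, the JSON checkpoint file), not the article's pipeline; the only thing that matters is the order of the two calls inside the loop.

```python
import json
import os

CHECKPOINT = "demo_checkpoint.json"  # hypothetical checkpoint store

def read_checkpoint():
    if not os.path.exists(CHECKPOINT):
        return 0
    with open(CHECKPOINT) as f:
        return json.load(f)["last_batch"]

def save_checkpoint(n):
    with open(CHECKPOINT, "w") as f:
        json.dump({"last_batch": n}, f)

def run(batches, load_batch):
    done = read_checkpoint()
    for n, batch in enumerate(batches, start=1):
        if n <= done:
            continue            # already durably written: safe to skip on restart
        load_batch(batch)       # 1. destination write FIRST
        save_checkpoint(n)      # 2. checkpoint advances ONLY after success

if os.path.exists(CHECKPOINT):
    os.remove(CHECKPOINT)       # start the demo from a clean state

destination = []
attempts = {"n": 0}

def load(batch):
    attempts["n"] += 1
    if attempts["n"] == 2:      # simulate a crash while loading batch 2
        raise RuntimeError("network blip")
    destination.extend(batch)

batches = [[1, 2], [3, 4], [5, 6]]
try:
    run(batches, load)
except RuntimeError:
    pass                        # first run dies mid-pipeline

run(batches, load)              # restart resumes at batch 2
print(destination)              # [1, 2, 3, 4, 5, 6]: no duplicates, no gaps
```

Flip the two calls inside the loop and the crash on batch 2 would leave the checkpoint pointing past data that was never written, which is exactly the permanent-loss failure the takeaway warns about.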