Error Handling, Retries, and Dead Letter Queues
Classifying errors, exponential backoff with jitter, circuit breakers, DLQ design, and building pipelines that recover automatically.
The Gap Between a Pipeline That Works and One That Survives Production
A pipeline that handles the happy path is not a production pipeline. Production has network timeouts at 3 AM, API rate limits during traffic spikes, one malformed row in a batch of 50,000, a Snowflake warehouse that is auto-suspended when the pipeline starts, a source database that returns 503 for 4 minutes during a deploy, and a vendor CSV that arrives with an entirely wrong schema once a month.
The difference between a pipeline that handles these gracefully and one that pages you at 3 AM is a well-designed error handling strategy. This module covers every layer: how to classify errors, how to retry correctly, how to implement circuit breakers, how to design dead letter queues, and how to build alerting that surfaces real problems without crying wolf.
Transient vs Permanent Errors — The Classification That Determines Everything
The single most important decision in error handling is whether to retry. Retrying a transient error recovers the pipeline automatically. Retrying a permanent error wastes time, consumes resources, and delays the alert that would trigger human intervention. The classification of an error as transient or permanent determines the entire downstream response.
The error taxonomy for data pipelines
| Error type | Examples | Retry? | Action |
|---|---|---|---|
| Network timeout | requests.Timeout, psycopg2.OperationalError, ConnectionResetError | ✓ Yes — fixed interval or backoff | Retry up to N times. Alert if all retries exhausted. |
| Rate limit (429) | HTTP 429 Too Many Requests | ✓ Yes — after Retry-After delay | Read Retry-After header. Wait exact amount. Then retry. |
| Server error (5xx) | HTTP 500, 502, 503, 504 | ✓ Yes — with exponential backoff | Backoff: 2s, 4s, 8s, 16s, 32s. Alert if 3+ consecutive 5xx. |
| Database lock/deadlock | psycopg2.errors.DeadlockDetected | ✓ Yes — immediately or short delay | Retry the transaction immediately (deadlocks resolve on retry). |
| Transient DB error | could not connect, connection refused (temporary) | ✓ Yes — exponential backoff | Backoff starting at 5s. Alert if source unreachable > 15 min. |
| Auth failure (401) | HTTP 401 Unauthorized | ✗ No — credentials are wrong | Alert immediately. Do not retry — credentials will not fix themselves. |
| Forbidden (403) | HTTP 403 Forbidden | ✗ No — permissions issue | Alert immediately. Investigate permissions. |
| Not found (404) | HTTP 404 Not Found | ✗ No — resource does not exist | Log warning. Skip this record. The resource was deleted. |
| Schema mismatch | Column "order_amount" does not exist, unexpected type | ✗ No — structural issue | Alert immediately. Pipeline cannot proceed without schema fix. |
| Bad credentials | Authentication failed for user "pipeline" | ✗ No — credentials invalid | Alert immediately. Rotate credentials. |
| Data validation failure | NULL in required field, negative amount | ✗ No — data is genuinely invalid | Write row to DLQ. Continue with rest of batch. Alert if DLQ rate high. |
| Disk full | No space left on device | ✗ No — environment problem | Alert immediately. Clean up before retrying. |
| OOM / memory error | MemoryError, Container OOMKilled | ⚡ Maybe — with smaller batch size | Reduce batch size. If still OOM: alert — resource issue. |
import requests
import psycopg2
import psycopg2.errors


class ErrorClassification:
    RETRY_IMMEDIATELY = 'retry_immediately'  # retry at once (deadlock)
    RETRY_BACKOFF = 'retry_backoff'          # retry after exponential backoff
    RETRY_AFTER_DELAY = 'retry_after_delay'  # retry after specific delay (rate limit)
    PERMANENT_FAILURE = 'permanent_failure'  # do not retry, alert
    ROW_LEVEL_FAILURE = 'row_level_failure'  # reject row to DLQ, continue


def classify_error(exc: Exception, response=None) -> tuple[str, str]:
    """
    Classify an exception into a handling category.
    Returns (classification, human_readable_reason).
    """
    # ── HTTP response errors ───────────────────────────────────────────────────
    if response is not None:
        status = response.status_code
        if status == 429:
            retry_after = response.headers.get('Retry-After', '60')
            return ErrorClassification.RETRY_AFTER_DELAY, f'Rate limited, Retry-After: {retry_after}'
        if status in (500, 502, 503, 504):
            return ErrorClassification.RETRY_BACKOFF, f'Server error {status} (transient)'
        if status == 401:
            return ErrorClassification.PERMANENT_FAILURE, 'Authentication failed (401): check credentials'
        if status == 403:
            return ErrorClassification.PERMANENT_FAILURE, 'Forbidden (403): check permissions'
        if status == 404:
            return ErrorClassification.ROW_LEVEL_FAILURE, 'Resource not found (404): skip this record'
        if 400 <= status < 500:
            return ErrorClassification.PERMANENT_FAILURE, f'Client error {status}: fix request before retrying'

    # ── Network / connection errors ────────────────────────────────────────────
    if isinstance(exc, (requests.Timeout, requests.ConnectionError)):
        return ErrorClassification.RETRY_BACKOFF, f'Network error: {type(exc).__name__}'

    # ── PostgreSQL errors ──────────────────────────────────────────────────────
    # DeadlockDetected is checked first because it is a subclass of OperationalError.
    if isinstance(exc, psycopg2.errors.DeadlockDetected):
        return ErrorClassification.RETRY_IMMEDIATELY, 'Deadlock detected: retry transaction'
    if isinstance(exc, psycopg2.OperationalError):
        msg = str(exc).lower()
        # 'connect' matches both "could not connect" and "connection refused"
        if 'connect' in msg or 'timeout' in msg:
            return ErrorClassification.RETRY_BACKOFF, f'DB connection error: {exc}'
        return ErrorClassification.PERMANENT_FAILURE, f'DB operational error: {exc}'

    # ── Data / schema errors ───────────────────────────────────────────────────
    if isinstance(exc, (ValueError, TypeError, KeyError)):
        return ErrorClassification.ROW_LEVEL_FAILURE, f'Data error: {type(exc).__name__}: {exc}'
    if isinstance(exc, (AttributeError, ImportError, SyntaxError)):
        return ErrorClassification.PERMANENT_FAILURE, f'Code error (not data): {type(exc).__name__}: {exc}'

    # ── Memory errors ──────────────────────────────────────────────────────────
    if isinstance(exc, MemoryError):
        return ErrorClassification.PERMANENT_FAILURE, 'Out of memory: reduce batch size'

    # ── Unknown errors: treat as permanent (fail safe) ────────────────────────
    return ErrorClassification.PERMANENT_FAILURE, f'Unknown error: {type(exc).__name__}: {exc}'
Retry Strategies: From Fixed Interval to Exponential Backoff With Jitter
Not all retries are equal. The naive approach — retry immediately three times — makes things worse when the source system is under load. All retrying clients resume simultaneously, creating a thundering herd that overwhelms the already-struggling service. Exponential backoff spaces retries out so the service has time to recover. Jitter breaks synchronisation between multiple parallel clients so they do not all retry at the same moment.
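To make the effect of jitter concrete, here is a small simulation sketch. It assumes 100 clients that all fail at T=0 and share the same exponential schedule, then counts the largest number of retries landing in any single 100 ms bucket, with and without jitter. The function names, the bucket width, and the choice of a uniform draw between 0 and the backoff cap are illustrative assumptions, not part of the pipeline code.

```python
import random
from collections import Counter


def retry_times(n_clients: int, attempt: int, base_s: float, jitter: bool,
                rng: random.Random) -> list[float]:
    """Time (seconds after the shared failure) at which each client retries."""
    cap = base_s * (2 ** attempt)
    if not jitter:
        # Every client waits exactly the same delay.
        return [cap] * n_clients
    # Each client independently picks a delay in [0, cap].
    return [rng.uniform(0, cap) for _ in range(n_clients)]


def peak_load(times: list[float], bucket_s: float = 0.1) -> int:
    """Largest number of retries that fall into one bucket of width bucket_s."""
    buckets = Counter(int(t / bucket_s) for t in times)
    return max(buckets.values())


rng = random.Random(42)  # seeded so the run is repeatable
no_jitter = peak_load(retry_times(100, attempt=3, base_s=1.0, jitter=False, rng=rng))
with_jitter = peak_load(retry_times(100, attempt=3, base_s=1.0, jitter=True, rng=rng))

print(f'peak retries per 100 ms without jitter: {no_jitter}')
print(f'peak retries per 100 ms with jitter:    {with_jitter}')
```

Without jitter all 100 retries land in the same bucket. With jitter the same retries are spread across the 8-second window, so the peak the recovering service sees is far lower.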
The retry decorator — production-grade implementation
import functools
import logging
import random
import time
from typing import Callable, Type

log = logging.getLogger(__name__)


def retry_with_backoff(
    max_attempts: int = 5,
    base_delay_s: float = 1.0,
    max_delay_s: float = 60.0,
    jitter_factor: float = 0.25,
    retryable_exceptions: tuple[Type[Exception], ...] = (Exception,),
    non_retryable_exceptions: tuple[Type[Exception], ...] = (),
) -> Callable:
    """
    Decorator that retries a function on transient failures.

    Backoff formula (delay after attempt n fails):
        delay = min(base_delay * 2^(n - 1), max_delay) * (1 ± jitter_factor)

    With base_delay_s=1.0 and max_attempts=5:
        After attempt 1: ~1s
        After attempt 2: ~2s
        After attempt 3: ~4s
        After attempt 4: ~8s
        Attempt 5 is the last one; its failure is raised, not retried.
    All with ±25% jitter to prevent thundering herd.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except non_retryable_exceptions as exc:
                    # These exceptions must NOT be retried under any circumstances
                    log.error(
                        'Non-retryable error in %s (attempt %d/%d): %s',
                        func.__name__, attempt, max_attempts, str(exc),
                    )
                    raise
                except retryable_exceptions as exc:
                    last_exc = exc
                    if attempt == max_attempts:
                        log.error(
                            'All %d attempts exhausted for %s: %s',
                            max_attempts, func.__name__, str(exc),
                        )
                        raise
                    # Exponential backoff with bounded (± jitter_factor) jitter
                    raw_delay = min(base_delay_s * (2 ** (attempt - 1)), max_delay_s)
                    jitter = raw_delay * jitter_factor * (2 * random.random() - 1)
                    delay = max(0, raw_delay + jitter)
                    log.warning(
                        '%s failed (attempt %d/%d): %s. Retrying in %.2fs',
                        func.__name__, attempt, max_attempts, str(exc), delay,
                    )
                    time.sleep(delay)
            raise last_exc  # should not reach here
        return wrapper
    return decorator
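# ── USAGE ─────────────────────────────────────────────────────────────────────
import psycopg2.extras
from requests.auth import HTTPBasicAuth


# Application-specific exceptions (illustrative definitions).
# KEY_ID, KEY_SECRET and UPSERT_SQL are assumed to come from pipeline config.
class AuthenticationError(Exception): ...
class SchemaError(Exception): ...
class RateLimitError(Exception): ...


# For API calls:
@retry_with_backoff(
    max_attempts=5,
    base_delay_s=2.0,
    max_delay_s=60.0,
    # RateLimitError must be listed here, otherwise it propagates without a retry.
    # Note that the decorator applies its own backoff and ignores Retry-After.
    retryable_exceptions=(requests.Timeout, requests.ConnectionError, RateLimitError),
    non_retryable_exceptions=(AuthenticationError, SchemaError),
)
def fetch_payments(from_ts: int, to_ts: int) -> dict:
    response = requests.get(
        'https://api.razorpay.com/v1/payments',
        params={'from': from_ts, 'to': to_ts},
        auth=HTTPBasicAuth(KEY_ID, KEY_SECRET),
        timeout=30,
    )
    if response.status_code == 429:
        wait = float(response.headers.get('Retry-After', 60))
        raise RateLimitError(f'Rate limited: wait {wait}s')
    response.raise_for_status()
    return response.json()


# For database operations:
@retry_with_backoff(
    max_attempts=3,
    base_delay_s=0.5,
    retryable_exceptions=(psycopg2.errors.DeadlockDetected,
                          psycopg2.OperationalError),
)
def write_batch_to_db(rows: list[dict], conn) -> int:
    # "with conn" commits on success and rolls back if the block raises
    with conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(cur, UPSERT_SQL, rows)
    return len(rows)
Rate limit handling: the Retry-After pattern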
import logging
import random
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

import requests

log = logging.getLogger(__name__)


def handle_rate_limit_response(response) -> float:
    """
    Extract the correct wait time from a 429 response.
    Returns seconds to wait before the next retry.

    Retry-After header can be:
    - An integer number of seconds: "Retry-After: 60"
    - An HTTP date string: "Retry-After: Wed, 18 Mar 2026 02:00:00 GMT"
    """
    retry_after = response.headers.get('Retry-After')
    if not retry_after:
        # No header: use conservative default
        return 30.0
    try:
        # Integer seconds
        return float(retry_after)
    except ValueError:
        pass
    try:
        # HTTP date string
        retry_dt = parsedate_to_datetime(retry_after)
        now_utc = datetime.now(timezone.utc)
        wait = (retry_dt - now_utc).total_seconds()
        return max(0.0, wait)
    except (TypeError, ValueError):
        return 30.0  # parse failed: default


def api_call_with_rate_limit_handling(
    url: str,
    params: dict,
    auth,
    max_retries: int = 5,
) -> dict:
    """
    Make an API call, handling 429 rate limits with correct backoff.
    Distinguishes 429 (rate limit, controllable) from 5xx (server error).
    """
    for attempt in range(1, max_retries + 1):
        response = requests.get(url, params=params, auth=auth, timeout=30)
        if 200 <= response.status_code < 300:
            # Any 2xx is a success; checking only 200 would re-send on 201/204
            return response.json()
        elif response.status_code == 429:
            wait = handle_rate_limit_response(response)
            # Add up to 10% jitter to avoid thundering herd across multiple pipeline instances
            wait *= (1.0 + random.uniform(0, 0.1))
            log.warning(
                'Rate limited (attempt %d/%d): waiting %.1fs',
                attempt, max_retries, wait,
            )
            if attempt < max_retries:
                time.sleep(wait)
            else:
                response.raise_for_status()
        elif response.status_code in (500, 502, 503, 504):
            # Server error: exponential backoff
            wait = min(2 ** attempt, 60) * (1 + random.uniform(-0.2, 0.2))
            log.warning(
                'Server error %d (attempt %d/%d): waiting %.1fs',
                response.status_code, attempt, max_retries, wait,
            )
            if attempt < max_retries:
                time.sleep(wait)
            else:
                response.raise_for_status()
        else:
            # 4xx client errors (except 429): do not retry
            response.raise_for_status()
    # Defensive: reached only if a response was neither a success nor an error status
    raise RuntimeError(f'No successful response from {url} after {max_retries} attempts')
Jitter patterns: why randomisation matters
# THE THUNDERING HERD PROBLEM:
# 100 pipeline instances all fail at T=0
# All retry with identical exponential backoff (no jitter):
#   Attempt 2: all 100 clients retry at T=2s → server hit with 100 requests
#   Attempt 3: all 100 clients retry at T=4s → server hit with 100 requests
# The synchronised retries keep knocking the server over just as it starts to recover.
# Jitter desynchronises the retries.
import random


def compute_backoff_delay(
    attempt: int,
    base_s: float = 1.0,
    max_s: float = 60.0,
    strategy: str = 'full_jitter',
) -> float:
    """
    Compute the backoff delay for a given attempt number.

    Strategies:
        fixed:        base * 2^attempt (no randomisation, thundering herd risk)
        equal_jitter: half fixed + half random (moderate desynchronisation)
        full_jitter:  random between 0 and cap (maximum desynchronisation)
        decorrelated: random between base and last*3 (AWS-recommended)
    """
    cap = min(base_s * (2 ** attempt), max_s)
    if strategy == 'fixed':
        return cap
    elif strategy == 'equal_jitter':
        return cap / 2 + random.uniform(0, cap / 2)
    elif strategy == 'full_jitter':
        # AWS recommended for high-concurrency scenarios
        # Each client independently chooses a random delay between 0 and cap
        # Result: retries spread evenly over the [0, cap] interval
        return random.uniform(0, cap)
    elif strategy == 'decorrelated':
        # Good for sequential retries from a single client
        # Each delay is random between base and 3× the previous delay
        # Prevents very fast retries while avoiding excessively long waits
        # NOTE: the previous delay is stored on the function object, so it is
        # shared process-wide. Delete or reset compute_backoff_delay._last
        # before starting a new retry sequence.
        last_delay = getattr(compute_backoff_delay, '_last', base_s)
        delay = random.uniform(base_s, last_delay * 3)
        compute_backoff_delay._last = min(delay, max_s)
        return min(delay, max_s)
    return cap


# RECOMMENDATION: use full_jitter for multiple parallel pipeline instances
# Use decorrelated for a single client retrying a sequential operation
# EXAMPLE DELAYS (full_jitter, base=1s, max=60s):
#   Attempt 1: random in [0, 2]s  → avg ~1s
#   Attempt 2: random in [0, 4]s  → avg ~2s
#   Attempt 3: random in [0, 8]s  → avg ~4s
#   Attempt 4: random in [0, 16]s → avg ~8s
#   Attempt 5: random in [0, 32]s → avg ~16s
# 100 clients: evenly distributed over the window, so no thundering herd
Circuit Breaker: Stop Hammering a Failing System
Exponential backoff slows retries. A circuit breaker stops them entirely when a downstream system is clearly unavailable. The pattern comes from electrical engineering: when a circuit overloads, the breaker trips and cuts power to prevent damage. When a downstream API or database is failing consistently, the circuit breaker trips and rejects new requests immediately rather than queuing them up to fail.
Without a circuit breaker, a pipeline calling a failing API keeps trying, blocking threads, consuming connection pool slots, and adding load to an already-struggling service. The circuit breaker lets the service recover by taking the pressure off, and automatically retests recovery with a probe request after a cooldown period.
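The three states are easier to follow when the transitions can be watched without waiting for real time to pass. The sketch below is a deliberately stripped-down breaker with an injected clock; `TinyBreaker`, its parameters, and the fake clock are illustrative assumptions and leave out the rolling window, locking, and success threshold that a production breaker needs.

```python
from typing import Callable, Optional


class TinyBreaker:
    """Minimal three-state circuit breaker (illustrative sketch only)."""

    def __init__(self, failure_threshold: int, cooldown_s: float,
                 clock: Callable[[], float]):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.clock = clock                 # injected so the example controls time
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return 'closed'
        if self.clock() - self.opened_at >= self.cooldown_s:
            return 'half_open'             # cooldown over: allow a probe
        return 'open'

    def call(self, func: Callable[[], object]) -> object:
        if self.state == 'open':
            raise RuntimeError('circuit open: rejected without contacting the service')
        try:
            result = func()
        except Exception:
            self.failures += 1
            # A failed probe, or too many failures while closed, (re)opens the circuit.
            if self.state == 'half_open' or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result


now = [0.0]                                # fake clock, advanced by hand
breaker = TinyBreaker(failure_threshold=3, cooldown_s=30.0, clock=lambda: now[0])


def failing():
    raise ConnectionError('service down')


states = [breaker.state]
for _ in range(3):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
states.append(breaker.state)               # three failures trip the breaker
now[0] += 30.0
states.append(breaker.state)               # cooldown elapsed
breaker.call(lambda: 'ok')                 # probe succeeds
states.append(breaker.state)
print(states)  # ['closed', 'open', 'half_open', 'closed']
```

Injecting the clock is a design choice for testability: the same transitions that take 30 seconds in production can be asserted instantly in a unit test.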
import logging
import threading
import time
from enum import Enum

log = logging.getLogger(__name__)


class CircuitOpenError(Exception):
    pass


class CircuitState(Enum):
    CLOSED = 'closed'        # normal operation: requests flow through
    OPEN = 'open'            # tripped: requests fail immediately (no call made)
    HALF_OPEN = 'half_open'  # testing recovery: probe requests allowed


class CircuitBreaker:
    """
    Circuit breaker for protecting downstream services.

    STATE MACHINE:
        CLOSED    → (failure_threshold failures in window) → OPEN
        OPEN      → (cooldown_s elapsed)                   → HALF_OPEN
        HALF_OPEN → (success_threshold probes succeed)     → CLOSED
        HALF_OPEN → (probe fails)                          → OPEN

    CLOSED:    all requests pass through; failures counted
    OPEN:      all requests fail immediately; service gets breathing room
    HALF_OPEN: probe requests pass through; enough successes → CLOSED
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,   # failures in window before tripping
        success_threshold: int = 2,   # successes in half-open before closing
        window_s: float = 60.0,       # rolling window for failure counting
        cooldown_s: float = 30.0,     # time to wait in OPEN before half-open
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.window_s = window_s
        self.cooldown_s = cooldown_s
        self._state = CircuitState.CLOSED
        self._failure_times: list[float] = []
        self._half_open_success = 0
        self._opened_at: float | None = None
        self._lock = threading.Lock()

    @property
    def state(self) -> CircuitState:
        with self._lock:
            if self._state == CircuitState.OPEN:
                # Check if cooldown has elapsed → transition to HALF_OPEN
                if (self._opened_at is not None
                        and time.monotonic() - self._opened_at >= self.cooldown_s):
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_success = 0
                    log.info('Circuit %s: OPEN → HALF_OPEN (cooldown elapsed)', self.name)
            return self._state

    def call(self, func, *args, **kwargs):
        """
        Execute func through the circuit breaker.
        Raises CircuitOpenError if the circuit is OPEN.
        """
        state = self.state
        if state == CircuitState.OPEN:
            raise CircuitOpenError(
                f'Circuit breaker {self.name} is OPEN: '
                f'service unavailable, retry after cooldown'
            )
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self) -> None:
        with self._lock:
            now = time.monotonic()
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_success += 1
                if self._half_open_success >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_times = []
                    log.info('Circuit %s: HALF_OPEN → CLOSED (service recovered)', self.name)
            elif self._state == CircuitState.CLOSED:
                # Remove old failures outside the window
                self._failure_times = [t for t in self._failure_times
                                       if now - t < self.window_s]

    def _on_failure(self) -> None:
        with self._lock:
            now = time.monotonic()
            if self._state == CircuitState.HALF_OPEN:
                # Probe failed: back to OPEN
                self._state = CircuitState.OPEN
                self._opened_at = now
                log.warning('Circuit %s: HALF_OPEN → OPEN (probe failed)', self.name)
                return
            # Record failure time
            self._failure_times.append(now)
            # Remove failures outside window
            self._failure_times = [t for t in self._failure_times
                                   if now - t < self.window_s]
            if len(self._failure_times) >= self.failure_threshold:
                self._state = CircuitState.OPEN
                self._opened_at = now
                log.error(
                    'Circuit %s: CLOSED → OPEN (%d failures in %.0fs window)',
                    self.name, len(self._failure_times), self.window_s,
                )
# ── USAGE IN A DATA PIPELINE ──────────────────────────────────────────────────
# Create one circuit breaker per downstream service:
razorpay_circuit = CircuitBreaker(
    name='razorpay_api',
    failure_threshold=5,   # trip after 5 failures in 60 seconds
    cooldown_s=30.0,       # test recovery after 30 seconds
)


def _get_payments(params: dict) -> dict:
    # KEY_ID / KEY_SECRET are assumed to come from pipeline config
    response = requests.get(
        'https://api.razorpay.com/v1/payments',
        params=params,
        auth=HTTPBasicAuth(KEY_ID, KEY_SECRET),
        timeout=30,
    )
    # requests.get does not raise on 4xx/5xx by itself. Without this line the
    # breaker would never see HTTP-level failures.
    response.raise_for_status()
    return response.json()


def fetch_payments_safe(params: dict) -> dict:
    """Fetch payments, respecting the circuit breaker."""
    try:
        return razorpay_circuit.call(_get_payments, params)
    except CircuitOpenError:
        log.warning('Razorpay API circuit is OPEN: skipping payment fetch')
        # Return empty result or raise depending on pipeline logic
        return {'items': [], 'cursor': None}
    # Every other exception propagates to the caller. The breaker has already
    # counted it, and classify_error() can decide between retry and alert.
Dead Letter Queue: Not a Trash Can, a Quarantine
A dead letter queue (DLQ) is where records go when they cannot be processed by the main pipeline. The word "queue" is intentional — the DLQ is not a trash can where records are discarded and forgotten. It is a quarantine: records are held with full context (the error, the original raw record, the timestamp) until a human can investigate and decide whether to fix and reprocess, discard, or escalate.
The design of the DLQ determines how useful it is in practice. A DLQ that stores records with no context is useless. A DLQ that is never monitored accumulates indefinitely. A DLQ that has no reprocessing mechanism means every DLQ record is permanent data loss.
DLQ design — what to store and how to structure it
-- ── DATABASE-BACKED DLQ (preferred for analytical pipelines) ─────────────────
CREATE TABLE pipeline.dead_letter_queue (
id BIGSERIAL PRIMARY KEY,
pipeline_name VARCHAR(100) NOT NULL,
run_id UUID NOT NULL,
error_type VARCHAR(100) NOT NULL, -- 'validation', 'transform', 'schema'
error_message TEXT NOT NULL,
raw_record JSONB NOT NULL, -- the original record that failed
source_table VARCHAR(100),
source_key VARCHAR(200), -- primary key from source (for lookup)
rejected_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reprocess_count INTEGER NOT NULL DEFAULT 0,
last_reprocess TIMESTAMPTZ,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
-- pending: not yet investigated
-- reprocessed: successfully reprocessed
-- discarded: intentionally ignored
-- escalated: sent to data quality team
resolution_note TEXT, -- why it was discarded or escalated
CONSTRAINT chk_status CHECK (status IN ('pending','reprocessed','discarded','escalated'))
);
-- Index for monitoring queries:
CREATE INDEX idx_dlq_pipeline_status ON pipeline.dead_letter_queue
(pipeline_name, status, rejected_at);
-- Index for reprocessing lookups:
CREATE INDEX idx_dlq_source_key ON pipeline.dead_letter_queue
(source_key, status) WHERE status = 'pending';
# ── DLQ WRITER CLASS ──────────────────────────────────────────────────────────
import json
import logging
from datetime import datetime, timezone

log = logging.getLogger(__name__)


class DLQWriter:
    """Writes rejected records to the dead letter queue with full context."""

    def __init__(
        self,
        pipeline_name: str,
        run_id: str,
        dest_conn,
        source_table: str | None = None,
    ):
        self.pipeline_name = pipeline_name
        self.run_id = run_id
        self.dest_conn = dest_conn
        self.source_table = source_table
        self._count = 0

    def write(
        self,
        raw_record: dict,
        error_type: str,
        error_message: str,
        source_key: str | None = None,
    ) -> None:
        """Write one rejected record to the DLQ."""
        # Serialise record safely: convert non-JSON types to strings
        safe_record = {}
        for k, v in raw_record.items():
            try:
                json.dumps(v)
                safe_record[k] = v
            except (TypeError, ValueError):
                safe_record[k] = str(v)
        with self.dest_conn.cursor() as cur:
            cur.execute("""
                INSERT INTO pipeline.dead_letter_queue
                    (pipeline_name, run_id, error_type, error_message,
                     raw_record, source_table, source_key)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """, (
                self.pipeline_name,
                self.run_id,
                error_type,
                error_message,
                json.dumps(safe_record),
                self.source_table,
                source_key or str(raw_record.get('order_id') or raw_record.get('id', '')),
            ))
        self.dest_conn.commit()
        self._count += 1
        log.warning(
            'DLQ: %s: %s (total DLQ count: %d)',
            error_type, error_message[:100], self._count,
        )

    @property
    def count(self) -> int:
        return self._count

    def rejection_rate(self, total_processed: int) -> float:
        if total_processed == 0:
            return 0.0
        return self._count / total_processed
-- ── DLQ MONITORING QUERIES ─────────────────────────────────────────────────────
-- Daily DLQ summary: how many records rejected per pipeline
SELECT
pipeline_name,
DATE(rejected_at) AS date,
error_type,
COUNT(*) AS dlq_count,
COUNT(*) FILTER (WHERE status = 'pending') AS pending_count
FROM pipeline.dead_letter_queue
WHERE rejected_at > NOW() - INTERVAL '7 days'
GROUP BY 1, 2, 3
ORDER BY 2 DESC, 4 DESC;
-- ALERT: if pending_count > 100 for any pipeline on today's date
-- Find the most common rejection reasons today:
SELECT error_type, error_message, COUNT(*) AS count
FROM pipeline.dead_letter_queue
WHERE rejected_at::DATE = CURRENT_DATE
AND status = 'pending'
GROUP BY 1, 2
ORDER BY 3 DESC
LIMIT 20;
-- Records that need reprocessing (for the reprocessing job):
SELECT id, raw_record, source_key
FROM pipeline.dead_letter_queue
WHERE pipeline_name = 'orders_incremental'
AND status = 'pending'
AND error_type = 'validation'
ORDER BY rejected_at ASC;
DLQ reprocessing: closing the loop
"""
dlq_reprocess.py: Reprocess records from the dead letter queue.
Run manually after fixing the root cause that caused rejections.
Example: vendor changed a status value. Update VALID_STATUSES,
then reprocess all DLQ records with error_type='validation'.
"""
import json
import logging

log = logging.getLogger(__name__)

# validate_row, enrich_order, project_to_dest_schema and upsert_to_silver are
# the pipeline's own functions, assumed to be imported from its modules.


def _mark_dlq_record(conn, dlq_id: int, status: str, note: str) -> None:
    """Update one DLQ row's status and bump its reprocess counter."""
    # psycopg2 connections have no .execute(); statements go through a cursor
    with conn.cursor() as cur:
        cur.execute("""
            UPDATE pipeline.dead_letter_queue
            SET status          = %s,
                resolution_note = %s,
                last_reprocess  = NOW(),
                reprocess_count = reprocess_count + 1
            WHERE id = %s
        """, (status, note, dlq_id))
    conn.commit()


def reprocess_dlq_records(
    pipeline_name: str,
    error_type: str,
    dest_conn,
    dry_run: bool = True,   # default dry_run=True: must opt in to real run
) -> dict:
    """
    Fetch pending DLQ records, attempt to reprocess them,
    update status to 'reprocessed' or 'escalated'.
    In a dry run, 'reprocessed' counts records that would be reprocessed.
    """
    stats = {'attempted': 0, 'reprocessed': 0, 'failed_again': 0}
    with dest_conn.cursor() as cur:
        cur.execute("""
            SELECT id, raw_record, source_key
            FROM pipeline.dead_letter_queue
            WHERE pipeline_name = %s
              AND error_type = %s
              AND status = 'pending'
            ORDER BY rejected_at ASC
            LIMIT 10000
        """, (pipeline_name, error_type))
        records = cur.fetchall()
    log.info('Found %d DLQ records to reprocess (dry_run=%s)',
             len(records), dry_run)

    for dlq_id, raw_record_json, source_key in records:
        stats['attempted'] += 1
        try:
            # psycopg2 decodes JSONB columns to dict by default; only parse
            # when the driver handed back a string.
            raw_record = (raw_record_json if isinstance(raw_record_json, dict)
                          else json.loads(raw_record_json))
            # Re-run validation with current (presumably fixed) rules
            result = validate_row(raw_record)
            if not result.is_valid:
                # Still fails validation: escalate
                if not dry_run:
                    _mark_dlq_record(dest_conn, dlq_id, 'escalated',
                                     f'Still fails validation: {result.error}')
                stats['failed_again'] += 1
                continue
            # Validation passes: transform and load
            enriched = enrich_order(result.row)
            projected = project_to_dest_schema(enriched)
            if not dry_run:
                upsert_to_silver([projected], dest_conn)
                _mark_dlq_record(dest_conn, dlq_id, 'reprocessed',
                                 'Successfully reprocessed after fix')
            stats['reprocessed'] += 1
        except Exception as exc:
            # Clear any aborted transaction so the next record can proceed
            dest_conn.rollback()
            log.error('Reprocessing failed for DLQ id %d: %s', dlq_id, str(exc))
            stats['failed_again'] += 1

    log.info(
        'DLQ reprocessing complete: attempted=%d reprocessed=%d failed=%d',
        stats['attempted'], stats['reprocessed'], stats['failed_again'],
    )
    return stats
Alerting: Signal, Not Noise
Alerting is where error handling meets operations. An alert that fires on every transient error creates alert fatigue — engineers start ignoring alerts because most of them resolve themselves. An alert that fires only on complete pipeline failure misses degraded states where the pipeline is technically running but producing wrong data. The art is choosing thresholds that surface real problems early while suppressing noise.
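One way to encode that trade-off is a single function that maps a finished run to an alert level. The sketch below is an assumption-laden illustration: `RunOutcome`, `alert_level`, and the level names are hypothetical, and the 5% rejection threshold is borrowed from the rejection-rate check used elsewhere in this module rather than being a universal value.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunOutcome:
    succeeded: bool      # did the run finish, possibly after retries?
    retries_used: int    # transient failures that were retried
    rows_written: int
    rows_rejected: int   # rows sent to the DLQ


def alert_level(outcome: RunOutcome, max_rejection_rate: float = 0.05) -> Optional[str]:
    """Return 'page', 'warn', 'info', or None (no alert) for a finished run."""
    if not outcome.succeeded:
        return 'page'    # retries exhausted: a human has to act
    total = outcome.rows_written + outcome.rows_rejected
    rate = outcome.rows_rejected / total if total else 0.0
    if rate > max_rejection_rate:
        return 'warn'    # run finished, but data quality is degraded
    if outcome.rows_rejected > 0:
        return 'info'    # rejections within tolerance: review, do not page
    return None          # clean run, or transient errors that healed themselves


print(alert_level(RunOutcome(True, 2, 1000, 0)))           # None
print(alert_level(RunOutcome(True, 0, 174_600, 5_400)))    # info
print(alert_level(RunOutcome(True, 0, 900, 100)))          # warn
print(alert_level(RunOutcome(False, 3, 0, 0)))             # page
```

Note that `retries_used` does not influence the result: a run that recovered through retries raises no alert, which is exactly the noise suppression the paragraph above argues for.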
What to alert on — the four-tier alerting model
Alert message quality — what a good alert contains
# BAD ALERT (what not to do):
# Subject: Pipeline Error
# Body: An error occurred in the orders pipeline.
# → No context. What failed? What impact? Where to look? No idea.
# GOOD ALERT (actionable immediately):
# Subject: [P1] orders_incremental pipeline FAILED — data stale since 06:00 UTC
ALERT BODY:
Pipeline: orders_incremental (FreshMart Silver Layer)
Status: FAILED
Failed at: 2026-03-17 06:23:41 UTC
Error: psycopg2.OperationalError: could not connect to server
Impact: Silver orders table not updated since 2026-03-17 06:00:00 UTC
Analytics dashboard showing stale data — SLA BREACHED
Context:
Run ID: run-abc123
Rows processed: 47,000 / 48,234 (97% complete before failure)
Checkpoint: 2026-03-17 05:59:47 UTC (saved at row 47,000)
DLQ count: 12 rows (0.025% rejection rate — normal)
Duration: 14 min 32 sec (normal: 12-15 min)
Diagnostic links:
Airflow DAG run: https://airflow.internal/dags/orders_pipeline/.../runs/...
Snowflake history: https://app.snowflake.com/...
DLQ records: SELECT * FROM pipeline.dead_letter_queue WHERE run_id='abc123'
Source DB status: https://grafana.internal/d/postgres-health
Automated recovery:
Airflow will retry in 2 minutes (attempt 2 of 3)
If all retries fail: page on-call data engineer
Resolution steps:
1. Check source DB connectivity: psql ${SOURCE_DB_URL} -c "SELECT 1"
2. If DB down: check pg_stat_activity on primary, check replica lag
3. If network issue: check VPN/peering to replica subnet
4. Manual run: python -m pipeline.main --date 2026-03-17
# IMPLEMENT WITH:
def format_alert(run: PipelineRun, error: Exception) -> str:
    # PipelineRun and load_watermark() come from the pipeline's own code
    return f"""
Pipeline: {run.pipeline_name}
Status: FAILED
Error: {type(error).__name__}: {error}
Impact: Data stale since {run.started_at.isoformat()} UTC
Run ID: {run.run_id}
Rows: {run.rows_written:,} written, {run.rows_rejected:,} rejected
DLQ: {run.dlq_count} records
Checkpoint: {load_watermark().isoformat()}
See: https://airflow.internal/dags/{run.pipeline_name}/
"""
Error Handling at the Orchestration Layer: Airflow
The pipeline code handles row-level and request-level errors internally. The orchestration layer (Airflow) handles task-level and DAG-level failures — deciding when to retry, when to alert, and how to propagate failures between dependent tasks.
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email
# ── TASK-LEVEL RETRY CONFIGURATION ───────────────────────────────────────────
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=2),
'retry_exponential_backoff': True, # delays: 2m, 4m, 8m
'max_retry_delay': timedelta(minutes=30),
'execution_timeout': timedelta(minutes=15), # kill if runs too long
'email_on_failure': True,
'email_on_retry': False, # don't spam on expected retries
'email': ['data-team@freshmart.com'],
}
# ── SLA MISS CALLBACK ─────────────────────────────────────────────────────────
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when a task misses its SLA: send a warning before it fully fails."""
    missed_tasks = [sla.task_id for sla in slas]
    send_slack_alert(
        channel='#data-alerts',
        message=f':warning: SLA MISS: tasks {missed_tasks} in DAG {dag.dag_id} '
                f'exceeded their SLA. Data may be stale soon.',
        urgency='warning',
    )
# ── ON-FAILURE CALLBACK ────────────────────────────────────────────────────────
def task_failure_callback(context):
    """
    Called when a task fails all retries.
    Sends a rich alert with context, impact, and diagnostic links.
    """
    dag_run = context['dag_run']
    task = context['task']
    ti = context['task_instance']
    exc = context.get('exception')
    # Pull metrics from XCom if the task published them before failing
    rows_written = ti.xcom_pull(key='rows_written') or 0
    rows_rejected = ti.xcom_pull(key='rows_rejected') or 0
    run_id = ti.xcom_pull(key='pipeline_run_id') or 'unknown'
    message = f"""
*[P1] Pipeline FAILED: Manual Intervention Required*
*DAG:* {dag_run.dag_id}
*Task:* {task.task_id}
*Run:* {dag_run.run_id}
*Error:* {type(exc).__name__}: {exc}
*Impact:* Analytics data may be stale
*Progress before failure:*
Rows written: {rows_written:,}
Rows rejected: {rows_rejected:,}
Pipeline run: {run_id}
*Actions:*
• Check Airflow: {ti.log_url}
• Manual retry: python -m pipeline.main
• DLQ: SELECT * FROM pipeline.dead_letter_queue WHERE run_id='{run_id}'
"""
    send_slack_alert(channel='#data-oncall', message=message, urgency='critical')
    # Also create a PagerDuty incident if it's the primary SLA pipeline
# ── ON-SUCCESS CALLBACK (for SLA verification) ────────────────────────────────
def task_success_callback(context):
    """Verify data quality after successful task completion."""
    ti = context['task_instance']
    rows_written = ti.xcom_pull(key='rows_written') or 0
    rows_rejected = ti.xcom_pull(key='rows_rejected') or 0
    if rows_written + rows_rejected > 0:
        rejection_rate = rows_rejected / (rows_written + rows_rejected)
        if rejection_rate > 0.05:
            send_slack_alert(
                channel='#data-quality',
                message=f':warning: High DLQ rate in {ti.dag_id}: '
                        f'{rejection_rate:.1%} of rows rejected. '
                        f'Check: SELECT * FROM pipeline.dead_letter_queue',
                urgency='warning',
            )
# ── DAG DEFINITION WITH CALLBACKS ────────────────────────────────────────────
with DAG(
    dag_id = 'orders_pipeline_incremental',
    default_args = default_args,
    sla_miss_callback = sla_miss_callback,
    ...
) as dag:
    ingest = PythonOperator(
        task_id = 'ingest_orders',
        python_callable = run_pipeline,
        on_failure_callback = task_failure_callback,
        on_success_callback = task_success_callback,
        sla = timedelta(minutes=10),
    )
A Vendor File With 3% Bad Rows: Handling It Without Stopping the Pipeline
Every Monday, a logistics partner sends a CSV file containing 180,000 delivery records for the previous week. This week's file has 5,400 rows where the delivery_fee column contains the string "N/A" instead of a decimal value — a data entry issue on the vendor's side.
Without proper error handling, the pipeline would crash on the first invalid row, process zero records, and page the on-call engineer at 06:15 AM. Here is how the error handling hierarchy handles it correctly.
PIPELINE EXECUTION LOG (abbreviated):
06:00:12 INFO Pipeline started: vendor_reconciliation run-def456
06:00:14 INFO Loaded 180,000 rows from ShipFast weekly report
06:00:14 INFO Beginning validation...
06:00:18 WARNING Row-level validation failed:
    error=non_numeric_delivery_fee: 'N/A'
    source_key=shipfast_delivery_id=SFD_001847
    → Written to DLQ (count: 1)
06:00:18 INFO [continues processing without stopping]
06:04:22 INFO Batch 1 complete: 5000 rows (47 rejected → DLQ)
06:04:22 INFO Batch 2 complete: 5000 rows (53 rejected → DLQ)
...
06:18:44 INFO Batch 36 complete: 5000 rows (150 rejected → DLQ)
06:18:47 WARNING DLQ count: 5,400 rows (3.0% rejection rate)
    Threshold: 5.0% — within acceptable range
    DLQ file: /data/dlq/vendor_recon_run-def456.ndjson
06:18:48 INFO Successfully loaded 174,600 of 180,000 rows
06:18:48 INFO Checkpoint saved: 2026-03-17 23:59:59 UTC
06:18:49 INFO Pipeline complete: duration=18m37s status=SUCCESS
# P3 alert sent (no P1 — below 5% threshold):
# 📋 [P3] vendor_reconciliation: 5,400 rows in DLQ (3.0%)
# Review: SELECT * FROM pipeline.dead_letter_queue
# WHERE run_id = 'def456'
# AND error_type = 'validation'
# Monday morning — data engineer reviews DLQ:
SELECT error_message, COUNT(*) AS count
FROM pipeline.dead_letter_queue
WHERE run_id = 'def456' AND status = 'pending'
GROUP BY 1;
# error_message                      count
# non_numeric_delivery_fee: 'N/A'    5,400
# Root cause: vendor sent "N/A" for NULL delivery fees (cash on delivery orders)
# Fix: treat "N/A" as 0 in the delivery_fee parser
# Update parser:
# def parse_delivery_fee(raw):
#     if raw in ('N/A', 'NA', 'null', 'NULL', ''):
#         return 0.0
#     return float(raw)
# Reprocess DLQ:
python dlq_reprocess.py --pipeline vendor_reconciliation --error-type validation --dry-run false
# Output:
# DLQ reprocessing: attempted=5400 reprocessed=5400 failed=0
# All 5,400 rows successfully loaded to silver.vendor_deliveries
# Contact vendor to fix the source data for future files.
# Total impact: 174,600 rows loaded on time, 5,400 loaded Monday morning
# Analyst impact: delivery fee is NULL for COD orders until Monday reprocess
# Business impact: none (delivery fee not in Monday reporting)

The error handling hierarchy worked exactly as designed. Row-level validation errors went to the DLQ without stopping the pipeline. The 97% of rows that were valid loaded on time. The DLQ rate stayed below the 5% P1 threshold, so no one was paged at 6 AM. The root cause was identified in 3 minutes on Monday morning, and the quarantined rows were reprocessed in 5 minutes.
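
The row-level behaviour in this walkthrough can be sketched roughly as follows. This is a minimal illustration rather than the pipeline's actual code: `validate_row`, `process_batch`, `alert_level`, and `BatchResult` are hypothetical names, the DLQ is an in-memory list instead of a table or NDJSON file, the sample rows are made up, and the thresholds are the 1% / 5% / 20% tiers described in this module.

```python
from dataclasses import dataclass, field


@dataclass
class BatchResult:
    loaded: list = field(default_factory=list)
    dlq: list = field(default_factory=list)

    @property
    def rejection_rate(self) -> float:
        total = len(self.loaded) + len(self.dlq)
        return len(self.dlq) / total if total else 0.0


def validate_row(row: dict) -> dict:
    """Hypothetical validator: delivery_fee must parse as a number."""
    try:
        fee = float(row["delivery_fee"])
    except (KeyError, TypeError, ValueError):
        raise ValueError(
            f"non_numeric_delivery_fee: {row.get('delivery_fee')!r}"
        ) from None
    return {**row, "delivery_fee": fee}


def process_batch(rows, run_id: str) -> BatchResult:
    result = BatchResult()
    for row in rows:
        try:
            result.loaded.append(validate_row(row))
        except ValueError as exc:
            # Quarantine the complete raw record with its error context,
            # then keep processing the rest of the batch.
            result.dlq.append({
                "run_id": run_id,
                "error_type": "validation",
                "error_message": str(exc),
                "raw_record": row,
            })
    return result


def alert_level(rejection_rate: float) -> str:
    """Map a rejection rate to the response tier (assumed thresholds)."""
    if rejection_rate > 0.20:
        return "abort"
    if rejection_rate > 0.05:
        return "P1"
    if rejection_rate >= 0.01:
        return "P3"
    return "log"


# Made-up sample: 3 bad rows in every 100, mirroring the 3% scenario above.
rows = [
    {"delivery_id": i, "delivery_fee": "N/A" if i % 100 < 3 else "49.00"}
    for i in range(1000)
]
result = process_batch(rows, run_id="def456")
print(len(result.loaded), len(result.dlq), alert_level(result.rejection_rate))
# prints: 970 30 P3
```

Catching the error inside the loop is what keeps one bad row from failing the batch. Infrastructure errors are deliberately not caught here, so they would still propagate to the batch-level retry logic.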
5 Interview Questions — With Complete Answers
Errors You Will Hit — And Exactly Why They Happen
🎯 Key Takeaways
- ✓ Classify every error before deciding what to do: transient errors (network timeout, 5xx, 429, deadlock) should be retried with backoff. Permanent errors (401, 403, schema mismatch, disk full, bad credentials) should fail immediately and alert. Never retry a permanent error: it wastes time and delays the human intervention the error requires.
- ✓ Exponential backoff formula: delay = min(base × 2^attempt, max_delay). With base = 1s and attempts counted from 0, the first four retries wait about 1s, 2s, 4s, and 8s before jitter. Always add jitter. Full jitter selects a random value between 0 and the computed cap, spreading retries from multiple parallel clients evenly across the window and preventing thundering herds.
- ✓ Rate limit (429) responses require special handling: read the Retry-After header for the exact wait time instead of using exponential backoff. The API is telling you exactly how long to wait. Retrying after a shorter generic backoff will most likely produce another 429 immediately.
- ✓ The circuit breaker has three states: closed (normal operation), open (all requests fail immediately so the service gets time to recover), and half-open (one probe request allowed to test recovery). Use circuit breakers for external third-party APIs where repeated timeouts would waste pipeline execution time and add load to a failing service.
- ✓ A Dead Letter Queue is a quarantine, not a trash can. Store the complete raw record, the error type, the error message, the run ID, and the source key. Monitor pending DLQ counts. Alert at 5% rejection rate. Build a reprocessing job that can retry quarantined records after fixing the root cause.
- ✓ The DLQ rejection rate threshold determines alert urgency. Below 1%: normal DLQ activity, log only. 1–5%: P3 warning, investigate next business day. Above 5%: P1 alert, investigate immediately. Above 20%: abort the pipeline, because the batch has a systemic problem.
- ✓ Handle errors at the right level. Row-level data errors (ValueError, invalid field) go to the DLQ: catch them per row and continue processing. Infrastructure errors (connection timeout, 5xx) propagate up to the batch level for retry. A high DLQ rate triggers a pipeline abort rather than loading corrupted data.
- ✓ Alert quality is as important as alert quantity. A good alert contains: pipeline name and run ID, error message, data impact (how stale is the data), rows processed before failure, DLQ count, checkpoint position, diagnostic links to Airflow logs and Snowflake query history, and automated recovery status.
- ✓ Alert fatigue is a reliability risk. If engineers ignore alerts because 90% resolve automatically, real P1 incidents get missed. Only alert on conditions that require human action: all retries exhausted, SLA missed, permanent errors, high DLQ rate. Transient errors that resolve within the retry budget should be logged, not alerted.
- ✓ The four-tier alerting model: P1 (page immediately): SLA breach, authentication failure, schema mismatch, DLQ rate above 5%. P2 (investigate within 1 hour): all retries exhausted, DLQ growing across consecutive runs. P3 (investigate within 24 hours): single run failed but recovered, DLQ has new records. No alert (log only): transient errors that resolved, successful runs.
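
The backoff and Retry-After takeaways can be combined into a single delay function. The sketch below is illustrative only: `backoff_cap` and `retry_delay` are hypothetical names, the defaults (1 s base, 60 s cap) are assumptions, attempts are counted from 0 so the code matches the formula as written, and Retry-After is handled only in its delta-seconds form (the header can also carry an HTTP date, which this sketch ignores).

```python
import random


def backoff_cap(attempt: int, base: float = 1.0, max_delay: float = 60.0) -> float:
    """Upper bound on the wait: min(base * 2^attempt, max_delay)."""
    return min(base * 2 ** attempt, max_delay)


def retry_delay(attempt, retry_after=None, base=1.0, max_delay=60.0, rng=random):
    """Seconds to sleep before the next retry."""
    if retry_after is not None:
        # 429 with Retry-After: the server stated the wait, so use it as given.
        return float(retry_after)
    # Full jitter: a uniform random value between 0 and the exponential cap.
    return rng.uniform(0.0, backoff_cap(attempt, base, max_delay))


for attempt in range(5):
    print(attempt, backoff_cap(attempt))
# prints:
# 0 1.0
# 1 2.0
# 2 4.0
# 3 8.0
# 4 16.0
```

Passing `rng` explicitly (for example a seeded `random.Random`) makes the jitter reproducible in tests, while production callers can rely on the module-level default.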