What is a Data Pipeline? Anatomy and Design Principles
The anatomy of every pipeline, the design principles that make them reliable, and the patterns that separate robust pipelines from fragile ones.
What a Data Pipeline Actually Is
The term "data pipeline" gets used loosely — sometimes to mean a single Python script, sometimes to mean an entire data platform, sometimes to mean a Kafka stream. Before building pipelines professionally, you need a precise mental model of what a pipeline is, what it consists of, and what distinguishes a well-designed pipeline from a fragile one.
A data pipeline is a system that moves data from one or more sources to one or more destinations, performing transformations along the way. That definition has three parts: sources (where data originates), transformations (operations applied to data in transit), and sinks (where data lands). Everything else — orchestration, monitoring, error handling, retries — exists to make this movement reliable, repeatable, and observable.
A pipeline is not defined by its technology. A 50-line Python script that reads from a PostgreSQL table and writes to S3 is a pipeline. A Spark job processing 10 TB of Kafka events is a pipeline. A dbt model that transforms Silver tables into a Gold aggregate is a pipeline. What makes all of them pipelines is the same structure: source → extract → transform → load → sink, with orchestration and monitoring around it.
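That invariant structure can be sketched in a few lines of plain Python — source, transform, sink, nothing vendor-specific. The stages and row values here are hypothetical; only the shape matters:

```python
# A pipeline in miniature: extract → transform → load.

def extract(source: list[dict]) -> list[dict]:
    """Read rows from the source (here, an in-memory list)."""
    return list(source)

def transform(rows: list[dict]) -> list[dict]:
    """Drop invalid rows and normalise fields."""
    return [
        {**r, 'status': r['status'].strip().lower()}
        for r in rows
        if r.get('amount', 0) > 0
    ]

def load(rows: list[dict], sink: list[dict]) -> int:
    """Write rows to the sink; return the count for monitoring."""
    sink.extend(rows)
    return len(rows)

source = [
    {'order_id': 1, 'amount': 380.0, 'status': ' Delivered '},
    {'order_id': 2, 'amount': -5.0, 'status': 'placed'},  # invalid: filtered out
]
sink: list[dict] = []
written = load(transform(extract(source)), sink)
# written == 1; sink holds the cleaned, delivered order
```

Swap the in-memory lists for PostgreSQL, Kafka, or S3 and the structure is unchanged — which is exactly the point.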
Each Layer of the Pipeline — In Depth
Sources — where data comes from
Every pipeline starts with a source. The source determines what extraction approach is possible, what change detection mechanism is available, and what data quality guarantees you can rely on. Understanding the source deeply — its schema, its update frequency, its scale, its consistency model — is the first thing a data engineer does before designing any pipeline.
SOURCE TYPE EXAMPLES EXTRACTION APPROACH
─────────────────────────────────────────────────────────────────────────────
Relational DB PostgreSQL, MySQL, Oracle CDC (Debezium) or SQL incremental
Document DB MongoDB, Firestore Change Streams or scheduled export
Key-Value Redis, DynamoDB Snapshot (no built-in CDC)
Column-Family Cassandra, HBase Spark connector or CDC plugin
REST API Razorpay, Salesforce HTTP pagination with cursor
Event Stream Kafka, Kinesis, Pub/Sub Kafka Consumer Group (streaming)
File Drop SFTP, S3 landing zone File event trigger or scheduled scan
SaaS Platform Stripe, HubSpot, Shopify Official connectors or REST API
Message Queue RabbitMQ, SQS Consumer subscription
Webhook Payment events, IoT HTTP endpoint + Kafka/DB write
Batch Export Partner CSV files, reports Scheduled SFTP/S3 poll
WHAT TO UNDERSTAND ABOUT EACH SOURCE:
Schema: What are the fields, types, and constraints?
Cardinality: How many rows? How fast does it grow?
Change rate: How many inserts/updates/deletes per hour?
Latency need: Does the business need real-time or batch is fine?
Quality: Is source data clean? Are there known gaps or inconsistencies?
Access: Read replica? Production only? Rate limited?
History: How far back can we pull? Is there a retention policy?
Extraction — getting data out of the source
Extraction is the mechanism by which data moves from the source into the pipeline. The two fundamental extraction patterns are full extraction (read everything every time) and incremental extraction (read only what changed since the last run). The choice between them has enormous consequences for pipeline performance and source system load.
# ── FULL EXTRACTION ──────────────────────────────────────────────────────────
# Read every row in the source table on every run.
# Simple. Correct. Expensive at scale.
SELECT * FROM orders; -- every row, every time
# When to use full extraction:
# Small tables (< 1M rows, < 100 MB)
# Tables that have no reliable "changed at" timestamp
# Reference/dimension tables (product categories, store master)
# When the source cannot be queried incrementally safely
# When NOT to use:
# Large transaction tables (billions of rows)
# High-velocity sources (thousands of inserts/minute)
# Sources with rate limits or shared connection pools
# ── INCREMENTAL EXTRACTION (high-watermark) ───────────────────────────────────
# Read only rows created or modified since the last run.
# Requires a monotonically increasing column (timestamp or auto-increment ID).
-- Get all orders modified since the last checkpoint:
SELECT *
FROM orders
WHERE updated_at > '2026-03-16 06:00:00' -- last successful run timestamp
AND updated_at <= '2026-03-17 06:00:00'; -- current run timestamp
# Checkpoint management in Python:
import json
from pathlib import Path
from datetime import datetime, timezone
CHECKPOINT = Path('/data/checkpoints/orders.json')
def load_checkpoint() -> datetime:
if CHECKPOINT.exists():
data = json.loads(CHECKPOINT.read_text())
return datetime.fromisoformat(data['last_updated_at'])
return datetime(2020, 1, 1, tzinfo=timezone.utc) # beginning of time
def save_checkpoint(ts: datetime) -> None:
CHECKPOINT.write_text(json.dumps({'last_updated_at': ts.isoformat()}))
last_run = load_checkpoint()
current_run = datetime.now(timezone.utc)
# Extract rows modified between last_run and current_run:
rows = db.query(
"SELECT * FROM orders WHERE updated_at > %s AND updated_at <= %s",
(last_run, current_run),
)
# Only save checkpoint after successful write to destination:
write_to_destination(rows)
save_checkpoint(current_run) # advance checkpoint only on success
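Late-arriving rows — written with a slightly older timestamp after the window closed — are a classic incremental hazard. One mitigation is to pull the lower bound back by a deliberate overlap and rely on the destination's upserts to absorb the re-read rows. A minimal sketch (the 30-minute overlap value is an assumption; tune it to your source's commit latency):

```python
from datetime import datetime, timedelta, timezone

OVERLAP = timedelta(minutes=30)  # re-read a little history; upserts dedupe it

def extraction_window(last_checkpoint: datetime, now: datetime) -> tuple[datetime, datetime]:
    """Return (lower, upper) bounds for the incremental query.

    The lower bound is pulled back by OVERLAP so rows committed with a
    slightly older updated_at after the previous run still get picked up.
    Safe only if the load step is an upsert (reruns must not duplicate).
    """
    return (last_checkpoint - OVERLAP, now)

lower, upper = extraction_window(
    datetime(2026, 3, 16, 6, 0, tzinfo=timezone.utc),
    datetime(2026, 3, 17, 6, 0, tzinfo=timezone.utc),
)
# lower == 2026-03-16 05:30 UTC, upper == 2026-03-17 06:00 UTC
```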
# ── INCREMENTAL PITFALLS ──────────────────────────────────────────────────────
# 1. Late-arriving data: rows written with a past timestamp after the window closed
# Solution: overlap windows by 30 minutes and use upsert at destination
# 2. Deletes: incremental queries only see modified rows, not deleted ones
# Solution: CDC (Change Data Capture) — see Module 24
# 3. Clock skew: source DB clock differs from pipeline clock
# Solution: always use the source DB's NOW() as the upper bound
# 4. No updated_at column: some tables have only created_at
# Solution: use max(id) as watermark if auto-increment; otherwise full extract
Transformation — the heart of the pipeline
Transformation is where raw source data becomes clean, typed, validated, business-ready data. Transformations range from trivial (renaming a column) to complex (computing 90-day cohort retention across billions of events). Every transformation in a pipeline is a business decision encoded in code — and every transformation is a potential source of bugs.
TRANSFORMATION TYPE WHAT IT DOES EXAMPLE
─────────────────────────────────────────────────────────────────────────────
Type casting Convert string to correct type "380.00" → DECIMAL
Null handling Replace, filter, or flag nulls COALESCE(amount, 0)
Deduplication Remove duplicate rows ROW_NUMBER() OVER (PARTITION BY id)
Filtering Remove invalid/out-of-scope rows WHERE status != 'test'
Normalisation Standardise values LOWER(status), TRIM(name)
Enrichment Add data from another source JOIN to customers table
Aggregation Compute metrics SUM, COUNT, AVG, PERCENTILE
Flattening Expand nested structures UNNEST(items), JSON extraction
Pivoting Reshape wide-to-long or long-to-wide PIVOT(status values)
Business rules Apply domain logic IF amount > threshold THEN tier = 'high'
Anonymisation Mask or hash PII for compliance SHA256(email)
Window calculations Running totals, moving averages SUM OVER (PARTITION BY ... ORDER BY ...)
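A few of the types above — type casting, null handling, normalisation, deduplication — sketched as a dependency-free function over dict rows (column names are illustrative, not from any real schema):

```python
from decimal import Decimal

def clean(rows: list[dict]) -> list[dict]:
    """Apply several common transformation types to raw rows."""
    seen: set = set()
    out: list[dict] = []
    for r in rows:
        key = r.get('order_id')
        if key in seen:                                   # deduplication
            continue
        seen.add(key)
        out.append({
            'order_id': key,
            'amount': Decimal(r.get('amount') or '0'),    # type cast + null handling
            'status': (r.get('status') or '').strip().lower(),  # normalisation
        })
    return out

rows = clean([
    {'order_id': 'A1', 'amount': '380.00', 'status': ' Delivered '},
    {'order_id': 'A1', 'amount': '380.00', 'status': 'delivered'},  # duplicate
    {'order_id': 'A2', 'amount': None, 'status': None},             # nulls
])
# A1 kept once with Decimal('380.00'); A2's amount coerced to Decimal('0')
```

In production the same logic usually lives in SQL/dbt for tabular data; the point is that each transformation is a small, explicit, testable business decision.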
WHERE TRANSFORMATIONS HAPPEN:
Python (Pandas/PySpark): general-purpose, imperative, easy to test
SQL/dbt: set-based, declarative, best for tabular data
Spark: large-scale distributed, complex transformations
Stream processors: Flink, Spark Streaming — real-time transformations
Loading — writing to the destination
# The loading pattern determines how new data interacts with existing data
# ── FULL REPLACE (TRUNCATE AND RELOAD) ───────────────────────────────────────
# Delete everything in the destination, reload from source.
# Simple. Safe. Only works for full extraction.
# Use for: small dimension tables, reference tables, daily full snapshots
BEGIN;
TRUNCATE TABLE silver.store_master;
INSERT INTO silver.store_master SELECT * FROM source_store_master;
COMMIT;
# Risk: window between TRUNCATE and INSERT where the table is empty
# Fix: build a staging table, then swap names atomically in one transaction:
CREATE TABLE silver.store_master_staging AS SELECT * FROM source_store_master;
BEGIN;
ALTER TABLE silver.store_master RENAME TO store_master_old;
ALTER TABLE silver.store_master_staging RENAME TO store_master;
COMMIT;
DROP TABLE silver.store_master_old;
# ── APPEND ONLY ───────────────────────────────────────────────────────────────
# Only add new rows. Never update or delete.
# Use for: event logs, immutable facts, audit trails
INSERT INTO silver.events (event_id, user_id, event_type, ts)
SELECT event_id, user_id, event_type, ts
FROM staging.events
WHERE ts > (SELECT MAX(ts) FROM silver.events);
# Risk: duplicates on rerun (if some rows already inserted)
# Fix: add UNIQUE constraint on event_id + use ON CONFLICT DO NOTHING
# ── UPSERT (INSERT OR UPDATE) ────────────────────────────────────────────────
# Insert new rows. Update existing rows if they changed.
# The workhorse of incremental loading.
# Use for: mutable entities (orders, customers, products)
-- PostgreSQL:
INSERT INTO silver.orders (order_id, status, amount, updated_at)
VALUES (%s, %s, %s, %s)
ON CONFLICT (order_id)
DO UPDATE SET
status = EXCLUDED.status,
amount = EXCLUDED.amount,
updated_at = EXCLUDED.updated_at
WHERE silver.orders.updated_at < EXCLUDED.updated_at;
-- The WHERE clause prevents overwriting newer data with older data (important!)
-- Snowflake MERGE:
MERGE INTO silver.orders AS target
USING staging.orders AS source
ON target.order_id = source.order_id
WHEN MATCHED AND target.updated_at < source.updated_at THEN
UPDATE SET status = source.status, amount = source.amount
WHEN NOT MATCHED THEN
INSERT (order_id, status, amount, updated_at)
VALUES (source.order_id, source.status, source.amount, source.updated_at);
# ── DELTA MERGE (for lakehouses) ─────────────────────────────────────────────
from delta.tables import DeltaTable
(
    DeltaTable.forPath(spark, 's3://freshmart-lake/silver/orders')
    .alias('target')
    .merge(
        source=staging_df.alias('source'),
        condition='target.order_id = source.order_id',
    )
    .whenMatchedUpdate(
        condition='target.updated_at < source.updated_at',
        set={
            'status': 'source.status',
            'amount': 'source.amount',
            'updated_at': 'source.updated_at',
        },
    )
    .whenNotMatchedInsertAll()
    .execute()
)
The Eight Design Principles of Reliable Pipelines
Two pipelines can be functionally identical — they move the same data from the same source to the same destination — but have dramatically different reliability profiles. One fails once a month and recovers automatically in 15 minutes. The other fails weekly, requires manual intervention, and sometimes produces wrong data.
The difference is design principles: idempotency, resumability, observability, isolation, data quality enforcement, source isolation, atomicity at the right granularity, and minimal footprint. These eight are what senior data engineers apply when designing pipelines and what they look for when reviewing pipeline code. Apply them and pipelines become reliable infrastructure. Ignore them and pipelines become technical debt.
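The most consequential of these principles, idempotency, fits in a few lines: a run that takes a fixed window as a parameter and writes keyed upserts can be repeated at any time without harm. An in-memory sketch (real pipelines get the same effect from `ON CONFLICT` upserts and a UNIQUE constraint on the business key):

```python
def run_window(window: str, source: list[dict], dest: dict) -> None:
    """Process one fixed window; keyed writes make reruns harmless."""
    for row in source:
        if row['window'] == window:
            dest[row['order_id']] = row   # upsert by business key

source = [
    {'order_id': 'A1', 'window': '2026-03-16', 'amount': 380},
    {'order_id': 'A2', 'window': '2026-03-16', 'amount': 120},
]
dest: dict = {}
run_window('2026-03-16', source, dest)
run_window('2026-03-16', source, dest)   # rerun: same input, same result
# len(dest) == 2 — no duplicates, no drift
```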
Pipeline Topologies — The Shapes Data Flows Take
Real data platforms are not single linear pipelines. They are networks of pipelines with different shapes. Recognising the topology of a data flow immediately tells you its failure modes, its parallelism opportunities, and its monitoring requirements.
# ── LINEAR PIPELINE ───────────────────────────────────────────────────────────
# Source → Transform → Sink
# The simplest topology. One input, one output, sequential stages.
[PostgreSQL orders] → [Python cleaner] → [S3 Bronze Parquet]
# Properties:
# Simple failure model: one failure point, clear restart path
# No parallelism between stages
# Used for: simple batch ingestion, API-to-warehouse pipelines
# ── FAN-OUT PIPELINE ─────────────────────────────────────────────────────────
# One source, multiple sinks.
# Same data written to multiple destinations simultaneously or sequentially.
┌→ [S3 data lake (Parquet)]
[Kafka payments] ─────┤→ [PostgreSQL (OLTP write-through)]
└→ [Elasticsearch (search index)]
# Properties:
# If one sink fails, others may succeed → inconsistency across sinks
# Must decide: fail all if any fail, or allow partial success?
# Used for: CDC fan-out, event-driven architectures, dual-write patterns
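The fail-all-or-partial decision is worth making explicit in code rather than leaving to whichever exception happens first. A sketch of a partial-success policy — attempt every sink, record per-sink outcomes, let the caller reconcile (the sink functions are stand-ins, not real connectors):

```python
from typing import Callable

def fan_out(event: dict, sinks: dict[str, Callable[[dict], None]]) -> dict[str, str]:
    """Write one event to every sink; record success/failure per sink
    instead of aborting on the first error (partial-success policy)."""
    results = {}
    for name, write in sinks.items():
        try:
            write(event)
            results[name] = 'ok'
        except Exception as exc:
            results[name] = f'failed: {exc}'
    return results

lake: list[dict] = []
def write_lake(e: dict) -> None:
    lake.append(e)

def write_search(e: dict) -> None:
    raise ConnectionError('index unreachable')   # simulated sink outage

results = fan_out({'payment_id': 'P1'}, {'lake': write_lake, 'search': write_search})
# results == {'lake': 'ok', 'search': 'failed: index unreachable'}
# The caller must now reconcile: retry 'search' or flag the inconsistency.
```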
# ── FAN-IN PIPELINE ──────────────────────────────────────────────────────────
# Multiple sources, one sink.
# Data from different sources merged into one unified destination.
[Razorpay payments] ─┐
[Paytm payments] ─┤→ [UNION ALL] → [silver.all_payments]
[PhonePe payments] ─┘
# Properties:
# If one source fails, do you write partial data or wait for all sources?
# Must dedup after union (same transaction ID from multiple sources?)
# Used for: multi-source consolidation, polyglot persistence → unified lake
# ── DAG PIPELINE ─────────────────────────────────────────────────────────────
# Multiple stages with dependencies. Some stages can run in parallel.
# A Directed Acyclic Graph (DAG) — no cycles.
[Extract orders] ─────┬──────────────────┐
[Extract customers] ──┤→ [Silver orders] →┤→ [Gold daily revenue]
[Extract restaurants] ─┘ └→ [Gold customer LTV]
# Properties:
# Stages without dependencies can run in parallel (faster)
# A failed upstream stage blocks all downstream stages
# This is exactly what Airflow DAGs model
# Used for: dbt projects, complex multi-source transformations
# ── STREAMING PIPELINE ───────────────────────────────────────────────────────
# Continuous, event-driven. Data is processed as it arrives, not in batches.
[Kafka topic: orders] → [Flink/Spark Streaming] → [Kafka topic: enriched_orders]
→ [Cassandra (real-time store)]
# Properties:
# No concept of "a run" — continuous execution
# Failure means falling behind, not stopping completely (consumer lag)
# State management is complex (windowing, watermarks)
# Used for: real-time dashboards, fraud detection, CDC materialisation
# ── LAMBDA ARCHITECTURE (batch + streaming combined) ─────────────────────────
# Two paths: slow batch path for accuracy, fast streaming path for low latency.
[Source data] ──┬─ [Batch (Spark, nightly)] ──────────→ [Batch layer (accurate)]
└─ [Streaming (Flink, real-time)] ──────→ [Speed layer (fast)]
↓
[Serving layer: merge both]
# Properties:
# Complex to maintain: two codebases for same logic
# Kappa architecture (streaming only) is the modern alternative
# Used for: systems that need both historical accuracy and real-time freshness
ETL vs ELT vs EL — Why the Order Matters
The three-letter acronyms ETL, ELT, and EL describe where transformation happens in the pipeline. This is not a trivial naming distinction — the position of the transformation step determines what tools you use, who can see and change the transformation logic, and how you debug when data is wrong.
| Pattern | Full name | Where transform happens | When to use |
|---|---|---|---|
| ETL | Extract → Transform → Load | Before loading. Data is cleaned and shaped BEFORE it reaches the destination. Python/Spark pipeline does the transformation. | Source data is sensitive (PII must be masked before landing), destination has strict schema enforcement, transformation is complex and requires Python/ML. |
| ELT | Extract → Load → Transform | After loading. Raw data lands in the warehouse/lake first, THEN SQL/dbt transforms it in place. The destination does the transformation work. | Modern data warehouse (Snowflake/BigQuery) is the compute engine. Transformation logic is primarily SQL. Analysts need access to raw data. Schema flexibility is needed at load time. |
| EL | Extract → Load (no transform) | No transformation. Raw data is landed exactly as received in the destination. | Landing zone / Bronze layer ingestion. Transformation happens later in a separate pipeline. Need to preserve the exact original data for audit, debugging, or reprocessing. |
# ── ETL: transform BEFORE loading (Python pipeline) ─────────────────────────
# Python pipeline does all transformation:
def etl_orders(source_conn, dest_conn):
# Extract
raw = pd.read_sql("SELECT * FROM orders WHERE updated_at > %s", source_conn)
# Transform (Python/Pandas)
raw = raw.drop_duplicates(subset=['order_id'])
raw = raw[raw['amount'] > 0]
raw['status'] = raw['status'].str.lower().str.strip()
raw['created_at'] = pd.to_datetime(raw['created_at'], utc=True)
raw['customer_city'] = raw['customer'].apply(lambda x: x.get('city')) # flatten JSON
raw = raw[raw['status'].isin(['placed','confirmed','delivered','cancelled'])]
# Load — destination receives clean, typed data
raw.to_sql('silver_orders', dest_conn, if_exists='append', index=False)
# ── ELT: load raw THEN transform with SQL/dbt ────────────────────────────────
# Step 1: EL — load raw data as-is
def extract_load_orders(source_conn, warehouse_conn):
raw = pd.read_sql("SELECT * FROM orders WHERE updated_at > %s", source_conn)
raw.to_sql('raw_orders', warehouse_conn, if_exists='append') # load raw, no transforms
# Step 2: dbt model transforms the raw table inside the warehouse
# models/silver/orders.sql:
# WITH source AS (
# SELECT * FROM {{ source('raw', 'orders') }}
# ),
# cleaned AS (
# SELECT
# order_id,
# amount::DECIMAL(10,2),
# LOWER(TRIM(status)) AS status,
# created_at::TIMESTAMPTZ
# FROM source
# WHERE amount > 0
# AND LOWER(status) IN ('placed','confirmed','delivered','cancelled')
# QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) = 1
# )
# SELECT * FROM cleaned;
# MODERN BEST PRACTICE (2026):
# EL raw data into the lake/warehouse (Bronze layer)
# dbt/SQL transforms it in place (Silver and Gold layers)
# Python ETL only for: PII masking, ML feature engineering, complex flattening
# Never transform in the extraction layer if the warehouse can do it
How Pipelines Fail — The Complete Taxonomy
Every pipeline will fail. The question is not whether but when and how badly. Understanding the complete taxonomy of pipeline failures is what lets a data engineer design pipelines that fail gracefully, recover automatically, and alert clearly when human intervention is needed.
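"Retry with exponential backoff" appears repeatedly as the correct behaviour in the taxonomy that follows. A minimal, self-contained sketch (the delay values are shrunk for illustration; production values are seconds to minutes):

```python
import time
from typing import Callable, TypeVar

T = TypeVar('T')

def with_retries(fn: Callable[[], T], attempts: int = 4, base_delay: float = 0.01) -> T:
    """Call fn, retrying on failure with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise                          # out of retries: surface the error
            time.sleep(base_delay * (2 ** attempt))

calls = {'n': 0}
def flaky() -> str:
    """Simulated source that fails twice, then succeeds."""
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError('source unavailable')
    return 'ok'

result = with_retries(flaky)
# result == 'ok' after two failures and two backoff sleeps
```

In practice you would also add jitter, cap the maximum delay, and alert once retries are exhausted.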
FAILURE CATEGORY EXAMPLES DEFAULT BEHAVIOUR CORRECT BEHAVIOUR
──────────────────────────────────────────────────────────────────────────────────────────────────
Source unavailable DB connection timeout Crash with error Retry with backoff
API 503 Service Unavailable Alert if > N retries
SFTP server unreachable
Source data changed New column added to source Wrong data written Schema validation
Column renamed in source API silently Alert + DLQ bad rows
Type changed (string → number)
Source data quality NULL in required field Wrong aggregations Row-level validation
Negative amounts (silent!) DLQ invalid rows
Duplicate primary keys Alert if DLQ fills up
Transformation bug Wrong SQL logic Wrong data written dbt tests catch before deploy
Off-by-one in date range (no error!) Code review
NULL propagation in calculation Data quality checks
Destination issue Warehouse out of disk Crash with error Retry, then alert
Schema mismatch on write Schema check fails Schema validation before write
Table locked by another query Timeout or deadlock Retry + timeout config
Resource exhaustion OOM on large dataset Crash Chunked processing
Disk full mid-write Corrupt output Disk space checks before run
Rate limit on API sink Throttling error Proactive rate limiting
Infrastructure Network partition Timeout Retry with exponential backoff
Pod eviction (Kubernetes) Mid-run failure Resumable from checkpoint
Spot instance termination Data corruption SIGTERM handler + checkpoint
Orchestration Dependency task failed Downstream skipped Explicit failure propagation
Wrong schedule (timezone bug) Wrong time range Fixed UTC schedule + monitoring
Concurrent runs overlap Duplicate data Lock file / mutex
SLA breach Pipeline takes 4h instead of 1h Late data in dash Timeout + alerting
Source delivers data late Late pipeline run SLA monitoring, not just failure
Backfill job blocks daily run Daily run delayed Job priority management
The pipeline health checklist
A pipeline is not just "running" or "not running." There are intermediate states that require attention: running too slowly (SLA risk), producing fewer rows than expected (data quality issue), or succeeding but writing wrong data (the most dangerous state because it produces no alert).
# Write these metrics after every pipeline run to a runs table:
CREATE TABLE monitoring.pipeline_runs (
run_id UUID PRIMARY KEY,
pipeline_name VARCHAR(100) NOT NULL,
run_date DATE NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ,
status VARCHAR(20) NOT NULL, -- 'running', 'success', 'failed', 'partial'
rows_extracted BIGINT,
rows_written BIGINT,
rows_rejected BIGINT,
duration_seconds DECIMAL(10,2),
error_message TEXT,
dlq_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ALERT CONDITIONS (set up in monitoring tool):
-- 1. status = 'failed' → immediate alert
-- 2. duration_seconds > expected_duration * 2 → SLA warning
-- 3. rows_written < expected_rows * 0.8 → data quality alert
-- 4. rows_rejected > total_rows * 0.05 → data quality alert (>5% rejected)
-- 5. dlq_count > 100 → investigate DLQ
-- 6. No row inserted for today at 8 AM → pipeline did not run
-- DATA QUALITY CHECK after every run:
SELECT
run_date,
rows_written,
LAG(rows_written) OVER (ORDER BY run_date) AS prev_day_rows,
ABS(rows_written - LAG(rows_written) OVER (ORDER BY run_date))
/ NULLIF(LAG(rows_written) OVER (ORDER BY run_date), 0) AS pct_change
FROM monitoring.pipeline_runs
WHERE pipeline_name = 'orders_ingestion'
ORDER BY run_date DESC
LIMIT 30;
-- Alert if pct_change > 0.3 (30% day-over-day change is suspicious)
Pipeline vs Workflow vs DAG vs Job — Precise Terminology
These terms are often used interchangeably but have distinct meanings in professional data engineering. Using them precisely in conversations, documentation, and code makes communication clearer.
| Term | Precise meaning | Example |
|---|---|---|
| Task | The smallest unit of work — one atomic operation that succeeds or fails as a whole. | Run dbt model fct_orders. Extract one day of orders from API. Write one batch to S3. |
| Job | A single executable unit — a script, a Spark application, a dbt model run. One process, one purpose. | orders_ingestion.py — a Python script that runs once and exits. spark-submit process_events.jar. |
| Pipeline | A sequence of tasks or jobs that move data from source to sink. May be a single job or multiple jobs in sequence. | Extract orders → Bronze Parquet → Silver cleaning → Gold aggregation. |
| Workflow | A coordinated set of pipelines with dependencies, schedules, and error handling. A workflow defines what runs when and in what order. | The daily FreshMart workflow: ingest orders + customers + products, then run dbt Silver, then run Gold models. |
| DAG | Directed Acyclic Graph — the specific representation of a workflow as a graph where nodes are tasks and edges are dependencies. Used in Airflow. | An Airflow DAG with 12 tasks: 3 extraction tasks → 2 validation tasks → 4 dbt tasks → 3 alert tasks. |
| Orchestrator | The system that schedules and executes workflows — manages dependencies, retries, alerting, and history. | Apache Airflow, Prefect, Dagster, dbt Cloud, AWS Step Functions, GitHub Actions. |
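The distinction between a pipeline and its DAG becomes concrete once dependencies are modelled explicitly. The Python standard library can compute a valid execution order over the example graph (task names are illustrative; this is the scheduling core that orchestrators like Airflow build on):

```python
from graphlib import TopologicalSorter

# Task → set of tasks it depends on
dag = {
    'silver_orders': {'extract_orders', 'extract_customers', 'extract_restaurants'},
    'gold_daily_revenue': {'silver_orders'},
    'gold_customer_ltv': {'silver_orders'},
}

# static_order() yields tasks so that every task appears after its dependencies.
order = list(TopologicalSorter(dag).static_order())
# The three extracts come first (and could run in parallel),
# then silver_orders, then the two gold models.
```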
What Good Pipeline Code Looks Like
A pipeline that is correct but unreadable, untestable, and unmaintainable is a liability. Production pipelines run for years. The team that maintains them changes. The data engineer who wrote it three years ago is not available to explain why a particular branch condition exists. Good pipeline code is self-documenting, testable at every layer, and structured so that changes can be made safely.
"""
orders_ingestion_pipeline.py
Daily orders ingestion: PostgreSQL source → S3 Bronze Parquet
Schedule: 06:00 AM IST daily (00:30 UTC)
Owner: data-team@freshmart.com
SLA: complete by 07:00 AM IST
Dependencies: none (first pipeline in daily DAG)
Idempotent: yes (upserts on order_id)
Resumable: yes (checkpoint per S3 partition written)
"""
# ── IMPORTS: clear separation of standard, third-party, local ─────────────────
import os
import json
import logging
import uuid
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Iterator
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq
# ── CONSTANTS: at the top, named, never magic numbers ─────────────────────────
BATCH_SIZE = 100_000
TARGET_FILE_MB = 512
CHECKPOINT_DIR = Path('/data/checkpoints')
DLQ_DIR = Path('/data/dlq')
# ── CONFIGURATION: from environment, validated at startup ─────────────────────
class Config:
db_url: str = os.environ['SOURCE_DB_URL']
s3_path: str = os.environ['S3_OUTPUT_PATH']
# ── FUNCTIONS: each one does one thing, has a clear name ──────────────────────
def extract_orders(conn, run_date: date) -> Iterator[dict]:
"""
Extract all orders for run_date from PostgreSQL.
Uses run_date as the fixed window — idempotent for the same date.
Yields one row at a time — constant memory regardless of volume.
"""
start_ts = datetime(run_date.year, run_date.month, run_date.day, tzinfo=timezone.utc)
end_ts = start_ts + timedelta(days=1)
with conn.cursor('orders_cursor') as cur: # server-side cursor: streams rows
cur.execute(
"SELECT * FROM orders WHERE created_at >= %s AND created_at < %s",
(start_ts, end_ts),
)
for row in cur:
yield dict(zip([desc[0] for desc in cur.description], row))
def validate_row(row: dict) -> tuple[dict | None, str | None]:
"""
Validate one order row. Returns (clean_row, None) or (None, error_reason).
Pure function — no I/O, fully unit-testable.
"""
if not row.get('order_id'):
return None, 'missing_order_id'
if (row.get('amount') or 0) <= 0:
return None, f'invalid_amount: {row.get("amount")}'
if row.get('status') not in ('placed', 'confirmed', 'delivered', 'cancelled'):
return None, f'invalid_status: {row.get("status")}'
return row, None
def write_parquet_batch(rows: list[dict], path: str) -> None:
"""Write a list of row dicts to a Parquet file. Single responsibility."""
table = pa.Table.from_pylist(rows)
pq.write_table(table, path, compression='zstd')
def run(run_date: date) -> dict:
"""
Main pipeline function. Orchestrates extract → validate → load.
Returns run statistics.
"""
run_id = str(uuid.uuid4())
log = logging.getLogger('orders_ingestion')
stats = {'run_id': run_id, 'rows_extracted': 0, 'rows_written': 0, 'rows_rejected': 0}
log.info('Pipeline started', extra={'run_date': str(run_date), 'run_id': run_id})
conn = psycopg2.connect(Config.db_url)
batch: list[dict] = []
chunk = 0
try:
for row in extract_orders(conn, run_date):
stats['rows_extracted'] += 1
clean, error = validate_row(row)
if error:
stats['rows_rejected'] += 1
# Write to DLQ — do not crash the whole pipeline for one bad row
with open(DLQ_DIR / f'orders_{run_date}_{run_id}.ndjson', 'a') as f:
f.write(json.dumps({'error': error, 'row': row}) + '\n')
continue
batch.append(clean)
if len(batch) >= BATCH_SIZE:
chunk += 1
output_path = f'{Config.s3_path}/date={run_date}/part-{chunk:05d}.parquet'
write_parquet_batch(batch, output_path)
stats['rows_written'] += len(batch)
log.info('Batch written', extra={'chunk': chunk, 'cumulative': stats['rows_written']})
batch = []
# Write final partial batch:
if batch:
chunk += 1
output_path = f'{Config.s3_path}/date={run_date}/part-{chunk:05d}.parquet'
write_parquet_batch(batch, output_path)
stats['rows_written'] += len(batch)
finally:
conn.close()
log.info('Pipeline complete', extra=stats)
return stats
# ── ENTRY POINT: handles CLI arguments, calls run() ───────────────────────────
if __name__ == '__main__':
import sys
logging.basicConfig(level=logging.INFO, format='%(message)s')
run_date = (
date.fromisoformat(sys.argv[1])
if len(sys.argv) > 1
else date.today() - timedelta(days=1)
)
result = run(run_date)
sys.exit(0 if result['rows_rejected'] / max(result['rows_extracted'], 1) < 0.05 else 1)
Auditing a Fragile Pipeline and Redesigning It
You are asked to audit the existing orders pipeline and identify what is fragile about it. Here is the original pipeline code you inherit:
# ORIGINAL PIPELINE (from a junior engineer two years ago)
import psycopg2
import pandas as pd
conn = psycopg2.connect("postgresql://admin:password123@prod-db-01:5432/orders")
df = pd.read_sql("SELECT * FROM orders", conn) # PROBLEM 1
df['amount'] = df['amount'].astype(float) # PROBLEM 2
df = df.dropna() # PROBLEM 3
df.to_sql('silver_orders', warehouse_conn, if_exists='replace') # PROBLEM 4
print("done") # PROBLEM 5
Problem 1 — Full extraction every run: SELECT * FROM orders reads the entire orders table (currently 180 million rows) every morning. Takes 4 hours. Slows the production database. No incremental pattern.
Problem 2 — Brittle type casting: astype(float) raises a ValueError and crashes the entire pipeline if any amount is a non-numeric string (which happens with one specific vendor's data about once a week). One bad row kills the whole run.
Problem 3 — Silent data deletion: dropna() drops ALL rows containing ANY null value. Orders with a null promo_code (the majority) are silently deleted. Revenue metrics are wrong.
Problem 4 — Truncate-and-replace every run: if_exists='replace' drops and recreates the entire table every run. The table is empty during the 4-hour run. Analysts see zero data all morning. No idempotency.
Problem 5 — No logging, no observability: The only output is "done." No row counts, no timing, no run ID. When something goes wrong, there is no information to debug with.
After applying the eight design principles, the pipeline becomes the structured, resumable, observable version shown in Part 08. It processes only yesterday's new orders (incremental), validates each row individually and sends failures to a DLQ (data quality enforcement), writes in batches with upserts (idempotency), logs structured metrics (observability), and takes 4 minutes instead of 4 hours (source isolation). Every principle has a direct, measurable impact.
🎯 Key Takeaways
- ✓ A data pipeline moves data from sources to sinks through transformations. Every pipeline has the same anatomy: Source → Extraction → Transformation → Loading → Sink, with Orchestration and Monitoring around it. The technology changes; the anatomy does not.
- ✓ Extraction is either full (read everything, every run — simple, expensive) or incremental (read only changes since last run — efficient, requires a watermark column and checkpoint). Use incremental extraction for any table with more than a few million rows.
- ✓ Loading patterns: full replace (truncate + reload — simple, destination empty during run), append-only (INSERT for immutable events), upsert (INSERT ... ON CONFLICT DO UPDATE — the correct default for mutable entities). Always use upserts with a UNIQUE constraint on the business key.
- ✓ ETL transforms before loading — good for PII masking, complex Python logic. ELT loads raw then transforms with SQL/dbt inside the warehouse — the modern standard. Most teams in 2026 use ELT with dbt for transformations and raw data preserved in the landing zone.
- ✓ The eight design principles: Idempotency, Resumability, Observability, Isolation, Data Quality Enforcement, Source Isolation, Atomicity at the right granularity, Minimal Footprint. Apply all eight and pipelines become reliable infrastructure. Ignore them and they become fragile scripts.
- ✓ Idempotency is the most critical single principle. Achieved by: upserts not inserts, UNIQUE constraints on business keys, fixed time windows as parameters. An idempotent pipeline can be rerun at any time without causing data quality issues.
- ✓ The most dangerous pipeline failure is silent data incorrectness — the pipeline reports success but the data is wrong. Prevent it with row count validation after every run, value range checks in dbt tests, and comparing output row counts to source row counts.
- ✓ Pipeline topologies: linear (one source, one sink), fan-out (one source, multiple sinks), fan-in (multiple sources, one sink), DAG (multiple stages with dependencies). Each topology has different failure modes and parallelism opportunities.
- ✓ Write pipeline run metadata to a monitoring table: run_id, pipeline_name, started_at, finished_at, status, rows_extracted, rows_written, rows_rejected, duration_seconds. Alert on failures, SLA breaches, and anomalous row counts — not just outright failures.
- ✓ A pipeline and a DAG are not the same thing. A pipeline is a data flow. A DAG is the dependency graph that orchestrates multiple pipelines or tasks. An Airflow DAG for the morning data platform may contain 15 tasks across 6 pipelines.