Data Ingestion Patterns — Full Load, Incremental, CDC
The three patterns that cover every source — when each is correct, how each fails, and how to choose.
Every Ingestion Problem Falls Into One of Three Patterns
A data engineer's first job with any new source system is answering one question: how do I get data out of this reliably, completely, and without harming it? The answer is almost always a variant of one of three ingestion patterns. Recognising which pattern fits which source — and understanding precisely why — is one of the most fundamental skills in the discipline.
The three patterns exist on a spectrum from simple-but-expensive to complex-but-efficient. The simplest pattern reads everything every time. The most complex pattern reads only what changed, down to the individual database operation level. The right choice depends on the source's characteristics, the data's update frequency, the destination's freshness requirement, and the source system's tolerance for load.
Full Load — Read Everything, Every Time
Full load is the simplest ingestion pattern. Every run reads the complete source table and replaces the destination's content entirely. No watermarks, no change tracking, no complexity. For small tables that change frequently in hard-to-track ways, it is often the correct and permanent choice.
How full load works
FULL LOAD PATTERN:
Every run:
1. Read ALL rows from source
2. Truncate destination (or write to staging)
3. Insert all rows into destination
4. Done — destination is an exact copy of source at run time
VARIANT A: Truncate and reload (simple, destination empty during load)
BEGIN;
TRUNCATE TABLE silver.store_master;
INSERT INTO silver.store_master
SELECT
store_id, store_name, city, region, is_active, manager_id
FROM source.stores;
COMMIT;
-- Inside the transaction the table is empty between TRUNCATE and the INSERT,
-- but TRUNCATE takes an ACCESS EXCLUSIVE lock: concurrent readers block until
-- COMMIT and then see the fully reloaded table — all-old or all-new, never empty
VARIANT B: Staging table swap (zero-downtime, destination always available)
-- Step 1: load to staging table
CREATE TABLE silver.store_master_new AS
SELECT store_id, store_name, city, region, is_active, manager_id
FROM source.stores;
-- Step 2: atomic rename (milliseconds)
BEGIN;
ALTER TABLE silver.store_master RENAME TO store_master_old;
ALTER TABLE silver.store_master_new RENAME TO store_master;
COMMIT;
-- Step 3: drop old table
DROP TABLE silver.store_master_old;
-- During load: store_master_old serves queries
-- After rename: store_master (new) serves queries
-- Zero seconds where table is empty or has partial data
PYTHON IMPLEMENTATION (full load with staging swap):
import pandas as pd

def full_load_with_swap(source_conn, dest_conn, table: str) -> int:
    """Full load via staging-table swap: load, rename, drop old."""
    # Assumes dest_conn works with both pandas.to_sql and .cursor()
    df = pd.read_sql(f"SELECT * FROM {table}", source_conn)
    staging = f"{table}_staging"
    df.to_sql(staging, dest_conn, if_exists='replace', index=False)
    with dest_conn.cursor() as cur:
        cur.execute(f"ALTER TABLE {table} RENAME TO {table}_old")
        cur.execute(f"ALTER TABLE {staging} RENAME TO {table}")
        cur.execute(f"DROP TABLE {table}_old")
    dest_conn.commit()
    return len(df)

When full load is genuinely the right choice
Full load remains the correct, permanent choice for small reference tables (under roughly 1 million rows), for tables with no reliable change-tracking column, and for tables where deletes must be reflected but CDC is not worth the operational cost. A 500-row product_categories table reloads in seconds; watermark logic would never pay for itself there.
When full load breaks down
FAILURE MODE 1: Table grows too large for full extraction
Table: orders (FreshMart) — 500 million rows after 3 years
Full load time: 6 hours
Pipeline SLA: complete by 6 AM
Pipeline runtime on a bad day: starts 11 PM, finishes 5 AM next day
→ Barely fits. One slow query and the SLA is breached.
→ After year 4: 700 million rows → 8.5 hours → SLA breach guaranteed.
Signal to switch: full load duration > 20% of run interval.
FAILURE MODE 2: Source load during extraction
Full extraction reads every row via a full table scan.
On a production PostgreSQL database:
→ Fills the buffer pool (evicts hot pages)
→ Application queries slow down for 30–60 minutes after
→ If source cannot provide a read replica, this causes harm
Solution: extract from a read replica, not the primary.
FAILURE MODE 3: Destination inconsistency window
Between TRUNCATE and INSERT completion, destination is empty.
If a query runs during this window, it sees no data.
Solution: staging table swap (Variant B above) eliminates this window.
FAILURE MODE 4: Reload overwrites late-arriving data
If a row was manually corrected in the destination (a data fix),
the next full load overwrites it with the uncorrected source value.
This is expected behaviour for full load — but teams get surprised by it.
If you need to preserve destination edits: use incremental or CDC instead.

Incremental — Only What Changed
Incremental ingestion reads only the rows that were created or modified since the previous run. A high-watermark column — typically an updated_at timestamp or an auto-incrementing ID — tracks progress. The pipeline records the maximum watermark value it saw on the last run, and uses it as the lower bound for the next run's extraction query.
This pattern scales to arbitrarily large tables. A 1-billion-row orders table that receives 100,000 new or updated orders per day only requires reading 100,000 rows per run, not 1 billion. The extraction time is proportional to the change volume, not the total table size.
Complete incremental implementation
import json
import logging
from datetime import datetime, timezone, timedelta
from pathlib import Path
import psycopg2
import pandas as pd
log = logging.getLogger('incremental_ingestion')
CHECKPOINT_FILE = Path('/data/checkpoints/orders_watermark.json')
# ── Watermark management ───────────────────────────────────────────────────────
def load_watermark() -> datetime:
    """Load the last successfully processed watermark."""
    if CHECKPOINT_FILE.exists():
        data = json.loads(CHECKPOINT_FILE.read_text())
        wm = datetime.fromisoformat(data['watermark'])
        log.info('Loaded watermark: %s', wm.isoformat())
        return wm
    # First run — use a safe historical start
    default = datetime(2020, 1, 1, tzinfo=timezone.utc)
    log.info('No checkpoint found — starting from %s', default.isoformat())
    return default
def save_watermark(watermark: datetime) -> None:
    """Save watermark atomically — write temp then rename."""
    tmp = CHECKPOINT_FILE.with_suffix('.tmp')
    tmp.write_text(json.dumps({'watermark': watermark.isoformat()}))
    tmp.rename(CHECKPOINT_FILE)  # atomic on POSIX filesystems
# ── Extraction ─────────────────────────────────────────────────────────────────
def extract_changed_orders(
    conn,
    since: datetime,
    until: datetime,
) -> pd.DataFrame:
    """
    Extract orders modified between since and until.
    Uses an exclusive lower bound (>) to avoid re-processing the boundary row.
    Uses an inclusive upper bound (<=) to include rows modified at exactly until.
    """
    query = """
        SELECT
            order_id, customer_id, store_id,
            order_amount, status, created_at, updated_at
        FROM orders
        WHERE updated_at > %s
          AND updated_at <= %s
        ORDER BY updated_at ASC
    """
    df = pd.read_sql(query, conn, params=(since, until))
    log.info('Extracted %d rows (updated %s to %s)',
             len(df), since.isoformat(), until.isoformat())
    return df
# ── Loading ────────────────────────────────────────────────────────────────────
def upsert_orders(df: pd.DataFrame, dest_conn) -> int:
    """Upsert orders into Silver layer — idempotent."""
    if df.empty:
        return 0
    with dest_conn.cursor() as cur:
        # Row-at-a-time for clarity; batch with psycopg2.extras.execute_values
        # for high volumes.
        for _, row in df.iterrows():
            cur.execute("""
                INSERT INTO silver.orders
                    (order_id, customer_id, store_id, order_amount, status,
                     created_at, updated_at, ingested_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
                ON CONFLICT (order_id) DO UPDATE SET
                    status = EXCLUDED.status,
                    order_amount = EXCLUDED.order_amount,
                    updated_at = EXCLUDED.updated_at,
                    ingested_at = NOW()
                WHERE silver.orders.updated_at < EXCLUDED.updated_at
            """, (row.order_id, row.customer_id, row.store_id,
                  row.order_amount, row.status, row.created_at, row.updated_at))
    dest_conn.commit()
    return len(df)
# ── Main pipeline ──────────────────────────────────────────────────────────────
def run_incremental(source_conn, dest_conn) -> dict:
    """Run one incremental ingestion cycle."""
    since = load_watermark()
    # Use source DB's NOW() as upper bound to avoid clock skew:
    until = pd.read_sql("SELECT NOW() AT TIME ZONE 'UTC'",
                        source_conn).iloc[0, 0].to_pydatetime()
    df = extract_changed_orders(source_conn, since, until)
    if df.empty:
        log.info('No changes since last watermark')
        return {'rows_processed': 0, 'new_watermark': since.isoformat()}
    written = upsert_orders(df, dest_conn)
    # Only advance watermark AFTER successful write:
    save_watermark(until)
    return {
        'rows_processed': written,
        'new_watermark': until.isoformat(),
        'max_source_ts': df['updated_at'].max().isoformat(),
    }

The critical pitfalls of incremental ingestion
PITFALL 1: HARD DELETES ARE INVISIBLE
Scenario: order_id 9284751 is deleted from the source PostgreSQL table.
Incremental query: SELECT * FROM orders WHERE updated_at > checkpoint
What happens: the deleted row produces no result in the query.
Destination: still has order_id 9284751 from the previous ingestion run.
Impact: destination data diverges from source silently. Metrics wrong.
Solutions:
A) Use CDC instead (CDC captures DELETE operations explicitly)
B) Use a soft-delete column: deleted_at TIMESTAMPTZ or is_deleted BOOLEAN
Soft deletes update updated_at → appear in incremental query
Pipeline handles is_deleted=TRUE by marking destination row as deleted
C) Periodic full load to reconcile (run full load weekly on top of incremental)
Full load will overwrite destination to match source — deletes reconciled
Use when: deletions are rare, weekly reconciliation is acceptable
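Solution B above can be sketched as a small partitioning step in the batch loader. A minimal illustration assuming the source exposes an is_deleted flag (the function name is hypothetical, not from any library):

```python
import pandas as pd

def split_soft_deletes(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Partition an incremental batch into rows to upsert and rows to
    mark deleted in the destination, based on the soft-delete flag."""
    deleted_mask = df['is_deleted'].fillna(False).astype(bool)
    return df[~deleted_mask], df[deleted_mask]

# Example batch from an incremental extraction:
batch = pd.DataFrame({
    'order_id': [101, 102, 103],
    'is_deleted': [False, True, None],   # None = flag never set
})
live, deleted = split_soft_deletes(batch)
```

The live rows go through the normal upsert path; the deleted rows update the destination's own deleted marker instead of being removed outright.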
PITFALL 2: MISSING updated_at COLUMN
Many legacy tables have only created_at (immutable).
Solution A: use max(primary_key_id) as watermark if PK is auto-increment
SELECT * FROM orders WHERE order_id > last_max_id
Works when: rows are insert-only (orders are never updated after creation)
Breaks when: rows are updated (updates do not change the ID)
Solution B: use CDC (does not depend on application-managed timestamps)
Solution C: full load if the table is small enough
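Solution A can be sketched as ID-based paging — a simplified sketch where fetch_batch stands in for the actual database query (names are illustrative):

```python
def extract_by_id(fetch_batch, last_max_id: int, batch_size: int = 1000):
    """Page through an insert-only table using the auto-increment PK as the
    watermark. fetch_batch(after_id, limit) must return rows ordered by id."""
    rows_out, watermark = [], last_max_id
    while True:
        rows = fetch_batch(watermark, batch_size)
        if not rows:
            break
        rows_out.extend(rows)
        watermark = rows[-1]['id']   # advance to the highest id seen so far
    return rows_out, watermark

# Simulated source: rows with ids 1..5, checkpoint already at id 2
table = [{'id': i, 'amount': i * 10} for i in range(1, 6)]
fetch = lambda after, limit: [r for r in table if r['id'] > after][:limit]
rows, new_watermark = extract_by_id(fetch, last_max_id=2, batch_size=2)
```

In a real pipeline fetch_batch would run `SELECT * FROM orders WHERE order_id > %s ORDER BY order_id LIMIT %s`; the returned watermark is persisted exactly like the timestamp checkpoint.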
PITFALL 3: CLOCK SKEW BETWEEN SOURCE AND PIPELINE SERVER
Pipeline server clock: 06:00:02 UTC (2 seconds ahead)
Source DB clock: 06:00:00 UTC
Watermark saved after last run: 06:00:02 UTC (pipeline server time)
Next query: WHERE updated_at > '06:00:02'
Row stamped 05:59:58 on the source clock? Already extracted (correct)
Row stamped 06:00:01 on the source clock, committed after the last
extraction query ran? EXCLUDED — its timestamp is below the watermark
Any row stamped between 06:00:00 and 06:00:02 source time? POTENTIALLY MISSED
Fix: always use the SOURCE DATABASE's NOW() as the upper bound:
SELECT NOW(); -- run against the source DB: source time, not pipeline server time
Query: WHERE updated_at > last_checkpoint AND updated_at <= source_now
Or: overlap the extraction window by 5 minutes:
since = last_watermark - timedelta(minutes=5)
until = source_now
Use upsert at destination to handle re-processed rows idempotently.
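The overlap-window fix reduces to a few lines. A minimal sketch (the function name is an assumption; source_now must come from the source DB's own NOW()):

```python
from datetime import datetime, timedelta, timezone

def extraction_window(last_watermark: datetime, source_now: datetime,
                      overlap: timedelta = timedelta(minutes=5)):
    """Bounds for the next incremental query: lower bound pulled back by the
    overlap to absorb clock skew, upper bound taken from the source DB clock."""
    return last_watermark - overlap, source_now

wm = datetime(2026, 3, 17, 6, 0, tzinfo=timezone.utc)     # saved checkpoint
now = datetime(2026, 3, 17, 6, 15, tzinfo=timezone.utc)   # SELECT NOW() on source
since, until = extraction_window(wm, now)
```

Rows inside the overlap are extracted twice; the destination upsert makes the repetition harmless.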
PITFALL 4: BACKFILL AND LATE-ARRIVING UPDATES
Row updated_at: 2026-03-17 11:58:00
Pipeline checkpoint at: 2026-03-17 12:00:00
Row arrives in source DB at: 2026-03-17 12:03:00 (application retry delayed)
Next pipeline run query: WHERE updated_at > 12:00:00
Row's updated_at (11:58:00) < checkpoint (12:00:00) → MISSED
Fix: use an overlap window that extends the lower bound back by a safe margin
since = last_checkpoint - timedelta(minutes=30) # 30-minute lookback
Upsert handles duplicates from the overlap idempotently.
Cost: ~30 minutes of re-processed rows per run (small if update volume is moderate)

Watermark column selection — the decision matters
| Watermark type | How to query | Works for updates? | Works for deletes? | Notes |
|---|---|---|---|---|
| updated_at (TIMESTAMPTZ) | WHERE updated_at > checkpoint | ✓ Yes | ✗ No | Best option. Requires the application to maintain updated_at correctly. |
| created_at only | WHERE created_at > checkpoint | ✗ No — updates not captured | ✗ No | Only correct for append-only tables (logs, events, immutable facts). |
| Auto-increment PK | WHERE order_id > max_id | ✗ No — updates not captured | ✗ No | Only for insert-only tables. Breaks if records are inserted out of ID order. |
| Combination (created OR updated) | WHERE created_at > cp OR updated_at > cp | ✓ Yes | ✗ No | Handle tables with separate created_at and updated_at columns carefully. |
| None — use CDC | Read WAL directly | ✓ Yes | ✓ Yes | When no reliable timestamp exists. Most complete but most complex. |
Change Data Capture — The Complete Picture
Change Data Capture reads the database's own transaction log — the Write-Ahead Log (WAL) in PostgreSQL, the binlog in MySQL — and converts every insert, update, and delete into a structured event that the pipeline can consume. CDC captures everything the database records, which includes operations that are invisible to any query-based approach: hard deletes, multi-table transactions, and changes happening faster than the query polling interval.
How CDC works at the database level
HOW POSTGRESQL WAL-BASED CDC WORKS:
APPLICATION writes to PostgreSQL:
BEGIN;
UPDATE orders SET status = 'delivered' WHERE order_id = 9284751;
INSERT INTO delivery_logs (order_id, delivered_at) VALUES (9284751, NOW());
COMMIT;
POSTGRESQL WAL records (binary format, simplified):
{LSN: 0/1A3F2B8, txn: 847291, op: UPDATE, table: orders,
old: {order_id: 9284751, status: 'confirmed'},
new: {order_id: 9284751, status: 'delivered'}}
{LSN: 0/1A3F2BC, txn: 847291, op: INSERT, table: delivery_logs,
new: {order_id: 9284751, delivered_at: '2026-03-17T20:14:32Z'}}
{LSN: 0/1A3F2C0, txn: 847291, op: COMMIT}
DEBEZIUM reads WAL via PostgreSQL's logical replication protocol:
Decodes binary WAL records into structured JSON events
Publishes to Kafka topic: 'prod.public.orders'
KAFKA MESSAGE (what the pipeline consumer receives):
{
"before": {"order_id": 9284751, "status": "confirmed"},
"after": {"order_id": 9284751, "status": "delivered"},
"op": "u", // u=update, c=create, r=read/snapshot, d=delete
"ts_ms": 1710698072847,
"source": {
"db": "production",
"table": "orders",
"lsn": 28437128, // log sequence number — position in WAL
"txId": 847291
}
}
For a DELETE:
{
"before": {"order_id": 9284751, "status": "delivered"},
"after": null,
"op": "d" // delete — before image available, after is null
}
CDC CAPTURES EVERYTHING:
✓ INSERT → op: "c", before: null, after: {new row}
✓ UPDATE → op: "u", before: {old values}, after: {new values}
✓ DELETE → op: "d", before: {deleted row}, after: null
✓ Schema changes (with schema registry) → schema evolution events
✓ Transaction boundaries → group multi-table operations atomically

Setting up CDC with Debezium on PostgreSQL
# STEP 1: Configure PostgreSQL for logical replication
# Edit postgresql.conf:
wal_level = logical # must be 'logical' (not 'replica' or 'minimal')
max_replication_slots = 10 # number of CDC consumers allowed
max_wal_senders = 10 # parallel WAL streaming connections
# Restart PostgreSQL after changing wal_level.
# STEP 2: Create a dedicated replication user
CREATE USER debezium_user REPLICATION LOGIN PASSWORD 'strong_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;
# STEP 3: Create a logical replication slot (tracks CDC position)
-- Run in psql:
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
-- pgoutput is the built-in PostgreSQL logical replication plugin
# STEP 4: Configure Debezium connector (JSON config posted to Kafka Connect REST API)
# POST http://kafka-connect:8083/connectors
{
  "name": "freshmart-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-primary",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "strong_password",
    "database.dbname": "freshmart_prod",
    "database.server.name": "freshmart",
    "table.include.list": "public.orders,public.customers,public.payments",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "snapshot.mode": "initial",
    "topic.prefix": "freshmart.cdc",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
# Note: no ExtractNewRecordState ("unwrap") transform — the consumer below
# reads the full before/after envelope shown earlier.
# Debezium creates Kafka topics:
# freshmart.cdc.public.orders
# freshmart.cdc.public.customers
# freshmart.cdc.public.payments
# STEP 5: Consume CDC events in your pipeline
from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'freshmart-cdc-pipeline',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # manual commit — at-least-once delivery
})
consumer.subscribe(['freshmart.cdc.public.orders'])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    event = json.loads(msg.value())
    op = event.get('op')          # 'c', 'u', 'd', 'r'
    after = event.get('after')    # new row values (null for deletes)
    before = event.get('before')  # old row values (null for inserts)
    if op in ('c', 'u', 'r'):     # insert, update, or read (snapshot)
        upsert_to_silver(after)
    elif op == 'd':               # delete
        soft_delete_in_silver(before['order_id'])
    consumer.commit()             # commit after successful processing

CDC initial snapshot — handling the bootstrap problem
When you first set up CDC, you need to copy the existing table data before the CDC stream begins. This is the initial snapshot — Debezium handles it automatically with snapshot.mode: initial. It reads the entire table once at startup, emitting each row as an op: "r" (read) event, then switches to streaming WAL changes. The pipeline sees a seamless sequence: snapshot rows first, then real-time changes.
SNAPSHOT MODES (Debezium configuration):
snapshot.mode = initial (default)
→ On first start: read entire table as "r" events (consistent snapshot)
→ After snapshot: stream WAL changes
→ Use when: you need historical data AND going forward changes
→ Note: snapshot can take hours for large tables
snapshot.mode = never
→ No snapshot — start streaming from current WAL position
→ Use when: you already populated the destination from a separate bulk load
and only need forward changes
→ Danger: you will miss changes that occurred before the CDC connector started
snapshot.mode = schema_only
→ Capture table schema but no data rows
→ Only stream going-forward changes
→ Use when: destination is pre-populated (e.g., restored from backup)
snapshot.mode = always
→ Full snapshot on every connector restart
→ Only use in development/testing — very expensive in production
PRACTICAL BOOTSTRAP STRATEGY FOR LARGE TABLES:
For a 500M row orders table, Debezium snapshot takes 8+ hours.
Better approach:
1. pg_dump → S3 (parallel, fast: 1-2 hours)
2. Bulk load S3 dump to destination
3. Start Debezium with snapshot.mode=schema_only from the WAL LSN
at the time the dump was taken
4. Apply WAL events from that LSN forward (catches up during/after bulk load)
This reduces bootstrap from 8 hours to 2 hours for large tables.

CDC operational considerations
CONCERN 1: REPLICATION SLOT BLOAT
A PostgreSQL replication slot retains WAL segments until the consumer
has confirmed reading them. If the CDC consumer is down or slow,
WAL accumulates indefinitely on the source database.
A slow consumer can fill the source disk and crash the database.
Monitoring: SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(),
restart_lsn) AS lag_bytes FROM pg_replication_slots;
Alert when lag_bytes > 10 GB.
Action: if consumer is stuck, DROP the replication slot (accepts data loss)
rather than let the source database fill up.
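The monitoring query above can feed a simple alert check. A sketch of the alerting side only (function name and row shape are assumptions; the rows mirror the pg_replication_slots query):

```python
def slots_over_threshold(slots, max_lag_bytes: int = 10 * 1024**3):
    """Flag replication slots retaining more WAL than the threshold.
    Each dict mirrors a row from the pg_replication_slots monitoring query."""
    return [s['slot_name'] for s in slots if s['lag_bytes'] > max_lag_bytes]

# Rows as the monitoring query would return them:
rows = [
    {'slot_name': 'debezium_slot', 'lag_bytes': 14 * 1024**3},   # 14 GB — stuck
    {'slot_name': 'reporting_slot', 'lag_bytes': 200 * 1024**2}, # 200 MB — fine
]
alerts = slots_over_threshold(rows)
```

Any slot returned here should page the on-call engineer before the source disk fills.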
CONCERN 2: SLOT LAG GROWING
pg_stat_replication shows the gap between source WAL and consumer position.
Lag grows when: high write volume, consumer processing is slow,
network between source and consumer is slow.
Monitor: set up Datadog/Prometheus alert when replication lag > 5 minutes.
CONCERN 3: TABLE SCHEMA CHANGES (DDL events)
Adding a column to the source table mid-stream:
→ Events before the column addition have the old schema
→ Events after have the new schema
→ Debezium (with Schema Registry) handles this automatically
→ Without Schema Registry: your consumer may fail to parse new event schema
ALWAYS use Confluent Schema Registry with Debezium in production.
CONCERN 4: AT-LEAST-ONCE DELIVERY
Debezium + Kafka provides at-least-once delivery — the same event
may be delivered more than once during consumer restarts or failures.
Destination must handle idempotently: upsert on primary key, not INSERT.
Never use CDC with a plain INSERT at destination.
CONCERN 5: INITIAL SNAPSHOT SIZE
For tables > 100M rows, initial snapshot is expensive.
Use the pg_dump + schema_only approach described above.
Always monitor snapshot progress: Debezium metrics show rows snapshotted.
CDC LATENCY BENCHMARKS (Debezium + Kafka + consumer):
Source write to Kafka event: 50–200ms
Kafka event to consumer processing: 10–100ms
Consumer to destination write: 50–500ms
Total end-to-end (typical): 200ms – 1 second
This is suitable for: near-real-time dashboards, data lake freshness
NOT suitable for: synchronous application flow (too slow for user-facing)

Full Load vs Incremental vs CDC — Every Dimension
| Dimension | Full Load | Incremental | CDC |
|---|---|---|---|
| What is read | Every row, every run | Only rows with updated_at > checkpoint | Every database operation from WAL |
| Captures inserts | ✓ Yes | ✓ Yes (via updated_at or created_at) | ✓ Yes (op: c) |
| Captures updates | ✓ Yes (overwrites) | ✓ Yes (if updated_at maintained) | ✓ Yes (op: u, with before/after) |
| Captures hard deletes | ✓ Yes (row absent after reload) | ✗ No (deleted rows invisible to query) | ✓ Yes (op: d, with before image) |
| Source load | Full table scan every run — high | Index scan on watermark column — low | WAL streaming — minimal (async read) |
| Latency | Run interval (minutes to hours) | Run interval (minutes to hours) | Near-real-time (seconds) |
| Before image available | ✗ No | ✗ No | ✓ Yes — previous values before change |
| Complexity | Low | Medium | High |
| Infrastructure required | Source DB + destination | Source DB + destination + checkpoint | WAL logical replication + Kafka + Debezium + destination |
| Requires source config | No | No (read-only query) | Yes — wal_level=logical, replication slot |
| Recovery from failure | Re-run full load | Re-run from last checkpoint | Resume from last committed Kafka offset |
| Best for | Small tables, reference data, no change tracking | Large append-heavy tables with updated_at | Any table with deletes, financial data, low latency |
How to Choose the Right Pattern for Any Source Table
The choice between the three patterns is never arbitrary — it is determined by the source table's characteristics. Answer these four questions in order and the right pattern becomes clear.
QUESTION 1: What is the table's approximate row count and growth rate?
< 1 million rows AND grows slowly? → Full Load is viable (fast, simple)
> 1 million rows OR grows quickly? → Incremental or CDC required
QUESTION 2: Does the table have a reliable updated_at column?
Yes (maintained by application on every write):
→ Incremental is viable. Continue to Question 3.
No (only created_at, or no timestamp at all):
→ If table is insert-only: use created_at or auto-increment PK
→ If table has updates/deletes: CDC or Full Load (no other option)
QUESTION 3: Are hard deletes important for the destination?
No (deletes are rare, destination can lag on deletions, or soft deletes used):
→ Incremental is sufficient.
Yes (deletes must be captured accurately and promptly):
→ CDC required. Incremental cannot see hard deletes.
QUESTION 4: What is the latency requirement?
> 15 minutes acceptable:
→ Incremental with periodic schedule is fine.
< 15 minutes required:
→ CDC (near-real-time) or micro-batch incremental (5-minute interval).
< 1 minute required:
→ CDC only.
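The four questions condense into a small routing function. A simplified sketch (thresholds taken from the questions above; a real decision also weighs source load and available infrastructure):

```python
def choose_pattern(row_count: int, has_updated_at: bool, insert_only: bool,
                   deletes_matter: bool, max_latency_minutes: float) -> str:
    """Route a table to an ingestion pattern by answering the four
    questions in order."""
    if max_latency_minutes < 1:
        return 'cdc'            # Q4: sub-minute latency → CDC only
    if row_count < 1_000_000:
        return 'full_load'      # Q1: small table → simplest pattern wins
    if deletes_matter:
        return 'cdc'            # Q3: hard deletes are invisible to queries
    if has_updated_at or insert_only:
        return 'incremental'    # Q2: a reliable watermark exists
    return 'cdc'                # no watermark, too big for full load

# e.g. the 500M-row orders table with a maintained updated_at column:
pattern = choose_pattern(row_count=500_000_000, has_updated_at=True,
                         insert_only=False, deletes_matter=False,
                         max_latency_minutes=15)
```

The same function reproduces the routing decisions in the practical table that follows.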
PRACTICAL ROUTING TABLE:
product_categories (500 rows, rarely changes) → Full Load
store_master (10 rows, updated monthly) → Full Load
orders (500M rows, updated frequently) → Incremental
customers (10M rows, hard deletes for GDPR) → CDC
payment_transactions (1B rows, financial accuracy critical) → CDC
delivery_events (append-only, no deletes) → Incremental
inventory (updates + deletes frequently) → CDC
promo_codes (small, full correctness needed) → Full Load
audit_logs (append-only, insert-only) → Incremental (created_at)
user_sessions (frequent updates, deletes on logout) → CDC

The mixed-pattern architecture — most production platforms use all three
FRESHMART DATA PLATFORM — INGESTION PATTERN BY TABLE:
FULL LOAD (nightly, fast):
reference.store_master 10 rows → Replaces nightly
reference.product_categories 850 rows → Replaces nightly
reference.city_tier_mapping 500 rows → Replaces nightly
reference.store_manager 10 rows → Replaces nightly
INCREMENTAL (every 15 minutes, updated_at watermark):
orders 500M rows → updated_at watermark
delivery_events 2B rows → created_at (append-only)
customer_reviews 50M rows → created_at (append-only)
inventory_snapshots daily → full date partition replace
CDC (continuous, sub-second latency):
customers (GDPR deletes must be captured)
payments (financial, every operation matters)
merchant_accounts (balance changes, fraud patterns)
inventory_live (real-time stock for flash sales)
INGESTION PIPELINE SCHEDULE:
00:00 UTC: Full load — all reference tables (5 minutes total)
Every 15 min: Incremental — orders, delivery_events, reviews
Continuous: CDC stream — customers, payments, merchants, inventory
TOTAL INFRASTRUCTURE:
Full Load: 2 cron jobs, no special infrastructure
Incremental: 3 scheduled Airflow tasks
CDC: 1 Debezium connector, 4 Kafka topics, 1 Kafka consumer group
→ Most data volume handled by incremental
→ Most operational complexity in CDC (but only for 4 tables)

Diagnosing Missing Data — Tracing It to the Ingestion Pattern
The customer success team reports that cancelled orders are still showing up as "active" on the customer analytics dashboard. Orders that customers cancelled yesterday are appearing as "placed" in the Silver layer. You are assigned to investigate.
-- Step 1: confirm the discrepancy
-- Check order 9284751 (reported as wrong)
SELECT order_id, status, updated_at FROM production.orders
WHERE order_id = 9284751;
-- Returns: {order_id: 9284751, status: 'cancelled', updated_at: '2026-03-17 14:32:00'}
SELECT order_id, status, updated_at FROM silver.orders
WHERE order_id = 9284751;
-- Returns: {order_id: 9284751, status: 'placed', updated_at: '2026-03-17 08:14:00'}
-- Silver shows 'placed' from the morning run.
-- Source shows 'cancelled' since 14:32.
-- 6-hour gap. Why didn't the 15-minute incremental pick it up?
-- Step 2: check the watermark checkpoint
-- File: /data/checkpoints/orders_watermark.json
-- Contents: {"watermark": "2026-03-17T08:00:00+00:00"}
-- Watermark is from this MORNING! Has not advanced in 6 hours.
-- Step 3: check the incremental pipeline logs
tail -100 /var/log/airflow/orders_incremental_20260317.log | grep ERROR
-- 2026-03-17 08:15:42 ERROR Connection to source database timed out
-- 2026-03-17 08:15:42 ERROR Pipeline failed — checkpoint NOT advanced
-- (All subsequent runs also failed — Airflow retried but same DB issue)
-- 2026-03-17 14:00:00 INFO Database connection restored
-- 2026-03-17 14:00:02 INFO Loaded watermark: 2026-03-17T08:00:00+00:00
-- 2026-03-17 14:00:03 INFO Extracted 284,721 rows (updated 08:00 to 14:00)
-- 2026-03-17 14:00:47 INFO 284,721 rows upserted successfully
-- 2026-03-17 14:00:47 INFO Saved watermark: 2026-03-17T14:00:00+00:00
-- The pipeline recovered at 14:00 and processed the 6-hour backlog.
-- But the dashboard was still stale when the report was checked at 14:15
-- because the pipeline had just caught up and the analyst ran the query
-- while it was still processing.
-- Step 4: verify the fix
SELECT order_id, status FROM silver.orders WHERE order_id = 9284751;
-- Returns: {order_id: 9284751, status: 'cancelled'} ← correct now
-- Root cause: 6-hour DB connectivity failure → incremental fell behind
-- The incremental pattern correctly recovered using the saved watermark.
-- The 6-hour gap was recovered exactly — no data was missed, no duplicates.
-- This is checkpointing working correctly.

The incident was not a bug in the ingestion pattern — it was a 6-hour source database outage. The incremental pattern with checkpointing recovered perfectly: it resumed exactly where it stopped, processed the backlog, and the Silver layer was correct within minutes of the database recovering.
This is the correct behaviour. A full load pattern would have required a full table scan after recovery (6 hours). A CDC pattern would have required WAL catch-up (fast, but Kafka retention must have covered the 6-hour gap). The incremental pattern recovered with no special handling — just the next scheduled run.
🎯 Key Takeaways
- ✓ Three ingestion patterns cover every source: Full Load (read everything, replace destination), Incremental High-Watermark (read only changed rows since last checkpoint), and CDC (read the database transaction log for every operation). Every source table fits one of these three.
- ✓ Full load is the right choice for small reference tables (under 1 million rows), tables with no reliable change tracking, and tables where deletes must be reflected and CDC is too complex. Use the staging table swap variant to avoid the empty-table window that truncate-and-reload creates.
- ✓ Incremental ingestion scales to billions of rows because extraction time is proportional to change volume, not total table size. It requires a reliable high-watermark column (updated_at is ideal). It cannot detect hard deletes — deleted rows are invisible to any query-based extraction.
- ✓ CDC reads the database transaction log (WAL in PostgreSQL) to capture every INSERT, UPDATE, and DELETE as a structured event. It is the only pattern that captures hard deletes with the before-image of the deleted row. It requires wal_level=logical on PostgreSQL and a replication slot.
- ✓ Watermark columns: updated_at (best — works for updates), created_at (only for insert-only tables), auto-increment PK (only for insert-only tables with sequential inserts). When none is available: CDC or full load.
- ✓ The four incremental ingestion pitfalls: hard deletes are invisible, missing updated_at forces full load or CDC, clock skew between source and pipeline server can skip rows (fix: use source DB's NOW() as upper bound), and late-arriving updates miss the window (fix: overlap the lower bound by 30 minutes and upsert).
- ✓ CDC infrastructure requires: wal_level=logical in postgresql.conf (requires DB restart), a dedicated replication user with REPLICATION privilege, a replication slot, and a Debezium connector publishing to Kafka. Always use Schema Registry with Debezium.
- ✓ Replication slot monitoring is critical. An unmonitored slot on a high-write database can fill the server disk and crash the production database. Alert when lag exceeds 10 GB or 30 minutes. If a slot is stale and unrecoverable, drop it rather than risk disk full.
- ✓ CDC provides at-least-once delivery — the same event can be delivered more than once on consumer restart. The destination must handle this idempotently with upserts and UNIQUE constraints on the business key. Never use plain INSERT with CDC.
- ✓ Most production platforms use all three patterns simultaneously: full load for reference tables (nightly, fast), incremental for large transaction tables (every 15 minutes), and CDC for financial and customer tables where deletes matter (continuous). Match the pattern to the table's characteristics, not to a personal preference.