Change Data Capture (CDC) — Deep Dive
WAL internals, Debezium architecture, Schema Registry, the Outbox Pattern, event ordering, and operating CDC in production.
CDC From First Principles
Module 23 introduced CDC as one of three ingestion patterns. This module goes much deeper — the internal mechanics of the WAL, how Debezium turns binary log records into structured events, the Schema Registry contract that prevents breaking changes, the Outbox Pattern that solves dual-write consistency, event ordering guarantees and their limits, and what it actually takes to operate CDC reliably in production.
CDC is the most powerful and most complex ingestion pattern. It is powerful because it captures everything the database does — with no polling delay, no missed deletes, and the before-image of every changed row available for audit and historical analysis. It is complex because it introduces infrastructure (Kafka, Debezium, Schema Registry), operational requirements (WAL configuration, replication slot monitoring), and correctness challenges (event ordering, at-least-once delivery, schema evolution) that query-based ingestion patterns do not have.
The Write-Ahead Log — What It Is and How CDC Reads It
The Write-Ahead Log is PostgreSQL's durability guarantee. Every change to the database is recorded in the WAL before the actual data pages are modified. On a crash, PostgreSQL replays the WAL from the last checkpoint to recover all committed transactions. The WAL is an ordered, append-only, sequential log of every database operation — it is the authoritative record of everything the database has done.
CDC leverages the WAL not for crash recovery but for change streaming. Because the WAL already records every INSERT, UPDATE, and DELETE in precise order, it is the perfect source for a change stream. The challenge is reading it: the WAL is in a binary format designed for internal database use, not for external consumption. PostgreSQL's logical replication feature solves this by decoding the WAL into a structured, readable format.
WAL levels and logical replication
PostgreSQL WAL has three levels (wal_level setting):
MINIMAL:
Records only what is needed for crash recovery.
Does NOT record enough information for logical decoding.
Cannot be used for CDC.
This is the default in older PostgreSQL versions.
REPLICA:
Records enough for physical replication (exact byte-for-byte copy).
Still does NOT record enough for logical decoding.
Used for standby replicas, not CDC.
LOGICAL:
Records full before and after images of changed rows (full before-images also require REPLICA IDENTITY FULL on the table).
Includes enough information for logical decoding — exactly what CDC needs.
Required for Debezium and any WAL-based CDC tool.
Slight overhead: larger WAL files due to full row images.
Setting: wal_level = logical (requires PostgreSQL restart)
ADDITIONAL SETTINGS REQUIRED FOR CDC:
max_replication_slots = 10 # how many logical replication consumers allowed
max_wal_senders = 10 # parallel WAL streaming connections
wal_sender_timeout = 60000 # ms — close idle WAL sender connections
HOW LOGICAL DECODING WORKS:
PostgreSQL exposes two interfaces for logical decoding:
1. pg_logical_slot_get_changes() — pull-based SQL function
2. Streaming replication protocol — push-based (what Debezium uses)
Debezium connects as a logical replication client over the replication
protocol — exactly like a PostgreSQL standby would connect, but instead
of applying the WAL to a replica database, it decodes it into events.
LOG SEQUENCE NUMBER (LSN):
Every WAL record has an LSN — a monotonically increasing 64-bit integer
representing its position in the WAL stream.
Format in PostgreSQL: 0/1A3F2B8 (hexadecimal offset)
LSN is used for:
- Tracking how far the consumer has read (confirmed_flush_lsn in replication slot)
- Ordering events (lower LSN = happened earlier)
- Resuming after failure (start reading from last confirmed LSN)
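The textual LSN format splits the 64-bit position into two hex halves. Converting it to an integer makes ordering and dedup comparisons trivial — a minimal sketch (the helper name `parse_lsn` is ours, not a PostgreSQL API):

```python
def parse_lsn(lsn: str) -> int:
    """Convert PostgreSQL's textual LSN 'high/low' (two hex halves of a
    64-bit WAL position) into an integer usable for ordering and dedup."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

# Later WAL positions compare greater:
assert parse_lsn("0/1A3F2C4") > parse_lsn("0/1A3F2B8")
assert parse_lsn("1/0") > parse_lsn("0/FFFFFFFF")  # high half dominates
```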
Viewing current LSN:
SELECT pg_current_wal_lsn(); -- where we are now
SELECT pg_wal_lsn_diff(
pg_current_wal_lsn(),
confirmed_flush_lsn
) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot'; -- how far behind the consumer is
What the WAL actually contains
APPLICATION EXECUTES:
BEGIN;
UPDATE orders
SET status = 'delivered', delivered_at = NOW()
WHERE order_id = 9284751;
COMMIT;
WAL RECORDS WRITTEN (binary, simplified representation):
Record 1: XLOG_HEAP_UPDATE
relation: 16423 (orders table OID)
old_ctid: (0, 47) -- physical location of old row
new_ctid: (0, 847) -- physical location of new row
old_tuple: [9284751, 'confirmed', NULL]
new_tuple: [9284751, 'delivered', '2026-03-17 20:14:32 UTC']
lsn: 0/1A3F2B8
xid: 847291 -- transaction ID
Record 2: XLOG_XACT_COMMIT
xid: 847291
commit_time: 2026-03-17 20:14:32.847 UTC
lsn: 0/1A3F2C4
HOW LOGICAL DECODING TRANSFORMS THIS:
1. Debezium reads binary WAL records via replication protocol
2. Applies the 'pgoutput' logical replication plugin (built into PostgreSQL)
3. pgoutput translates OIDs to table names, type-decodes raw bytes
4. Debezium wraps the decoded event in an envelope:
DECODED EVENT (JSON, published to Kafka):
{
"schema": { ... avro schema definition ... },
"payload": {
"before": {
"order_id": 9284751,
"status": "confirmed",
"delivered_at": null
},
"after": {
"order_id": 9284751,
"status": "delivered",
"delivered_at": 1710706472000000 -- microseconds since epoch
},
"source": {
"version": "2.5.0.Final",
"connector": "postgresql",
"name": "freshmart",
"ts_ms": 1710706472847,
"snapshot": "false",
"db": "freshmart_prod",
"sequence": "["0/1A3F2B4","0/1A3F2B8"]", -- [lastCommittedLsn, lsn]
"schema": "public",
"table": "orders",
"txId": 847291,
"lsn": 28434104,
"xmin": null
},
"op": "u", -- u=update, c=create, d=delete, r=read/snapshot
"ts_ms": 1710706472901, -- when Debezium processed this event
"transaction": null
}
}
KEY FIELDS FOR DATA ENGINEERS:
op: the operation type
before: row values BEFORE the change (null for inserts)
after: row values AFTER the change (null for deletes)
source.lsn: position in WAL — use for ordering and dedup
source.txId: transaction ID — group multi-table ops from same txn
source.ts_ms: event time at source (commit time)
ts_ms: processing time (when Debezium emitted to Kafka)
Debezium — Architecture and Lifecycle
Debezium is a distributed platform for CDC built on top of Apache Kafka Connect. It provides production-ready connectors for PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, and others. Understanding how Debezium works internally — not just how to configure it — is what lets you diagnose problems, tune performance, and design CDC pipelines that behave correctly under failure.
Debezium component architecture
DEBEZIUM ARCHITECTURE:
SOURCE DB               KAFKA CONNECT                KAFKA TOPICS                      CONSUMERS
──────────────────────────────────────────────────────────────────────────────────────────────
PostgreSQL (primary) ─→ Debezium PostgreSQL     ─→  freshmart.cdc.public.orders    ─→ Spark Streaming
  Replication Slot      Connector                   freshmart.cdc.public.customers ─→ Python Consumer
                        (Kafka Connect              freshmart.cdc.public.payments  ─→ Flink
                         worker process)
                        Schema Registry (sidecar)
KAFKA CONNECT:
- Distributed worker framework that runs connector plugins
- Scales horizontally — multiple workers share connector load
- Stores connector offsets (LSN position) in a Kafka topic
(not in the source database — offsets live in the connect-offsets topic)
- REST API: POST/GET/DELETE connectors, check status, restart
DEBEZIUM POSTGRESQL CONNECTOR:
- Connects to PostgreSQL as a logical replication client
- Reads WAL events via streaming replication protocol
- Decodes binary WAL records using pgoutput plugin
- Publishes one Kafka message per database row change
- Kafka message key: primary key of the changed row (for partition routing)
- Kafka message value: the full event envelope (before/after/source)
KAFKA TOPIC NAMING:
Default pattern: {server.name}.{schema}.{table}
Example: freshmart.public.orders
One topic per table (recommended) — allows independent consumer groups
Alternatively: route all tables to one topic (harder to manage)
OFFSET STORAGE:
Debezium stores its WAL position (LSN) in a Kafka topic,
NOT in the source PostgreSQL replication slot alone.
Two records of position:
1. confirmed_flush_lsn in PostgreSQL replication slot
(what PostgreSQL knows the consumer has processed)
2. Kafka Connect offset (internal connect-offsets topic)
(what Kafka Connect tracks as the connector's position)
On restart: Debezium resumes from the Kafka Connect offset,
tells PostgreSQL slot to start streaming from that LSN.
DELIVERY GUARANTEE:
At-least-once: Debezium commits to Kafka before advancing the
replication slot. On crash-restart, Debezium may re-emit the
last batch of events (since the Kafka offset was not yet confirmed).
Consumer MUST handle duplicates idempotently.
Connector configuration — the critical parameters
{
"name": "freshmart-orders-cdc",
"config": {
// ── Connector class ───────────────────────────────────────────────
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
// ── Source database connection ────────────────────────────────────
"database.hostname": "postgres-primary.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/secrets/debezium.properties:db.password}",
"database.dbname": "freshmart_prod",
"database.server.name": "freshmart", // prefix for Kafka topic names
// ── Table selection (include/exclude) ────────────────────────────
"table.include.list": "public.orders,public.customers,public.payments",
// OR exclude system tables:
"table.exclude.list": "public.schema_migrations,public.sessions",
// ── Column filtering (exclude PII from specific tables) ───────────
"column.exclude.list": "public.customers.raw_phone,public.customers.ssn",
// Excluded columns are omitted from CDC events — use for PII fields
// that should not flow through Kafka
// ── Logical replication plugin ────────────────────────────────────
"plugin.name": "pgoutput", // built into PostgreSQL 10+
"slot.name": "debezium_freshmart",
"publication.name": "dbz_freshmart_pub",
// ── Snapshot configuration ───────────────────────────────────────
"snapshot.mode": "initial", // initial | never | schema_only | always
"snapshot.isolation.mode": "serializable", // consistent snapshot
// ── Kafka topic configuration ─────────────────────────────────────
"topic.prefix": "freshmart.cdc",
// Results in topics: freshmart.cdc.public.orders
// freshmart.cdc.public.customers
// ── Serialization (use Avro + Schema Registry in production) ─────
"key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
// ── Heartbeat (prevents slot lag during low-write periods) ────────
"heartbeat.interval.ms": "30000", // emit heartbeat every 30 seconds
// Without heartbeat: on low-write tables, the replication slot LSN
// never advances, WAL accumulates, slot lag appears to grow forever.
// Heartbeat emits a WAL record every N ms to advance the confirmed LSN.
// ── Event flattening (simplify event structure) ────────────────────
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
// ExtractNewRecordState flattens the envelope:
// BEFORE: {before: {...}, after: {...}, op: "u", source: {...}}
// AFTER: {order_id: 9284751, status: "delivered", __op: "u", __lsn: ...}
// Simpler for consumers but loses the before image
// ── Tombstone handling (for deletes) ──────────────────────────────
"tombstones.on.delete": "true",
// After a DELETE event, Debezium emits a tombstone (null-value message)
// with the same key. Kafka uses tombstones to compact deleted records.
// Consumers must handle null value messages without crashing.
// ── Decimal handling ──────────────────────────────────────────────
"decimal.handling.mode": "string",
// Options: precise (Avro Decimal), double (lossy), string (safe for all consumers)
// Use "string" unless consumers can handle Avro Decimal type correctly
// ── Interval handling ─────────────────────────────────────────────
"interval.handling.mode": "string"
}
}
Schema Registry — Why It Is Non-Negotiable in Production
Schema Registry solves the versioning problem that makes CDC brittle without it. When the source table's schema changes — a column is added, a type changes, a column is renamed — every Kafka message format changes with it. Without Schema Registry, consumers that were written to parse the old format crash on the new format. With Schema Registry, schema evolution is managed through a central contract, and compatibility rules enforce what changes are allowed.
How Schema Registry works
SCHEMA REGISTRY ROLE:
PRODUCER (Debezium) SCHEMA REGISTRY CONSUMER (Spark/Python)
──────────────────────────────────────────────────────────────────────────────────────
Detects orders table has:
order_id: INT8
status: VARCHAR
amount: DECIMAL
Registers Avro schema → POST /subjects/freshmart.cdc.public.orders-value/versions
Receives schema_id: 42
Serializes message:
[magic_byte=0][schema_id=42][avro_bytes...]
→ Publishes to Kafka topic
Stores schema:
ID 42 → Avro schema for orders v1
Consumer reads Kafka message:
Sees [magic_byte=0][schema_id=42][...]
Fetches schema 42 from registry
Deserializes avro_bytes using schema 42
Gets: {order_id: 9284751, status: "delivered", amount: 380.00}
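The wire format above — one magic byte (0), a 4-byte big-endian schema ID, then the Avro payload — is easy to frame and unframe. A minimal sketch using `struct` (function names are ours):

```python
import struct

MAGIC_BYTE = 0

def frame(schema_id: int, avro_bytes: bytes) -> bytes:
    """Prefix an Avro payload with the Confluent wire-format header."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_bytes

def unframe(message: bytes) -> tuple:
    """Extract the schema ID and payload; the consumer then fetches
    the schema for that ID from the registry before deserializing."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Schema-Registry-framed message")
    return schema_id, message[5:]

msg = frame(42, b"\x02\x04")
assert unframe(msg) == (42, b"\x02\x04")
```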
AVRO SCHEMA FOR ORDERS TABLE (simplified):
{
"type": "record",
"name": "orders",
"namespace": "freshmart.cdc.public",
"fields": [
{"name": "order_id", "type": "long"},
{"name": "status", "type": ["null", "string"], "default": null},
{"name": "amount", "type": {"type": "bytes", "logicalType": "decimal",
"precision": 10, "scale": 2}},
{"name": "created_at",
 "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}],
 "default": null}
]
}
SCHEMA EVOLUTION — ADDING A COLUMN:
Source DBA adds: ALTER TABLE orders ADD COLUMN delivery_fee DECIMAL(6,2) DEFAULT 0;
Debezium picks up the new table structure from the replication stream.
New schema registers as version 2 (schema_id: 43):
Added field: {"name": "delivery_fee", "type": ["null", "string"], "default": null}
(decimal.handling.mode = string maps DECIMAL columns to Avro strings)
COMPATIBILITY CHECK:
Registry checks: is schema v2 backward-compatible with v1?
Backward compatibility: new schema can READ messages written by old schema.
Rule: adding a field with a default value is backward-compatible. ✓
Rule: adding a field WITHOUT a default is NOT backward-compatible. ✗
CONSUMER BEHAVIOUR:
Consumer with schema v1 reads message written with schema v2:
→ delivery_fee field is unknown → ignored (backward-compatible)
Consumer with schema v2 reads message written with schema v1:
→ delivery_fee field missing → uses default null (backward-compatible)
Both consumers continue working without redeployment.
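The resolution behaviour just described can be simulated in plain Python — real Avro libraries do this during deserialization; the helper and field names here are illustrative:

```python
def resolve(record: dict, reader_fields: list) -> dict:
    """Simulate Avro schema resolution: keep fields the reader knows,
    ignore unknown fields, fill missing fields from reader defaults."""
    return {name: record.get(name, default) for name, default in reader_fields}

v1_fields = [("order_id", None), ("status", None)]
v2_fields = v1_fields + [("delivery_fee", None)]  # added with default null

v2_message = {"order_id": 9284751, "status": "delivered", "delivery_fee": "25.00"}
v1_message = {"order_id": 9284751, "status": "confirmed"}

# v1 consumer reading a v2 message: the unknown delivery_fee field is ignored
assert resolve(v2_message, v1_fields) == {"order_id": 9284751, "status": "delivered"}
# v2 consumer reading a v1 message: delivery_fee falls back to its default
assert resolve(v1_message, v2_fields)["delivery_fee"] is None
```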
COMPATIBILITY MODES (configurable per subject):
BACKWARD (default):
New schema can read data produced by the previous schema.
Allows: deleting fields, adding fields with defaults.
Prevents: adding fields without defaults, changing types incompatibly.
Best for: consumers that need to read old messages.
FORWARD:
Previous schema can read data produced by new schema.
Opposite of backward — protects old consumers from new producers.
FULL:
Both backward and forward. Most restrictive. Only additive changes.
Best for: strict production environments.
NONE:
No compatibility checking. Any change allowed.
Use only in development.
Schema Registry in practice
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
# AvroConsumer handles schema fetching and Avro deserialization automatically:
consumer = AvroConsumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'freshmart-cdc-silver-writer',
'schema.registry.url': 'http://schema-registry:8081',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # manual commit for at-least-once
})
consumer.subscribe([
'freshmart.cdc.public.orders',
'freshmart.cdc.public.payments',
])
def process_event(msg) -> None:
"""Process one CDC event with schema-aware deserialization."""
if msg.value() is None:
# Tombstone event (after a delete) — value is null
# Key still contains the primary key of the deleted record
key = msg.key()
handle_tombstone(key['order_id'] if key else None)
return
event = msg.value() # AvroConsumer deserialized using Schema Registry
op = event.get('__op') or event.get('op')
# Route by operation type:
if op in ('c', 'r'): # create or snapshot read
upsert_to_silver(event)
elif op == 'u': # update
upsert_to_silver(event)
elif op == 'd': # delete
# With ExtractNewRecordState transform: event contains old values + __deleted=true
mark_deleted_in_silver(event.get('order_id'))
# Idempotency key: use source LSN for dedup
# If we process the same event twice (at-least-once), upsert handles it
while True:
try:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f'Consumer error: {msg.error()}')
continue
process_event(msg)
consumer.commit() # commit AFTER successful processing
except SerializerError as e:
# Schema deserialization failed — schema incompatibility
print(f'Schema error: {e}')
# Do NOT commit — message will be redelivered
# Alert: schema compatibility issue needs investigation
The Outbox Pattern — Reliable Event Publishing Without Dual-Write
The Outbox Pattern is one of the most elegant applications of CDC in microservices architecture. It solves a fundamental distributed systems problem: how do you atomically update a database and publish an event to Kafka, when these are two different systems that cannot participate in a single transaction?
The dual-write problem
THE PROBLEM: payment service must update DB AND publish event
NAIVE APPROACH (incorrect):
BEGIN;
UPDATE payments SET status = 'captured', captured_at = NOW()
WHERE payment_id = 'pay_xxx';
COMMIT; -- Step 1: DB write succeeds
producer.send('payments.captured', event) -- Step 2: Kafka publish
FAILURE MODES:
A) DB write succeeds, then process crashes before Kafka publish
→ DB has status='captured'
→ Kafka has no event
→ Downstream services (order fulfillment, analytics) never notified
→ Inconsistency between payment status and downstream systems
B) Kafka publish succeeds, then DB write fails (rolled back)
→ Kafka has event saying payment captured
→ DB shows payment still pending
→ Downstream services fulfill an order that was not paid for
→ Financial inconsistency — potentially serious
TWO-PHASE COMMIT (XA Transactions)?
Theoretically solves this but:
→ Most modern systems do not support XA
→ XA is slow (requires synchronisation between DB and Kafka)
→ Kafka does not participate in XA transactions
→ Not practical in high-throughput production systems
The Outbox Pattern — the correct solution
THE OUTBOX PATTERN:
Key insight: the database is the single source of truth.
Instead of writing to DB AND Kafka, write everything to DB,
and use CDC to reliably deliver the event to Kafka.
STEP 1: Create an outbox table (in the application database):
CREATE TABLE outbox_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(50) NOT NULL, -- 'order', 'payment', etc.
aggregate_id VARCHAR(50) NOT NULL, -- the entity's ID
event_type VARCHAR(100) NOT NULL, -- 'PaymentCaptured', etc.
payload JSONB NOT NULL, -- event data
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Index for processing order:
CREATE INDEX idx_outbox_created ON outbox_events(created_at);
STEP 2: Application writes to both payments AND outbox atomically:
BEGIN;
-- Update the actual business table:
UPDATE payments
SET status = 'captured', captured_at = NOW()
WHERE payment_id = 'pay_xxx';
-- Write the event to the outbox (same transaction!):
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (
'payment',
'pay_xxx',
'PaymentCaptured',
'{"payment_id": "pay_xxx", "amount": 38000, "currency": "INR",
  "merchant_id": "merch_001", "captured_at": "2026-03-17T20:14:32Z"}'
);
COMMIT;
-- Both succeed or both fail — no inconsistency possible
STEP 3: Debezium reads the outbox_events table via CDC:
Debezium configuration (Outbox Event Router transform):
{
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "event_id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}"
}
This routes events to Kafka topics by aggregate_type:
'payment' events → Kafka topic: outbox.payment
'order' events → Kafka topic: outbox.order
STEP 4: Consumers process events from outbox Kafka topics.
STEP 5 (optional): Delete processed outbox rows periodically:
DELETE FROM outbox_events WHERE created_at < NOW() - INTERVAL '7 days';
-- Or use a separate cleanup job — the outbox is not an event store,
-- Kafka is. Outbox rows are only needed until CDC reads them.
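Steps 1 and 2 can be demonstrated end-to-end with sqlite3 standing in for PostgreSQL — a sketch of the atomic dual write, following the table and column names above (`capture_payment` is an illustrative helper):

```python
import json
import sqlite3
import uuid

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE payments (payment_id TEXT PRIMARY KEY, status TEXT);
    CREATE TABLE outbox_events (
        event_id TEXT PRIMARY KEY, aggregate_type TEXT, aggregate_id TEXT,
        event_type TEXT, payload TEXT);
    INSERT INTO payments VALUES ('pay_xxx', 'pending');
""")

def capture_payment(conn, payment_id: str) -> None:
    # One transaction: both statements commit together or neither does.
    with conn:
        conn.execute("UPDATE payments SET status = 'captured' WHERE payment_id = ?",
                     (payment_id,))
        conn.execute("INSERT INTO outbox_events VALUES (?, ?, ?, ?, ?)",
                     (str(uuid.uuid4()), "payment", payment_id, "PaymentCaptured",
                      json.dumps({"payment_id": payment_id, "amount": 38000})))

capture_payment(db, "pay_xxx")
assert db.execute("SELECT status FROM payments").fetchone()[0] == "captured"
assert db.execute("SELECT COUNT(*) FROM outbox_events").fetchone()[0] == 1
```

If any statement inside the `with conn:` block raises, sqlite3 rolls the whole transaction back — the same all-or-nothing property the outbox relies on in PostgreSQL.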
WHY THIS WORKS:
The outbox INSERT is in the same transaction as the business logic update.
If the transaction commits: both payment update AND outbox event exist in DB.
If the transaction rolls back: neither exists.
CDC reads the outbox and publishes to Kafka — CDC provides at-least-once delivery.
Downstream consumers handle duplicates with idempotency keys (event_id).
Result: exactly-once business semantics from at-least-once delivery + idempotency.
Event Ordering — What CDC Guarantees and Where It Breaks Down
Event ordering is one of the most commonly misunderstood aspects of CDC. Many engineers assume that because CDC reads the WAL sequentially, all events arrive at consumers in the order they happened. This is true within a single Kafka partition, but it breaks down in specific scenarios that cause subtle bugs in CDC consumers.
WHAT CDC GUARANTEES:
Within a single Kafka partition → events are strictly ordered by LSN.
For a given primary key → all events for that row go to the same partition
(because Kafka partition key = row's primary key by default).
Therefore: all changes to order_id = 9284751 are ordered correctly.
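Why per-key ordering holds: Kafka's default partitioner hashes the message key (murmur2 in the Java client) modulo the partition count, so every event for a given primary key lands on the same partition. A simplified stand-in using a deterministic hash (not the real murmur2):

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Stand-in for Kafka's default partitioner: a deterministic hash
    of the message key, modulo the partition count."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All events for the same order_id route to the same partition...
p1 = partition_for(b"9284751", 6)
assert all(partition_for(b"9284751", 6) == p1 for _ in range(100))
# ...but different keys may land on any partition, with no cross-key ordering.
```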
WHAT CDC DOES NOT GUARANTEE:
1. ORDERING ACROSS TABLES:
Event A (orders table, LSN 1000) and Event B (customers table, LSN 1001)
may arrive in different Kafka partitions and be consumed out of order
if the consumers for those topics run at different speeds.
Example: A payment is captured (payments topic, LSN 5000) and the
order status updated (orders topic, LSN 5001). A consumer that reads
both topics may process the order update before the payment capture
depending on consumer lag per topic.
2. ORDERING ACROSS KAFKA PARTITIONS:
A table with high write volume may have multiple Kafka partitions.
Events for different primary keys go to different partitions.
Events within each partition are ordered, but between partitions they are not.
order_id 9284751 (partition 0) and order_id 9284752 (partition 1) events
may arrive interleaved in any order at a consumer reading both partitions.
3. SNAPSHOT + STREAM ORDERING:
During initial snapshot, Debezium emits all existing rows as op=r events.
Streaming changes begin from the snapshot LSN.
But: the snapshot may take hours. During snapshot, the source table is
being modified. The snapshot reads a consistent point-in-time view,
then streaming catches up. Between snapshot completion and stream catchup,
there is a window where the consumer has a mix of:
- Snapshot rows (as of snapshot start time)
- Stream changes (from snapshot start LSN forward)
The consumer must handle this: upsert semantics reconcile both correctly.
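The upsert reconciliation can be sketched with an in-memory store: keep the LSN alongside each row and apply an event only if it carries a newer LSN. This also makes redelivered (at-least-once) events harmless. Helper names are ours:

```python
store = {}  # order_id -> latest row, including the last-applied __lsn

def apply_event(event: dict) -> None:
    """Idempotent, order-safe upsert: ignore events at or behind
    the LSN already applied for this key."""
    key = event["order_id"]
    current = store.get(key)
    if current is not None and event["__lsn"] <= current["__lsn"]:
        return  # duplicate, or stale (e.g. a snapshot row arriving after a stream update)
    store[key] = event

apply_event({"order_id": 1, "status": "placed",    "__lsn": 100, "__op": "r"})  # snapshot
apply_event({"order_id": 1, "status": "delivered", "__lsn": 250, "__op": "u"})  # stream
apply_event({"order_id": 1, "status": "delivered", "__lsn": 250, "__op": "u"})  # redelivery
apply_event({"order_id": 1, "status": "placed",    "__lsn": 100, "__op": "r"})  # late snapshot row
assert store[1]["status"] == "delivered"
```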
PRACTICAL IMPLICATION FOR DATA ENGINEERS:
SAFE PATTERN — process one table at a time, in order:
consumer reads freshmart.cdc.public.orders one partition at a time
→ events for same order are ordered, downstream safe
UNSAFE PATTERN — join across CDC streams in the consumer:
consumer reads orders AND payments topic simultaneously
tries to join: "when payment captured, also update order status"
→ ordering not guaranteed across topics
→ race condition: order update arrives before payment capture in consumer
CORRECT PATTERN for cross-table consistency:
Let each CDC stream write to its own Silver table independently.
Let dbt handle the join in a SQL model.
SQL joins are order-independent — dbt reads whatever is in both tables
at the time the model runs.
Never try to implement cross-table joins in the CDC consumer layer.
Transaction boundaries — atomicity from source to consumer
PROBLEM: A single source transaction updates two tables.
The consumer may see the two events separately and process them
out of order or independently, violating the intended atomicity.
SOURCE TRANSACTION:
BEGIN;
INSERT INTO orders (order_id, status) VALUES (9284753, 'placed');
INSERT INTO order_items (order_id, product_id, qty) VALUES (9284753, 42, 2);
COMMIT; -- both rows committed atomically
CDC EVENTS (Debezium emits):
{op: "c", table: "orders", txId: 847292, ...}
{op: "c", table: "order_items", txId: 847292, ...}
Note: both events share txId: 847292.
They may be on different Kafka topics/partitions.
SOLUTION: Debezium transaction metadata support
Enable transaction metadata topic:
{
"provide.transaction.metadata": "true",
"transaction.metadata.factory": "io.debezium.pipeline.txmetadata..."
}
Debezium emits an additional event on the transaction metadata topic:
BEGIN event: {txId: 847292, event_count: 2}
COMMIT event: {txId: 847292, event_count: 2, data_collections: [...]}
Consumer can use this to:
- Know exactly how many events belong to transaction 847292
- Buffer events until all events in the transaction are received
- Process them atomically together or skip if incomplete
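A sketch of the buffering strategy: hold data events per txId and release them only when the transaction's reported event count has arrived. In a real consumer the COMMIT metadata event itself would also be buffered until the count matches; this sketch just shows the completeness check (names are illustrative):

```python
from collections import defaultdict

buffers = defaultdict(list)  # txId -> buffered data events

def on_data_event(event: dict) -> None:
    """Buffer a data event under its transaction ID."""
    buffers[event["txId"]].append(event)

def on_commit_metadata(tx_id: int, event_count: int):
    """Release the transaction's events only once all have arrived."""
    if len(buffers[tx_id]) == event_count:
        return buffers.pop(tx_id)
    return None  # still waiting for the rest of the transaction

on_data_event({"txId": 847292, "table": "orders"})
assert on_commit_metadata(847292, 2) is None       # only 1 of 2 arrived
on_data_event({"txId": 847292, "table": "order_items"})
batch = on_commit_metadata(847292, 2)
assert batch is not None and len(batch) == 2       # process atomically
```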
SIMPLER APPROACH for analytics:
Don't try to preserve transaction boundaries at the consumer layer.
Write each event to its own Silver table with upsert semantics.
Run dbt models that JOIN across Silver tables — dbt sees a consistent
snapshot of all Silver tables at query time.
This is usually sufficient for analytical use cases.
Schema Evolution — Handling DDL Changes Mid-Stream
The source database schema changes over time. Tables gain new columns, columns are renamed, types are widened. CDC pipelines must handle these changes gracefully — a schema change that causes Debezium or a consumer to crash is an operational incident, not an expected upgrade.
SCENARIO 1: ADD A NEW COLUMN (backward-compatible)
Source: ALTER TABLE orders ADD COLUMN delivery_fee DECIMAL(6,2) DEFAULT 0;
Debezium behaviour:
- Detects the new table structure from the replication stream
- Registers new Avro schema (adds delivery_fee field with null default)
- New schema version: backward-compatible with previous schema
- Schema Registry accepts the new version (compatibility check passes)
Messages BEFORE the DDL: delivery_fee field absent in Avro, consumers use default null
Messages AFTER the DDL: delivery_fee field present with value
Consumer impact:
- Old consumers (schema v1): see delivery_fee as null (backward-compatible) ✓
- New consumers (schema v2): see correct delivery_fee value ✓
- NO consumer crash, NO redeployment required for existing consumers
Data warehouse impact:
- dbt Silver model needs updating to SELECT delivery_fee
- Run: ALTER TABLE silver.orders ADD COLUMN delivery_fee DECIMAL(6,2);
- Or: dbt run --full-refresh silver.orders (recreates table with new schema)
SCENARIO 2: RENAME A COLUMN (breaking change)
Source: ALTER TABLE orders RENAME COLUMN order_amount TO amount;
Debezium behaviour:
- Old schema had "order_amount" field
- New schema has "amount" field
- Adding "amount" WITHOUT a default: NOT backward-compatible!
- Schema Registry REJECTS this schema if compatibility mode = BACKWARD
How Debezium handles it:
- Emits the column under the NEW name in post-DDL events
- Pre-DDL events remain with the old column name in Kafka
- If Schema Registry blocks: connector pauses, requires manual intervention
SAFE MIGRATION APPROACH (avoid breaking changes):
1. Add the NEW column: ALTER TABLE orders ADD COLUMN amount DECIMAL(10,2);
2. Write to both columns temporarily (application change)
3. Backfill: UPDATE orders SET amount = order_amount WHERE amount IS NULL;
4. Consumer updated to read "amount" column
5. Drop old column: ALTER TABLE orders DROP COLUMN order_amount;
6. Update Debezium schema: new schema version removes order_amount
This staged migration takes longer but never breaks the pipeline.
SCENARIO 3: CHANGE A COLUMN TYPE (potentially breaking)
Source: ALTER TABLE orders ALTER COLUMN status TYPE VARCHAR(50);
-- was VARCHAR(20)
Impact: Widening (VARCHAR(20) → VARCHAR(50)) is usually safe.
Values that were valid VARCHAR(20) are still valid VARCHAR(50).
Debezium emits strings — the type width change is transparent.
Narrowing (VARCHAR(50) → VARCHAR(20)) is dangerous.
Existing data may violate the new constraint.
Debezium / Schema Registry may reject this if truncation is detected.
DEBEZIUM SCHEMA CHANGE EVENT:
When Debezium detects a DDL event, it emits a schema change event
to a separate topic: {server.name}.schema-changes.{database}
{
"source": {"db": "freshmart_prod"},
"historyRecord": {
"ddl": "ALTER TABLE orders ADD COLUMN delivery_fee DECIMAL(6,2) DEFAULT 0",
"tableChanges": [
{
"type": "ALTER",
"id": "public.orders",
"table": {
"columns": [
... full new column list ...
]
}
}
]
}
}
This topic is used by Debezium internally for schema history.
Also useful for: auditing schema changes, alerting when unexpected DDL occurs.
Landing CDC Events in a Data Lake — Efficiently
CDC events arrive as a continuous high-throughput stream. Writing each event individually to S3 or a data lake creates a massive small-file problem. The standard architecture uses Kafka as a buffer and a micro-batch consumer to land batches of events efficiently.
CDC LANDING ARCHITECTURE:
PostgreSQL → Debezium → Kafka → [Spark Streaming / Flink] → Delta Lake
(micro-batch consumer) (Bronze layer)
WHY NOT WRITE EACH EVENT DIRECTLY TO S3:
At 10,000 events/second: 10,000 S3 PUT requests/second
Each Parquet file: ~1 KB (tiny — the small file problem at its worst)
After 1 hour: 36,000,000 tiny files — unusable for analytics
CORRECT APPROACH: buffer in Kafka, write batches to Delta Lake
SPARK STRUCTURED STREAMING (micro-batch, 5-minute trigger):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp, to_date
from pyspark.sql.types import StructType, LongType, StringType

spark = (SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate())

# Read from Kafka (raw bytes):
raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "freshmart.cdc.public.orders")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 500_000)  # limit batch size
    .load())

# Parse the payload. With Avro + Schema Registry, use from_avro() instead;
# from_json() is shown here for readability:
orders_schema = (StructType()
    .add("order_id", LongType())
    .add("status", StringType())
    .add("amount", StringType())    # decimal as string
    .add("updated_at", LongType())  # microseconds epoch
    .add("__op", StringType())      # operation type
    .add("__lsn", LongType()))      # WAL LSN for ordering

parsed = (raw
    .select(from_json(col("value").cast("string"), orders_schema).alias("e"))
    .select("e.*")
    .withColumn("ingested_at", current_timestamp())
    .withColumn("date", to_date(current_timestamp())))  # partition column

# Write to Delta Lake using MERGE for idempotent upserts:
def upsert_to_delta(batch_df, batch_id):
    from delta.tables import DeltaTable
    path = "s3://freshmart-lake/bronze/orders"
    if DeltaTable.isDeltaTable(spark, path):
        (DeltaTable.forPath(spark, path).alias("target")
            .merge(batch_df.alias("source"), "target.order_id = source.order_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    else:
        (batch_df.write.format("delta")
            .mode("overwrite")
            .partitionBy("date")
            .save(path))

query = (parsed.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "s3://freshmart-lake/checkpoints/orders_cdc")
    .trigger(processingTime="5 minutes")
    .start())
query.awaitTermination()
HANDLING DELETE EVENTS IN THE DATA LAKE:
Option A: Soft-delete flag
Store __op and __deleted fields in Delta Lake.
dbt Silver model: WHERE __op != 'd' AND NOT COALESCE(__deleted, FALSE)
Preserves full history (useful for audit).
Option B: Hard delete via Delta MERGE DELETE clause
.whenMatchedDelete(condition="source.__op = 'd'")
Removes row from Delta Lake — cleaner but loses history.
Option C: Separate deleted records table
Route delete events to bronze.orders_deletes table.
dbt Silver: WHERE order_id NOT IN (SELECT order_id FROM bronze.orders_deletes)
Useful when you want both full history and current state.
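Whichever option is chosen, the consumer first has to separate delete events from upserts before the MERGE (or the deletes-table write) runs. A minimal sketch of that routing step — field names follow the flattened schema used in the Spark example above, and `split_cdc_batch` is our own illustrative helper, not a library function:

```python
def split_cdc_batch(events: list[dict]) -> tuple[list[dict], list[int]]:
    """Route a CDC micro-batch: events with __op == 'd' go to the delete path
    (MERGE DELETE clause or a deletes table); everything else is upserted."""
    upserts: list[dict] = []
    delete_keys: list[int] = []
    for event in events:
        if event.get("__op") == "d":
            delete_keys.append(event["order_id"])
        else:
            upserts.append(event)
    return upserts, delete_keys

batch = [
    {"order_id": 1, "__op": "u", "status": "shipped"},
    {"order_id": 2, "__op": "d"},
    {"order_id": 3, "__op": "c", "status": "new"},
]
upserts, deletes = split_cdc_batch(batch)
print(deletes)  # [2]
```

The split keeps the delete-handling decision (Option A/B/C) out of the parsing code: the same routing feeds a soft-delete flag, a `whenMatchedDelete` clause, or a separate deletes table.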
COMPACTION SCHEDULE:
CDC streams create many small Delta files per 5-minute trigger.
Run OPTIMIZE daily:
OPTIMIZE delta.`s3://freshmart-lake/bronze/orders`
WHERE date >= current_date - 7;

Operating CDC in Production — The Full Runbook
CDC is the most operationally demanding ingestion pattern. Query-based pipelines are stateless — a failure just means the next run processes more rows. CDC has persistent state (replication slots, Kafka offsets, consumer group positions) that must be monitored and managed. A CDC deployment without a monitoring and runbook plan is a production incident waiting to happen.
# ── POSTGRESQL MONITORING QUERIES ────────────────────────────────────────────
-- Replication slot lag (most critical metric):
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) / 1024 / 1024
    AS lag_mb
FROM pg_replication_slots;
-- (pg_last_xact_replay_timestamp() only returns a value on a standby,
--  so it is not useful for measuring slot lag on the primary)
-- ALERT thresholds:
-- lag_mb > 1024 (1 GB): WARNING — consumer is falling behind
-- lag_mb > 10240 (10 GB): CRITICAL — risk of disk fill, investigate immediately
-- active = false: CRITICAL — slot exists but consumer is disconnected
-- WAL disk usage:
SELECT pg_size_pretty(sum(size)) AS wal_disk_usage
FROM pg_ls_waldir();
-- Active replication connections:
SELECT application_name, state, sent_lsn, write_lsn, flush_lsn,
replay_lsn, sync_state
FROM pg_stat_replication;
# ── DEBEZIUM / KAFKA CONNECT MONITORING ───────────────────────────────────────
import requests
# Check connector status:
response = requests.get('http://kafka-connect:8083/connectors/freshmart-orders-cdc/status')
status = response.json()
# Expected: {"connector": {"state": "RUNNING"}, "tasks": [{"state": "RUNNING", ...}]}
# Alert if state != "RUNNING"
# List all connectors and their status:
response = requests.get('http://kafka-connect:8083/connectors?expand=status')
# Pause/resume connector (for maintenance):
requests.put('http://kafka-connect:8083/connectors/freshmart-orders-cdc/pause')
requests.put('http://kafka-connect:8083/connectors/freshmart-orders-cdc/resume')
# Restart a failed task:
requests.post('http://kafka-connect:8083/connectors/freshmart-orders-cdc/tasks/0/restart')
# ── KAFKA CONSUMER LAG MONITORING ────────────────────────────────────────────
# Using kafka-consumer-groups.sh:
# kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
#   --describe --group freshmart-cdc-silver-writer
#
# Output shows lag per partition:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# freshmart.cdc.public.orders 0 1847291 1847301 10
# freshmart.cdc.public.orders 1 2039847 2039847 0
# Using Python (confluent-kafka):
from confluent_kafka import ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})
# Returns {group_id: future}; .result() carries the committed offsets per partition
futures = admin.list_consumer_group_offsets(
    [ConsumerGroupTopicPartitions('freshmart-cdc-silver-writer')]
)
offsets = futures['freshmart-cdc-silver-writer'].result()
# ALERT if any partition lag > 10,000 events or > 5 minutes of events
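The lag arithmetic itself is easy to get wrong when a partition has no committed offset yet (confluent-kafka reports these as a negative sentinel). A minimal sketch of the per-partition computation — the function name and the fall-back-to-earliest choice are our own:

```python
def partition_lag(committed: int, log_end: int, earliest: int = 0) -> int:
    """Consumer lag for one partition.

    committed: last committed offset for the group (negative if none yet)
    log_end:   broker's log-end offset (next offset to be written)
    earliest:  earliest retained offset, used when nothing is committed
    """
    effective = committed if committed >= 0 else earliest
    return max(log_end - effective, 0)

# Matches the CLI output above: 1847291 committed vs 1847301 log-end
print(partition_lag(1847291, 1847301))  # 10
print(partition_lag(2039847, 2039847))  # 0
```

Sum this over all partitions of the topic to get the group-level lag that feeds the alert threshold.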
# ── AUTOMATED MONITORING SCRIPT ───────────────────────────────────────────────
import psycopg2, requests

def check_cdc_health(pg_conn_str: str, kafka_connect_url: str) -> list[str]:
    """Check CDC health, return list of alert messages."""
    alerts = []
    # Check replication slot lag:
    with psycopg2.connect(pg_conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT slot_name, active,
                       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) / 1024 / 1024
                FROM pg_replication_slots
            """)
            for slot_name, active, lag_mb in cur.fetchall():
                if not active:
                    alerts.append(f'CRITICAL: Slot {slot_name} is INACTIVE')
                elif lag_mb and lag_mb > 10240:
                    alerts.append(f'CRITICAL: Slot {slot_name} lag = {lag_mb:.0f} MB')
                elif lag_mb and lag_mb > 1024:
                    alerts.append(f'WARNING: Slot {slot_name} lag = {lag_mb:.0f} MB')
    # Check connector status:
    resp = requests.get(f'{kafka_connect_url}/connectors?expand=status', timeout=10)
    for name, info in resp.json().items():
        state = info['status']['connector']['state']
        if state != 'RUNNING':
            alerts.append(f'CRITICAL: Connector {name} state = {state}')
        for task in info['status']['tasks']:
            if task['state'] != 'RUNNING':
                alerts.append(f"CRITICAL: Connector {name} task {task['id']} = {task['state']}")
    return alerts

Recovery runbook — what to do when CDC breaks
FAILURE 1: Connector FAILED state
Symptom: connector status shows FAILED
Likely cause: PostgreSQL connectivity issue, schema change, credential expiry
Recovery:
1. Check connector error: GET /connectors/{name}/status
→ error_trace field shows the specific exception
2. Fix the root cause (restore DB connection, update credential, etc.)
3. Restart the failed task: POST /connectors/{name}/tasks/0/restart
4. If task keeps failing: DELETE and recreate the connector
(may require re-snapshot if offset is incompatible)
FAILURE 2: Replication slot lag growing (consumer slow)
Symptom: lag_mb increasing, not decreasing
Risk: if lag grows indefinitely, source disk fills
Recovery:
1. Identify bottleneck: is Kafka consumer or Kafka broker slow?
Check: kafka-consumer-groups.sh --describe --group {group}
2. If consumer is slow: scale up consumer group (add instances)
3. If Kafka is the bottleneck: increase topic partitions
4. If unrecoverable: temporarily increase consumer batch size
maxOffsetsPerTrigger (Spark) or fetch.max.bytes (plain consumer)
5. Alert escalation: if lag > 50 GB with no improvement in 30 min:
Consider dropping slot (accepts data loss) to protect source DB
FAILURE 3: Consumer group offset behind Kafka retention
Symptom: Kafka error: "Offset 0 is not available, earliest is 48000000"
Cause: Consumer was paused/stopped for > Kafka retention period
Kafka deleted old messages, consumer offset is now before earliest
Recovery:
1. Reset consumer group offset to earliest available:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group {group} --topic {topic} --reset-offsets --to-earliest --execute
2. Delete the Debezium connector checkpoint (so it re-snapshots from current)
3. Or: if destination can tolerate re-processing, reset to beginning
and re-process everything (upserts handle idempotency)
FAILURE 4: PostgreSQL primary failover (switchover to replica)
Symptom: Debezium loses connection to old primary, new primary is promoted
Recovery:
1. Update Debezium connector database.hostname to new primary
(use a DNS alias like postgres.internal that points to current primary)
2. Restart connector with new hostname config
3. Debezium resumes from last confirmed LSN on new primary
Note: LSN positions can diverge if the replica was not fully caught up to the old primary.
Some events may be replayed or lost — use heartbeats to detect this.
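The heartbeat mentioned above is configured on the connector itself. A hedged fragment showing the relevant settings — the option names are standard Debezium PostgreSQL connector options, but the interval, table name, and query are illustrative assumptions (the heartbeat table must exist and be writable by the replication user):

```python
# Fragment of a Debezium connector config (merged into the JSON body
# posted to Kafka Connect's REST API):
heartbeat_settings = {
    # Emit a heartbeat event at least every 10 s so the slot's confirmed LSN
    # keeps advancing even when the captured tables see few writes:
    "heartbeat.interval.ms": "10000",
    # Optionally write a row on each heartbeat so WAL is generated even on an
    # otherwise idle database (prevents unbounded slot lag):
    "heartbeat.action.query": "INSERT INTO debezium_heartbeat (ts) VALUES (now())",
}
```

After a failover, a gap or jump in heartbeat events is a cheap signal that the LSN stream diverged and a re-snapshot may be needed.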
FAILURE 5: Schema change breaks consumer
Symptom: consumer throws SerializerError or NullPointerException on new events
Cause: source table schema changed, Schema Registry rejected incompatible schema
OR consumer code does not handle new field
Recovery:
1. Check Schema Registry: GET /subjects/{topic}-value/versions/latest
2. If schema was accepted: update consumer code to handle new field
3. If schema was rejected by Registry: investigate why
(breaking change — column removed without default, type narrowed)
Work with source team to do a safe migration instead

Implementing CDC for GDPR Compliance — The Right Way
FreshMart's legal team informs the data engineering team that under GDPR (applicable to users in the EU), customers can request deletion of their personal data. When a deletion request is processed by the application team, the customer row in PostgreSQL is hard-deleted. The data engineering team has 30 days to ensure that data is also removed from the analytics warehouse and all downstream systems.
The incremental ingestion pipeline currently running on the customers table uses an updated_at watermark and cannot detect hard deletes. Silver and Gold tables in Snowflake still contain data for deleted customers. This is a compliance violation.
The solution: CDC on the customers table.
STEP 1: Switch customers table ingestion from incremental to CDC.
Configure Debezium to capture the customers table.
Create consumer that writes to bronze.customers (Delta Lake).
Decommission the incremental Python pipeline.
STEP 2: Handle DELETE events in the consumer:
from datetime import datetime, timezone

def process_customer_event(event: dict) -> None:
    op = event.get('__op') or event.get('op')
    if op == 'd':
        # Customer deletion — GDPR erasure
        customer_id = event.get('customer_id') or (event.get('before') or {}).get('customer_id')
        # 1. Delete from bronze:
        spark.sql(f"""
            DELETE FROM delta.`s3://freshmart-lake/bronze/customers`
            WHERE customer_id = {customer_id}
        """)
        # 2. Record in erasure log for audit:
        log_erasure(customer_id, reason='gdpr_deletion',
                    erased_at=datetime.now(timezone.utc))
        # 3. Publish erasure event for downstream systems:
        publish_to_kafka('customer.erasure', {
            'customer_id': customer_id,
            'erased_at': datetime.now(timezone.utc).isoformat(),
        })
    elif op in ('c', 'u', 'r'):
        upsert_to_bronze(event)
STEP 3: dbt Silver model respects deletions:
-- models/silver/customers.sql
WITH bronze AS (
SELECT * FROM {{ ref('bronze_customers') }}
),
-- Exclude any customer IDs in the erasure log:
erased AS (
SELECT customer_id FROM {{ ref('erasure_log') }}
WHERE erased_at >= DATEADD('day', -30, CURRENT_DATE)
)
SELECT b.*
FROM bronze b
LEFT JOIN erased e USING (customer_id)
WHERE e.customer_id IS NULL -- exclude erased customers
STEP 4: Cascade deletions to Gold tables:
dbt run --select customers+   # re-run customers and all models downstream of it
This rebuilds Gold tables without deleted customer data.
STEP 5: Purge PII from Kafka (important!):
Kafka retains messages for the configured retention period.
CDC events for a deleted customer contain PII in the 'before' field.
Use Kafka's tombstone mechanism:
- Producer sends null-value message with same key (customer_id)
- When topic compaction runs, tombstone removes all older messages
with that key
Or: use Kafka Schema Registry + Confluent's data masking feature
to field-level encrypt PII in CDC events.
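The tombstone in Step 5 is just a message with the same key and a null value. A hedged sketch using confluent-kafka — the topic name is the one used throughout this module, and it must have cleanup.policy=compact for the tombstone to purge earlier messages:

```python
def tombstone(customer_id: int) -> dict:
    """Arguments for confluent-kafka Producer.produce(): same key, null value.
    Once log compaction runs on the (compacted) topic, the tombstone removes
    all earlier messages — including PII-bearing CDC events — with this key."""
    return {
        "topic": "freshmart.cdc.public.customers",  # needs cleanup.policy=compact
        "key": str(customer_id),
        "value": None,
    }

# Usage (requires a running broker):
#   from confluent_kafka import Producer
#   p = Producer({"bootstrap.servers": "kafka:9092"})
#   p.produce(**tombstone(42))
#   p.flush()
```

Note that compaction runs on the broker's schedule, so PII persists until the next compaction pass; size the topic's compaction settings with the 30-day deadline in mind.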
STEP 6: Verify compliance:
SELECT COUNT(*) FROM silver.customers WHERE customer_id = :deleted_id;
-- Must return 0
SELECT COUNT(*) FROM gold.customer_ltv WHERE customer_id = :deleted_id;
-- Must return 0
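The two verification queries can be wrapped into a repeatable compliance check. A sketch under the assumption of a DB-API cursor (e.g. the Snowflake Python connector) and the table names used above; `verify_erasure` is our own helper:

```python
def verify_erasure(cursor, deleted_id: int) -> bool:
    """True only if the erased customer appears in none of the listed tables."""
    tables = ("silver.customers", "gold.customer_ltv")
    for table in tables:
        cursor.execute(
            f"SELECT COUNT(*) FROM {table} WHERE customer_id = %s", (deleted_id,)
        )
        if cursor.fetchone()[0] != 0:
            return False  # PII still present — erasure incomplete
    return True
```

Running this for every erasure request (and logging the result next to the erasure log) gives auditors a concrete record that each deletion propagated.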
Result: GDPR erasure complete within minutes of the application deletion,
well within the 30-day compliance window.

This pattern — CDC enabling reliable hard-delete propagation — is one of the most common business-driven reasons for adopting CDC beyond analytics performance. Incremental ingestion simply cannot support right-to-erasure requirements; short of full periodic reloads, CDC is the only ingestion pattern that propagates deletions reliably.
🎯 Key Takeaways
- ✓ CDC reads the database WAL (Write-Ahead Log) — the database's own durability record. Every INSERT, UPDATE, and DELETE committed to PostgreSQL is recorded in the WAL before data pages are modified. Logical decoding converts binary WAL records into structured events with before/after images and operation type.
- ✓ PostgreSQL requires wal_level=logical for CDC. This cannot be changed on a running database — it requires a restart. It also requires creating a replication slot (which retains WAL until the consumer advances its LSN), a dedicated replication user, and a publication for the tables to capture.
- ✓ Debezium is a Kafka Connect plugin that connects to PostgreSQL as a logical replication client. It decodes WAL events and publishes them to Kafka with the schema embedded. The connector stores its position (LSN) in a Kafka offset topic. On restart, it resumes from the last confirmed offset.
- ✓ Schema Registry is non-negotiable in production. It registers Avro schemas and enforces compatibility rules, preventing source schema changes from silently breaking consumers. BACKWARD compatibility (new schema can read old messages) is the correct default — it allows adding fields with defaults without redeploying consumers.
- ✓ The Outbox Pattern solves dual-write consistency. The application writes to both the business table and an outbox table in one transaction. CDC reads the outbox and publishes to Kafka. This gives exactly-once business semantics from at-least-once infrastructure, with idempotency at the consumer layer handling duplicate deliveries.
- ✓ CDC guarantees ordering within a single Kafka partition (events for the same primary key are always ordered). CDC does not guarantee ordering across partitions or across tables. Never implement cross-table join logic in the CDC consumer — let dbt handle it in SQL, which is order-independent.
- ✓ The before image in an UPDATE event contains the row's previous values. PostgreSQL only records full before images when REPLICA IDENTITY FULL is set on the table. By default only primary key columns appear in the before image. Enable FULL replica identity for tables where you need audit trails or undo capabilities.
- ✓ Replication slot monitoring is the most critical operational concern. Alert at 1 GB lag (warning) and 10 GB lag (critical). An unmonitored slot can fill the source database disk and crash the production application. Set heartbeat.interval.ms in Debezium to ensure the slot advances even during low-write periods.
- ✓ CDC provides at-least-once delivery — events may be delivered more than once on connector restart. Every CDC consumer must use upsert (not INSERT) at the destination, with a UNIQUE constraint on the business key. Use the source LSN as an idempotency key for deduplication when exactly-once processing is required.
- ✓ CDC for data lakes: do not write each event directly to S3. Use a Spark Structured Streaming micro-batch consumer (5-minute trigger) that reads from Kafka and upserts to Delta Lake. Run Delta OPTIMIZE daily to compact the small files the micro-batch pattern creates. Handle DELETE events explicitly — mark as soft-deleted or use Delta MERGE DELETE clause.