CI/CD for Data Pipelines
Testing dbt models in CI, environment promotion, blue-green deployments, Airflow deployment patterns, slim CI, and building a safe deployment pipeline for data transformations.
CI/CD for Data — Why Deploying a dbt Model Is Not Like Pushing Code
Software CI/CD is well-understood: commit code, run unit tests, deploy to staging, run integration tests, deploy to production. Data pipeline CI/CD shares this structure but has unique challenges. A dbt model change does not just change code — it changes the data in a production table that analysts are querying right now. A schema change in a Gold model can silently break three Metabase dashboards. A wrong SQL expression produces incorrect numbers that make it into a CFO report.
The stakes of a bad data deployment are different from a bad software deployment. A software bug surfaces as an error page that users see and report. A data bug surfaces as a wrong number in a report that looks correct until someone notices it does not match expectations — often days later, after decisions have been made. This asymmetry means data CI/CD must be more rigorous about testing before deployment, not less.
Environment Strategy — Dev, Staging, and Production
A data platform needs at least two environments — development and production — and ideally three, adding a staging environment that mirrors production data and configuration. Each environment serves a specific purpose in the promotion pipeline, and the configuration must ensure that code changes flow in one direction: dev → staging → prod.
ENVIRONMENT HIERARCHY:
DEV (developer sandbox)
─────────────────────────────────────────────────────────────────────────
Purpose: Individual developer workspace for iterative development
Data: Subset of production data (last 7 days, sampled or full)
Schema: dev_{developer_name} or dev_{branch_name}
e.g. dev_priya_feature_order_tier
Access: Developer's personal credentials
Isolation: Completely isolated — dev changes cannot affect staging or prod
Cost: Low — small data volume, developer queries only
Lifespan: Created on branch checkout, deleted after merge
STAGING / CI (automated testing environment)
─────────────────────────────────────────────────────────────────────────
Purpose: Run automated tests against a clean copy of recent production data
Data: Clone of production data (last 30 days) OR Snowflake Zero-Copy Clone
Schema: ci_{PR_number} or staging_{branch_name}
Access: CI service account (read prod, write staging schema)
Isolation: Each PR gets its own isolated staging schema
Cost: Moderate — production data volume but only active during CI
Lifespan: Created on PR open, deleted after PR merge
PRODUCTION
─────────────────────────────────────────────────────────────────────────
Purpose: Serves real analytical consumers (analysts, BI tools, APIs)
Data: Full production data, updated by live pipelines
Schema: silver, gold (canonical schema names)
Access: Pipeline service accounts only (write); analysts read-only
Isolation: No direct developer write access — changes only via PR + CI
Cost: Full production compute and storage
Lifespan: Permanent
dbt TARGET CONFIGURATION (profiles.yml):
freshmart:
target: dev # default target for local development
outputs:
dev:
type: snowflake
account: freshmart.snowflake.com
user: "{{ env_var('SNOWFLAKE_USER') }}"
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
role: analyst_role
database: freshmart_dev
schema: "dev_{{ env_var('DBT_DEV_SCHEMA', 'default') }}"
# Local: schema = dev_priya_feature_xyz
# CI: schema = ci_pr_142
ci:
type: snowflake
account: freshmart.snowflake.com
user: "{{ env_var('CI_SNOWFLAKE_USER') }}"
password: "{{ env_var('CI_SNOWFLAKE_PASSWORD') }}"
role: ci_service_role
database: freshmart_ci
schema: "ci_{{ env_var('PR_NUMBER', 'manual') }}"
# Each PR gets: ci_142, ci_143, etc.
prod:
type: snowflake
account: freshmart.snowflake.com
user: "{{ env_var('PROD_SNOWFLAKE_USER') }}"
password: "{{ env_var('PROD_SNOWFLAKE_PASSWORD') }}"
role: pipeline_role
database: freshmart_prod
schema: silver # or gold, depending on the model group
Snowflake Zero-Copy Clone — cheap production-like staging
PROBLEM: Running CI tests against production data is expensive.
Cloning production tables for each PR: copies 10 TB → expensive and slow.
SOLUTION: Snowflake Zero-Copy Clone
Creates an instant snapshot of a database/schema at zero storage cost.
Clone shares data pages with source until rows are modified.
Creating a 10 TB clone: 0-5 seconds, $0.00 storage (until writes)
-- Clone production schemas for a PR:
CREATE OR REPLACE DATABASE freshmart_ci_pr_142
CLONE freshmart_prod;
-- Creates an identical copy of freshmart_prod in ~2 seconds.
-- Storage cost: $0 (shared pages with freshmart_prod).
-- Write operations on the clone use new storage (small for CI tests).
# CI SETUP SCRIPT (Python — `snowflake_conn` is an assumed open connection/cursor):
def create_ci_environment(pr_number: int) -> str:
"""Create an isolated CI environment using Zero-Copy Clone."""
ci_db = f'freshmart_ci_pr_{pr_number}'
snowflake_conn.execute(f"""
CREATE OR REPLACE DATABASE {ci_db}
CLONE freshmart_prod
DATA_RETENTION_TIME_IN_DAYS = 1
""")
return ci_db
def teardown_ci_environment(pr_number: int) -> None:
"""Clean up CI environment after tests complete."""
ci_db = f'freshmart_ci_pr_{pr_number}'
snowflake_conn.execute(f'DROP DATABASE IF EXISTS {ci_db}')
# In CI pipeline:
# 1. On PR open: create_ci_environment(pr_number)
# 2. Run dbt tests: dbt test --target ci
# 3. On PR close: teardown_ci_environment(pr_number)
# BigQuery equivalent: table clones/snapshots or dataset copies
# e.g. bq cp freshmart_prod.orders freshmart_ci_pr_142.orders (per table)
dbt CI — What to Run on Every Pull Request
A dbt CI pipeline runs on every pull request before merge. It catches compile errors, test failures, and schema breaking changes before they reach production. The key challenge is speed — a CI run that takes 45 minutes blocks the developer and tempts them to merge without waiting. The solution is slim CI: only run tests on models that were changed or depend on changed models.
# .github/workflows/dbt_ci.yml
name: dbt CI
on:
pull_request:
branches: [main]
paths:
- 'dbt/**'
- '.github/workflows/dbt_ci.yml'
jobs:
dbt-ci:
runs-on: ubuntu-latest
timeout-minutes: 30
env:
DBT_PROFILES_DIR: /home/runner/.dbt
CI_SNOWFLAKE_USER: ${{ secrets.CI_SNOWFLAKE_USER }}
CI_SNOWFLAKE_PASSWORD: ${{ secrets.CI_SNOWFLAKE_PASSWORD }}
PR_NUMBER: ${{ github.event.pull_request.number }}
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0 # full git history (useful for diff-based tooling; dbt state: comparison itself uses manifests, not git)
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: pip
- name: Install dbt
run: pip install dbt-snowflake==1.8.0 # dbt_utils is a dbt package — installed by `dbt deps`, not pip
- name: Write dbt profiles
run: |
mkdir -p /home/runner/.dbt
cat > /home/runner/.dbt/profiles.yml << 'EOF'
freshmart:
target: ci
outputs:
ci:
type: snowflake
account: freshmart.snowflake.com
user: "{{ env_var('CI_SNOWFLAKE_USER') }}"
password: "{{ env_var('CI_SNOWFLAKE_PASSWORD') }}"
role: ci_service_role
database: "freshmart_ci_pr_{{ env_var('PR_NUMBER') }}"
schema: dbt_ci
warehouse: CI_WH
threads: 8
EOF
- name: Create CI database (Snowflake Zero-Copy Clone)
run: python scripts/ci/create_ci_db.py --pr ${{ github.event.pull_request.number }}
- name: dbt deps (install packages)
working-directory: dbt
run: dbt deps
- name: dbt compile (catch SQL syntax errors)
working-directory: dbt
run: dbt compile --target ci
- name: dbt run — SLIM CI (only changed models + downstream)
working-directory: dbt
run: |
# Slim CI: only run models modified in this PR + their dependents.
# This runs 5-20 models instead of all 150. CI time: 4 min vs 45 min.
dbt run --target ci \
--select state:modified+ \
--defer \
--state ./prod_artifacts \
--exclude tag:skip_ci
- name: dbt test — tests for changed models + downstream
working-directory: dbt
run: |
dbt test --target ci \
--select state:modified+ \
--defer \
--state ./prod_artifacts \
--store-failures
- name: dbt docs — generate for PR preview
working-directory: dbt
if: always()
run: dbt docs generate --target ci
- name: Check for breaking schema changes
run: python scripts/ci/check_schema_changes.py --pr ${{ github.event.pull_request.number }}
- name: Teardown CI database
if: always() # clean up even if previous steps failed
run: python scripts/ci/teardown_ci_db.py --pr ${{ github.event.pull_request.number }}
- name: Post test results to PR
if: always()
uses: actions/github-script@v7
with:
script: |
const results = require('./dbt/target/run_results.json');
const passed = results.results.filter(r => r.status === 'pass').length;
const failed = results.results.filter(r => r.status === 'fail').length;
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `## dbt CI Results\n✅ ${passed} passed ❌ ${failed} failed`,
});
Slim CI — running only changed models with --defer
SLIM CI CONCEPTS:
1. STATE-BASED SELECTION (state:modified):
dbt compares the current code (manifest.json from the PR branch)
to a reference state (manifest.json from the last production run).
Only models that changed — or depend on changed models — are selected.
state:modified → only models whose SQL or config changed
state:modified+ → changed models + all downstream dependents
+state:modified → changed models + all upstream ancestors
+state:modified+ → full subtree around changed models
EXAMPLE: PR changes silver.orders.
state:modified+ selects:
silver.orders ← changed directly
gold.daily_revenue ← downstream of silver.orders
gold.customer_ltv ← downstream of silver.orders
gold.fct_orders_wide ← downstream of silver.orders
Skips: silver.customers, silver.payments, unrelated gold models.
Runs 4 models instead of 80. CI time: ~4 min instead of 45 min.
2. --DEFER (use production data for unmodified upstream models):
When running silver.orders in CI, it needs bronze.orders as input.
bronze.orders exists in production but not in the CI database.
--defer tells dbt: for models NOT in the CI run, use the production
version of that model from the prod database.
Without --defer:
silver.orders → tries to read from ci_pr_142.bronze.orders → NOT FOUND → error
With --defer --state ./prod_artifacts:
silver.orders → reads from freshmart_prod.bronze.orders → works! ✓
The CI environment only writes the models in state:modified+.
Everything else is deferred to production data.
3. PROD ARTIFACTS (the reference state):
prod_artifacts/manifest.json is the manifest from the last successful
production run. It is stored as:
- An artifact in the CI/CD system (GitHub Actions cache/artifact)
- Or fetched from dbt Cloud's API
- Or stored in S3 and downloaded at CI start
FETCHING PROD MANIFEST FROM S3:
aws s3 cp s3://freshmart-ci-artifacts/dbt/manifest.json ./prod_artifacts/
aws s3 cp s3://freshmart-ci-artifacts/dbt/catalog.json ./prod_artifacts/
UPDATING PROD MANIFEST after successful prod deployment:
aws s3 cp ./target/manifest.json s3://freshmart-ci-artifacts/dbt/
# Runs at end of every successful production dbt run
4. SCHEMA CHANGE DETECTION:
A script that compares current manifest to prod manifest and
flags breaking changes: removed columns, type changes, renamed columns.
# scripts/ci/check_schema_changes.py
def detect_breaking_changes(
current_manifest: dict,
prod_manifest: dict,
) -> list[str]:
breaking = []
for node_id, node in prod_manifest['nodes'].items():
if node_id not in current_manifest['nodes']:
breaking.append(f"Model removed: {node['name']}")
continue
prod_cols = {c: v['data_type']
for c, v in node.get('columns', {}).items()}
current_cols = {c: v['data_type']
for c, v in current_manifest['nodes'][node_id]
.get('columns', {}).items()}
for col, dtype in prod_cols.items():
if col not in current_cols:
breaking.append(f"{node['name']}.{col} removed")
elif current_cols[col] != dtype:
breaking.append(
f"{node['name']}.{col}: {dtype} → {current_cols[col]}"
)
return breaking
Deploying to Production — Safe Deployment Patterns for dbt
Deploying dbt changes to production requires more care than deploying application code. A full dbt run on production tables that takes 3 hours cannot be rolled back instantly if a bug is found 2 hours in. Safe deployment patterns reduce the blast radius and enable fast recovery.
STRATEGY 1: DIRECT DEPLOYMENT (simplest — fine for most cases)
On merge to main: run dbt in production environment.
Any test failure blocks the deployment.
Rollback: re-run the previous git tag.
GITHUB ACTIONS — PRODUCTION DEPLOY:
name: dbt Production Deploy
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Deploy to production
run: |
dbt deps
dbt run --target prod --vars "{run_date: $(date -u +%F)}" # note: {{ ds }} is Airflow templating — not available in GitHub Actions
dbt test --target prod
- name: Update prod artifacts in S3
run: aws s3 cp ./target/manifest.json s3://freshmart-ci-artifacts/dbt/
STRATEGY 2: BLUE-GREEN DEPLOYMENT (for high-risk Gold changes)
Build the new version of the table in a shadow schema.
Validate it. Atomically swap the schema pointer.
Rollback: swap back to the old schema (still exists).
IMPLEMENTATION IN SNOWFLAKE:
def blue_green_deploy_gold_model(model_name: str, run_date: str):
"""
Build the new version in a shadow schema.
If tests pass: swap shadow → production.
Old production schema preserved for 24h for rollback.
"""
# Step 1: Build in shadow schema (not live to analysts):
subprocess.run([
'dbt', 'run', '--target', 'prod',
'--select', model_name,
'--vars', json.dumps({'run_date': run_date,
'target_schema': 'gold_shadow'}),
], check=True)
# Step 2: Run tests against shadow schema:
subprocess.run([
'dbt', 'test', '--target', 'prod',
'--select', model_name,
'--vars', json.dumps({'target_schema': 'gold_shadow'}),
], check=True)
# Step 3: Atomically swap shadow ↔ production.
# Two separate RENAMEs are NOT atomic — between them no `gold` schema
# exists and analyst queries fail in that window. Snowflake's SWAP WITH
# exchanges the two schemas in a single statement:
conn.execute("""
ALTER SCHEMA freshmart_prod.gold
SWAP WITH freshmart_prod.gold_shadow;
""")
# Analysts querying gold.daily_revenue now see the new version.
# The old version now lives in gold_shadow — swap back to roll back.
# Step 4: Drop the old version after a 24h validation window:
schedule_schema_drop('gold_shadow', delay_hours=24)
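The `schedule_schema_drop` helper above is referenced but not defined. One way to implement it — a sketch, assuming a housekeeping table `pipeline.schema_drop_queue` (hypothetical) and a periodic maintenance job that performs the actual drops:

```python
from datetime import datetime, timedelta, timezone

def schedule_schema_drop(conn, schema_name: str, delay_hours: int) -> None:
    """Record a deferred DROP in a housekeeping table instead of dropping now."""
    drop_after = datetime.now(timezone.utc) + timedelta(hours=delay_hours)
    conn.execute(
        "INSERT INTO pipeline.schema_drop_queue (schema_name, drop_after) "
        f"VALUES ('{schema_name}', '{drop_after.isoformat()}')"
    )

def run_scheduled_drops(conn) -> None:
    """Run periodically (e.g. from an Airflow maintenance DAG): drop overdue schemas."""
    rows = conn.execute(
        "SELECT schema_name FROM pipeline.schema_drop_queue "
        "WHERE drop_after <= CURRENT_TIMESTAMP()"
    ).fetchall()
    for (schema_name,) in rows:
        conn.execute(f"DROP SCHEMA IF EXISTS freshmart_prod.{schema_name}")
        conn.execute(
            f"DELETE FROM pipeline.schema_drop_queue "
            f"WHERE schema_name = '{schema_name}'"
        )
```

An hourly maintenance job calling `run_scheduled_drops` keeps the 24-hour rollback window without any long-lived sleeping process.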
STRATEGY 3: INCREMENTAL DEPLOYMENT (for schema migrations)
When adding a new column to a large Silver table:
Step 1: Add column as nullable in the same dbt run.
Analysts see NULL for the new column — no breakage.
Step 2: Backfill the new column value for all existing rows.
Run as a separate job with progress tracking.
Step 3: Once backfill complete: apply not_null constraint.
Remove the temporary nullable flag.
This prevents a 3-hour "backfill lock" on a 500M-row table
that would block analysts from querying it.
-- Step 1: Add column (dbt schema.yml change):
-- new column 'tip_amount' added with no tests initially.
-- Step 2: Backfill script (run separately):
UPDATE silver.orders
SET tip_amount = 0.0
WHERE tip_amount IS NULL
AND created_at < '2026-03-17'; -- all rows before the feature launch
-- Step 3: Add not_null test to schema.yml after backfill completes.
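Step 2's "separate job with progress tracking" could look like the following sketch — chunking the backfill by date range so no single UPDATE holds the table for hours. `conn` is an assumed database connection; table and column names follow the example above:

```python
from datetime import date, timedelta

def date_chunks(start: date, end: date, days: int):
    """Yield (chunk_start, chunk_end) half-open ranges covering [start, end)."""
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=days), end)
        yield cur, nxt
        cur = nxt

def backfill_tip_amount(conn, start: date, end: date, batch_days: int = 7) -> None:
    """Backfill in small UPDATEs so analysts are never blocked for hours."""
    for lo, hi in date_chunks(start, end, batch_days):
        conn.execute(f"""
            UPDATE silver.orders
            SET tip_amount = 0.0
            WHERE tip_amount IS NULL
              AND created_at >= '{lo}' AND created_at < '{hi}'
        """)
        print(f"backfilled {lo} → {hi}")  # progress tracking per chunk
```

Each chunk commits independently, so a failure midway resumes from the last completed range rather than rolling back hours of work.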
ROLLBACK STRATEGY:
dbt does not have a native "rollback" command.
Rollback options:
1. Revert the git commit and redeploy:
git revert HEAD && git push origin main → CI/CD re-deploys old version
2. Delta Lake time travel:
RESTORE TABLE silver.orders TO VERSION AS OF 41
(restores the Silver table to the state before the bad deploy)
3. Blue-green: swap back to old schema (if blue-green was used)
WHICH TO USE:
Simple model logic change: git revert + redeploy (safest, cleanest)
Large data change: Delta time travel (fastest data recovery)
Critical Gold model: blue-green swap (immediate, no recompute needed)
Airflow Deployment — DAG Versioning and Safe Updates
Deploying Airflow DAGs has unique challenges compared to dbt models. A DAG change takes effect the next time the scheduler parses it — typically within 30 seconds. If the change modifies a DAG that is currently running, the in-progress run may behave unexpectedly with the new code. DAG versioning and safe deployment patterns prevent mid-run disruptions.
AIRFLOW DAG DEPLOYMENT APPROACHES:
APPROACH 1: GIT SYNC (most common for managed Airflow)
Airflow reads DAG files directly from a Git repository.
Any push to the main branch is reflected in Airflow within 30-60 seconds.
Used by: Google Cloud Composer, MWAA, Astronomer.
FLOW:
Developer pushes → CI tests pass → merge to main → Git Sync detects change
→ Airflow scheduler re-parses DAG file → change is live
RISK: no staging for Airflow DAGs.
A syntax error in a DAG file makes the DAG disappear from the UI.
A schedule change takes effect immediately, possibly mid-run.
MITIGATION:
- Run python -m py_compile dags/*.py in CI to catch syntax errors
- Run airflow dags list-import-errors in CI to catch import errors
- Use DAG pausing for risky changes (pause, deploy, verify, unpause)
APPROACH 2: DAG VERSIONING (for breaking schedule/structure changes)
When changing a DAG's schedule or removing tasks, create a new DAG ID.
Old DAG runs to completion. New DAG takes over from next run.
# Bad approach: modify existing DAG schedule mid-stream
DAG('freshmart_morning_pipeline', schedule='0 2 * * *', ...)
# Change to:
DAG('freshmart_morning_pipeline', schedule='0 6 * * *', ...)
# Risk: if a run is in progress, it sees the new schedule on next evaluation.
# Good approach: version the DAG ID for breaking changes
DAG('freshmart_morning_pipeline_v2', schedule='0 6 * * *', ...)
# v1 continues running its current cycle.
# v2 starts on the new schedule from the first run after deploy.
# Once v1 has no more in-progress runs: delete it.
APPROACH 3: STAGED DEPLOYMENT WITH TESTING
CI pipeline for Airflow DAGs:
# .github/workflows/airflow_ci.yml
steps:
- name: Lint DAG files
run: |
pip install apache-airflow flake8
flake8 dags/ --max-line-length=120
- name: Validate DAG syntax (no import errors)
run: |
for dag_file in dags/*.py; do
python -m py_compile "$dag_file" && echo "OK: $dag_file"
done
- name: Check for import errors using Airflow CLI
run: |
airflow db init
airflow dags list-import-errors
# Fails CI if any DAG has import errors
- name: Validate DAG structure (task dependencies, no cycles)
run: |
python scripts/ci/validate_dag_structure.py
# Checks: all task IDs are unique, no circular dependencies,
# required tasks (start, end EmptyOperators) present,
# all referenced connections exist in Airflow connections
- name: Run DAG unit tests
run: pytest tests/dags/ -v
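`scripts/ci/validate_dag_structure.py` is referenced but not shown. A minimal sketch of its core checks — required tasks, dangling references, cycle detection — shown here on a plain `{task_id: [downstream_ids]}` mapping rather than a live `DagBag` (task-ID uniqueness is guaranteed by dict keys, so it would be enforced while building the mapping):

```python
def validate_structure(deps: dict[str, list[str]]) -> list[str]:
    """Validate a task-dependency mapping: required tasks present,
    every downstream reference points at a defined task, no cycles."""
    errors = []
    for required in ('start', 'end'):
        if required not in deps:
            errors.append(f"missing required task: {required}")
    for task, downstream in deps.items():
        for d in downstream:
            if d not in deps:
                errors.append(f"{task} -> {d}: unknown task id")
    # Cycle detection via depth-first search with three-colour marking:
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {t: WHITE for t in deps}
    def has_cycle(node: str) -> bool:
        color[node] = GRAY
        for d in deps.get(node, []):
            if color.get(d) == GRAY:       # back edge → cycle
                return True
            if color.get(d) == WHITE and has_cycle(d):
                return True
        color[node] = BLACK
        return False
    if any(color[t] == WHITE and has_cycle(t) for t in deps):
        errors.append("circular dependency detected")
    return errors
```

The real script would build `deps` from `DagBag` task objects (`task.downstream_list`) and exit non-zero if the error list is non-empty, failing the CI step.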
UNIT TESTING AIRFLOW DAGS:
import pytest
from airflow.models import DagBag
def test_freshmart_pipeline_dag_structure():
dagbag = DagBag(dag_folder='dags/', include_examples=False)
dag = dagbag.get_dag('freshmart_morning_pipeline')
assert dag is not None, "DAG not found"
assert len(dagbag.import_errors) == 0, f"Import errors: {dagbag.import_errors}"
task_ids = [task.task_id for task in dag.tasks]
assert 'start' in task_ids, "Missing 'start' task"
assert 'end' in task_ids, "Missing 'end' task"
assert 'dbt_silver' in task_ids, "Missing 'dbt_silver' task"
assert 'dbt_gold' in task_ids, "Missing 'dbt_gold' task"
def test_freshmart_pipeline_task_order():
dagbag = DagBag(dag_folder='dags/', include_examples=False)
dag = dagbag.get_dag('freshmart_morning_pipeline')
# Verify that dbt_silver runs before dbt_gold:
silver_task = dag.get_task('dbt_silver')
gold_task = dag.get_task('dbt_gold')
assert gold_task.task_id in [t.task_id for t in silver_task.downstream_list], "dbt_gold must be downstream of dbt_silver"
def test_schedule_is_set():
dagbag = DagBag(dag_folder='dags/', include_examples=False)
dag = dagbag.get_dag('freshmart_morning_pipeline')
assert dag.schedule_interval is not None, "DAG has no schedule"
assert dag.catchup is False, "catchup must be False in production DAGs"
Testing Strategies for Data Pipelines — Unit, Integration, and E2E
Data pipelines are harder to unit test than application code because the business logic is in SQL and the side effects are writes to a database. The testing pyramid for data pipelines is inverted compared to software: integration and end-to-end tests provide more value than unit tests, because most bugs occur at the boundary between the SQL and the data, not in pure logic.
DATA PIPELINE TESTING PYRAMID:
End-to-end (run full pipeline against prod-like data, validate outputs)
↑ most valuable but slowest
Integration tests (run dbt models against real data, validate with dbt tests)
↑ good coverage, medium speed
Unit tests (test pure Python logic — validation functions, hash key generators)
↓ least valuable alone, fastest
Unlike software, the pyramid is widest at the top for data pipelines.
A dbt model with correct SQL tests is more valuable than mocking the SQL.
UNIT TESTS (for pure Python logic):
Test validation functions, hash key generators, transformation helpers.
Do NOT try to unit test SQL by mocking the database — that doesn't work.
# tests/unit/test_validation.py
import pytest
from pipeline.validate import validate_order_row, VALID_STATUSES
def test_valid_order_passes():
row = {
'order_id': 9284751,
'customer_id': 4201938,
'order_amount': 380.00,
'status': 'delivered',
}
result = validate_order_row(row)
assert result.is_valid, f"Expected valid, got: {result.error}"
def test_negative_amount_rejected():
row = {'order_id': 1, 'customer_id': 1, 'order_amount': -10, 'status': 'placed'}
result = validate_order_row(row)
assert not result.is_valid
assert result.error_type == 'negative_amount'
def test_invalid_status_rejected():
row = {'order_id': 1, 'customer_id': 1, 'order_amount': 100, 'status': 'unknown'}
result = validate_order_row(row)
assert not result.is_valid
assert result.error_type == 'invalid_status'
assert 'unknown' in result.error_message
def test_hash_key_is_deterministic():
from pipeline.vault import compute_hub_hk
key1 = compute_hub_hk('4201938')
key2 = compute_hub_hk('4201938')
assert key1 == key2
def test_hash_key_normalisation():
from pipeline.vault import compute_hub_hk
# Upper, lower, and padded versions must produce the same key:
assert compute_hub_hk('ST001') == compute_hub_hk('st001')
assert compute_hub_hk('ST001') == compute_hub_hk(' ST001 ')
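The tests above pin down `compute_hub_hk`'s contract: deterministic and normalisation-insensitive. A sketch that satisfies it — the hash algorithm (SHA-256) and normalisation rules (trim + uppercase) are assumptions here; align them with whatever your Data Vault standard specifies:

```python
import hashlib

def compute_hub_hk(business_key: str) -> str:
    """Deterministic hub hash key: normalise, then hash.
    Normalisation (strip + uppercase) maps 'st001', ' ST001 ' and 'ST001'
    to the same key, so load order and source formatting never matter."""
    normalised = business_key.strip().upper()
    return hashlib.sha256(normalised.encode('utf-8')).hexdigest()
```

Keeping normalisation inside the function (rather than at call sites) is what makes the determinism tests meaningful — every producer of hash keys goes through the same code path.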
INTEGRATION TESTS (dbt tests against real data — the most valuable):
Run in CI against the Zero-Copy Clone of production data.
These are the dbt tests in schema.yml — not_null, unique, accepted_values.
The value: tests run against PRODUCTION DATA VOLUMES.
A not_null test that passes locally on 1,000 dev rows may fail on
50 million production rows because production has edge cases dev does not.
Run in CI:
dbt test --target ci --select state:modified+
All dbt tests for changed models run against production-like data.
Failures block the PR before merge.
END-TO-END TESTS (run full pipeline, validate output):
Run the complete pipeline (extraction → Bronze → Silver → Gold) on
a subset of production data. Validate the output tables match expectations.
# tests/e2e/test_morning_pipeline.py
import pytest
from datetime import date, timedelta
def test_morning_pipeline_e2e(snowflake_conn, dbt_runner):
"""
Run a full pipeline on a small date range.
Validate key output metrics match known-good values.
"""
test_date = date.today() - timedelta(days=1)
# Run the full pipeline for one day:
result = dbt_runner.run(
select='staging.* silver.* gold.*',
vars={'run_date': str(test_date)},
target='ci',
)
assert result.success, f"Pipeline failed: {result.errors}"
# Validate row counts are in expected range:
rows = snowflake_conn.execute(f"""
SELECT COUNT(*) FROM ci_pr_142.gold.daily_revenue
WHERE order_date = '{test_date}'
""").scalar()
assert 40_000 < rows < 100_000, f"Unexpected row count: {rows}"
# Validate key business invariants:
negative_revenue = snowflake_conn.execute(f"""
SELECT COUNT(*) FROM ci_pr_142.gold.daily_revenue
WHERE net_revenue < 0
""").scalar()
assert negative_revenue == 0, f"Found {negative_revenue} negative revenue rows"
# Validate Silver reconciles with Bronze:
bronze_count = snowflake_conn.execute(f"""
SELECT COUNT(*) FROM ci_pr_142.bronze.orders
WHERE _bronze_date = '{test_date}'
""").scalar()
silver_count = snowflake_conn.execute(f"""
SELECT COUNT(*) FROM ci_pr_142.silver.orders
WHERE DATE(created_at) = '{test_date}'
""").scalar()
dlq_count = snowflake_conn.execute(f"""
SELECT COUNT(*) FROM ci_pr_142.pipeline.dead_letter_queue
WHERE pipeline_name = 'silver_orders' AND run_date = '{test_date}'
""").scalar()
# Bronze = Silver + DLQ (all rows accounted for):
assert bronze_count == silver_count + dlq_count, f"Row count mismatch: {bronze_count} bronze != {silver_count} silver + {dlq_count} dlq"
The Complete CI/CD Flow — From Commit to Production
| Stage | Trigger | What runs | Blocks merge? | Time |
|---|---|---|---|---|
| Pre-commit | git commit (local hook) | sqlfluff lint, black format check, py_compile DAG files | No (local only) | < 5s |
| PR opened | pull_request event | Create Zero-Copy Clone CI DB, dbt deps, compile | Yes if compile fails | 2 min |
| PR CI tests | pull_request (push) | dbt run state:modified+ --defer, dbt test state:modified+, schema change detection, DAG unit tests | Yes if tests fail | 4-8 min |
| PR review | Human approval | Code review, data contract check, downstream impact review | Yes (1 approval required) | Human |
| Merge to main | PR merged | Production dbt run, dbt test --target prod, update prod artifacts in S3, teardown CI DB | Auto-merge blocked if CI fails | 10-30 min |
| Post-deploy | Successful prod run | Notify Slack #deploys, run post-deploy smoke tests, update monitoring dashboard | No | 2 min |
A Schema Change That Broke Three Dashboards — And How CI Would Have Prevented It
A data engineer renames net_revenue to revenue_after_discount in gold.daily_revenue for clarity. The rename is a SQL-only change in one dbt model. No dbt tests fail. The PR is merged. Three Metabase dashboards that query net_revenue directly break immediately. Finance notices at 09:00.
THE INCIDENT:
Change: gold/daily_revenue.sql
-- Before:
order_amount - discount_amount AS net_revenue
-- After:
order_amount - discount_amount AS revenue_after_discount
Impact: 3 Metabase dashboards query:
SELECT SUM(net_revenue) FROM gold.daily_revenue WHERE ...
→ After rename: net_revenue column no longer exists → dashboard error
Detection time: 45 minutes (Finance analyst reports broken dashboard)
Fix time: 20 minutes (add net_revenue as alias, redeploy)
Total impact: 1h 5min of broken Finance dashboards in the morning
ROOT CAUSE: No schema change detection in CI.
The PR had:
✓ SQL compiled successfully
✓ dbt tests passed (not_null, unique on order_date, store_id)
✗ No check that net_revenue was removed
✗ No check that Metabase uses net_revenue
✗ No breakage visible until prod deploy
CI IMPROVEMENTS ADDED AFTER THIS INCIDENT:
1. SCHEMA CHANGE DETECTION (in every PR):
# scripts/ci/check_schema_changes.py
def check_for_breaking_column_changes():
"""
Compare current manifest to prod manifest.
Fail CI if any column was removed or renamed in a Gold model.
"""
prod = load_manifest('./prod_artifacts/manifest.json')
current = load_manifest('./target/manifest.json')
changes = detect_breaking_changes(current, prod)
if changes:
print("BREAKING SCHEMA CHANGES DETECTED:")
for c in changes:
print(f" - {c}")
print()
print("If this is intentional, update all downstream consumers first.")
print("Then re-run CI with: git commit -m 'chore: update consumers'")
sys.exit(1)
# Output on this PR:
# BREAKING SCHEMA CHANGES DETECTED:
# - gold.daily_revenue.net_revenue removed
# If this is intentional, update all downstream consumers first.
2. DOWNSTREAM CONSUMER CHECK (for Gold columns):
# As part of DataHub ingestion, tag all Metabase columns that
# reference each Gold column.
# CI queries DataHub API:
def check_downstream_consumers(changed_columns: list[str]) -> list[str]:
"""Return list of BI tools using any of the changed columns."""
affected = []
for col in changed_columns:
consumers = datahub_client.get_downstream_consumers(
table='gold.daily_revenue', column=col
)
affected.extend(consumers)
return affected
# Output: ["Metabase: Daily Revenue dashboard", "CFO Report export"]
# CI fails with: "These consumers must be updated before this column is renamed."
3. BACKWARD-COMPATIBLE MIGRATION PATTERN:
# The correct way to rename a column:
# Step 1 (this PR): Add the new column, keep the old as an alias
order_amount - discount_amount AS revenue_after_discount,
order_amount - discount_amount AS net_revenue, -- ← backward compat alias
# Step 2: Notify all dashboard owners about the migration window.
# Step 3 (next PR, after dashboards updated): Remove net_revenue alias.
# This is the same deprecation pattern used in software APIs.
# Add the new name. Keep the old name. Remove old name only after all consumers migrated.
RESULT AFTER CI IMPROVEMENTS:
Next time a Gold column is renamed or removed:
CI fails with: "BREAKING SCHEMA CHANGES: net_revenue removed"
Developer sees: list of consumers to update before this is safe to merge.
Engineer cannot merge until consumers are updated or the PR is modified.
Zero production breakages from schema changes.
🎯 Key Takeaways
- ✓Data pipeline CI/CD has higher stakes than software CI/CD. A software bug surfaces as a visible error. A data bug looks like normal data but produces wrong numbers — discovered hours or days later after decisions have been made. This asymmetry demands rigorous testing before production deployment.
- ✓Three environments: Dev (individual developer sandbox, small data subset, isolated schema), Staging/CI (Zero-Copy Clone of production data, isolated per PR, created on PR open and torn down after merge), Production (full data, pipeline service accounts only, no direct developer write access).
- ✓Snowflake Zero-Copy Clone creates an instant snapshot of a production database at zero storage cost. Use it to give each CI run an isolated environment with production-like data. Creating a 10 TB clone takes seconds and costs nothing until the CI run writes to it. Tear down after merge to avoid accumulating idle clones.
- ✓Slim CI uses --select state:modified+ to run only changed models and their downstream dependents. The --defer flag uses production data for upstream models not in the CI selection. Together: CI runs 4-8 minutes instead of 45+ minutes. The prod_artifacts/manifest.json (updated after every successful prod run) provides the reference state for change detection.
- ✓Schema change detection compares the current manifest to the production manifest and fails CI if any Gold column was removed or renamed. This is the most important CI check for preventing broken dashboards. A column rename must go through a deprecation cycle: add the new name, keep the old name as an alias, notify consumers, remove the old name only after all consumers migrate.
- ✓Airflow DAG CI: compile with py_compile (syntax), import with DagBag (catches missing modules), assert DAG structure (expected task IDs, correct dependency order, catchup=False, schedule not None), validate connections exist. Run against the same Docker image as production — a package installed in CI but not production causes the DAG to disappear from the UI after deployment.
- ✓Blue-green deployment for high-risk Gold changes: build the new version in a shadow schema, run tests against it, then atomically swap shadow → production (in Snowflake, ALTER SCHEMA … SWAP WITH … exchanges the two schemas in a single statement). The old version, now in the shadow schema, is preserved for 24 hours as a rollback option. Two separate RENAME statements are not atomic — they leave a window where no gold schema exists and analyst queries fail.
- ✓Rollback strategies: git revert + redeploy (safe and clean, takes 5-10 min), Delta Lake time travel (RESTORE TABLE to a previous version — fast data recovery), blue-green swap back (immediate, no recompute — only if blue-green was used). Choose based on the nature of the problem: logic error → git revert, large data corruption → Delta time travel.
- ✓The data testing pyramid is inverted. Integration tests (dbt tests against real data in CI) provide more value than unit tests because most bugs occur at the boundary between SQL and data. Unit tests are valuable for pure Python logic (validators, hash functions). End-to-end tests validate the full pipeline output against known business invariants.
- ✓The PR process and CI gates are an investment in trust. Analysts who have been burned by wrong data distrust every number. Analysts who trust the data use it confidently and make better decisions. The minutes spent in CI are returned many times over in analyst confidence, fewer post-incident investigations, and stakeholder trust in the data platform.