Working with Files at Scale
File organisation, compression, partitioning, the small file problem, and format conversion pipelines.
Why Working with Files at Scale Is Its Own Engineering Discipline
A single CSV file is trivial. A directory of 50 million Parquet files representing three years of event data is not. At scale, every file decision has compounding consequences: how files are named determines whether partition pruning works. How files are sized determines whether Spark jobs are fast or slow. How files are compressed determines storage cost. How they are organised determines whether analysts can find data without asking a data engineer.
This module covers the file engineering that sits between "I know Parquet exists" and "I can design a file layer that scales to petabytes and still serves fast queries." These decisions are made once and lived with for years — getting them right matters.
File Naming — The Foundation of a Usable Data Lake
File naming in a data lake is not cosmetic. The names must encode enough information to identify the file without reading it, sort chronologically without special logic, and survive being listed in any filesystem tool without ambiguity. Poor naming conventions produce lakes where data engineers spend 20 minutes finding the right file for every query.
# ── THE RULES ────────────────────────────────────────────────────────────────
# 1. Timestamps: always ISO 8601, always UTC, always in the name
# 2. Use underscores not spaces (spaces cause shell escaping nightmares)
# 3. Include enough context to identify without opening the file
# 4. Lexicographic sort order = chronological sort order (ISO dates achieve this)
# 5. Include a unique identifier to prevent overwrite collisions
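Rule 4 is easy to verify: with ISO 8601 timestamps in the name, a plain string sort is already chronological. A quick illustration (the filenames are made up):

```python
# ISO 8601 timestamps in filenames: lexicographic sort == chronological sort
names = [
    'orders_20260317T060000Z.csv',
    'orders_20251231T235959Z.csv',
    'orders_20260101T000000Z.csv',
]
print(sorted(names))
# ['orders_20251231T235959Z.csv', 'orders_20260101T000000Z.csv', 'orders_20260317T060000Z.csv']
```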
# ── LANDING ZONE: preserve origin context ────────────────────────────────────
# Pattern: {source}_{entity}_{start_ts}_{end_ts}_{batch_id}.{ext}
razorpay_payments_20260317T000000Z_20260317T235959Z_f8a3b2c4.json
shopify_orders_20260317T060000Z_20260317T120000Z_9e1d7c3f.csv
shipfast_deliveries_20260317T000000Z_20260317T235959Z_2b4a8d6e.parquet
# What each component gives you:
# razorpay → which source system (filter by prefix)
# payments → which entity/table
# 20260317T... → ISO 8601 UTC timestamps (sort = chronological order)
# f8a3b2c4 → unique batch ID (trace back to pipeline run logs)
# ── BRONZE LAYER: add pipeline metadata ──────────────────────────────────────
# Pattern: {entity}/year={YYYY}/month={MM}/day={DD}/{entity}_{ts}_{id}.parquet
# (Hive-style partitioning — covered in Part 03)
s3://freshmart-lake/bronze/payments/year=2026/month=03/day=17/
payments_20260317T000000Z_f8a3b2c4.parquet
payments_20260317T060000Z_9e1d7c3f.parquet
# ── SILVER AND GOLD: clean entity-oriented names ─────────────────────────────
s3://freshmart-lake/silver/orders/date=2026-03-17/part-00001.parquet
s3://freshmart-lake/gold/daily_revenue/date=2026-03-17/part-00001.parquet
# ── WHAT NOT TO DO ────────────────────────────────────────────────────────────
# BAD: no timestamp — cannot determine when file was created
orders.csv
# BAD: ambiguous date format — is 03/17/26 March or day 3?
orders_03-17-26.csv
# BAD: spaces in filename — breaks shell commands without quoting
march 17 orders.csv
# BAD: no source or entity context
data_f8a3b2c4.parquet
# BAD: non-sortable timestamp — "10_1_2026" sorts before "2_11_2026" lexicographically
orders_2_11_2026.csv # and is this Feb 11 or Nov 2? Depends on the tool.
# BAD: mutable names — what does "latest" point to next week?
orders_latest.parquet
orders_final.parquet
orders_final_v2.parquet
orders_final_v2_ACTUALLY_FINAL.parquet # this is not a joke
Naming for operational visibility
# Production pipelines write many files per day.
# Good names let you diagnose issues without opening files.
# Include run metadata in filename for traceability:
# {entity}_{date}_{pipeline_run_id}_{chunk_index:04d}.parquet
orders_20260317_run-abc123_0001.parquet
orders_20260317_run-abc123_0002.parquet
orders_20260317_run-abc123_0003.parquet
# Benefits:
# 1. Know exactly which pipeline run wrote each file
# 2. 0001/0002/0003 tells you how many chunks were written
# 3. If a run writes 0001 and 0002 but not 0003, you know it stopped early
# 4. Re-running produces different run_id — old files are not overwritten
# Generating this in Python:
import uuid
from datetime import date

def make_output_filename(
    entity: str,
    run_date: date,
    run_id: str,
    chunk_idx: int,
    fmt: str = 'parquet',
) -> str:
    return f"{entity}_{run_date.strftime('%Y%m%d')}_run-{run_id[:8]}_{chunk_idx:04d}.{fmt}"

run_id = str(uuid.uuid4())
# e.g. "orders_20260317_run-f8a3b2c4_0001.parquet" (the run segment is random per run)
print(make_output_filename('orders', date(2026, 3, 17), run_id, 1))
Partitioning — The Single Biggest Lever for Query Performance
Partitioning is the practice of organising files into a directory hierarchy based on column values. When a query filters on a partition column, the query engine reads only the directories matching that filter and skips all others. A query for last week's orders on a dataset partitioned by date reads 7 directories out of 1,000 — 99.3% of files never open. This is called partition pruning and it is the most impactful performance optimisation in a data lake.
Hive-style partitioning — the standard
# Hive-style partitioning uses key=value directory names.
# Query engines (Spark, Athena, Presto, BigQuery external tables)
# understand this structure natively and prune partitions automatically.
# Directory structure for orders partitioned by date:
s3://freshmart-lake/silver/orders/
  date=2026-03-15/
    part-00001.parquet (rows where date = 2026-03-15)
    part-00002.parquet
  date=2026-03-16/
    part-00001.parquet (rows where date = 2026-03-16)
  date=2026-03-17/
    part-00001.parquet (rows where date = 2026-03-17)
    part-00002.parquet
    part-00003.parquet
# Query: SELECT COUNT(*) FROM orders WHERE date = '2026-03-17'
# Without partitioning: reads ALL files → 100% I/O
# With date partitioning: reads ONLY date=2026-03-17/ → ~0.3% I/O (1 of 365 days)
# Multi-level partitioning (for finer granularity):
s3://freshmart-lake/silver/orders/
  year=2026/
    month=03/
      day=17/
        store=ST001/
          part-00001.parquet
        store=ST002/
          part-00001.parquet
# Query: WHERE year=2026 AND month=03 AND store='ST001'
# Reads: ONLY year=2026/month=03/*/store=ST001/ files
# Writing Hive-partitioned Parquet in Python (PyArrow):
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs as pafs
import pandas as pd

df = pd.read_csv('orders.csv')
df['date'] = pd.to_datetime(df['created_at']).dt.date.astype(str)
table = pa.Table.from_pandas(df)
s3_filesystem = pafs.S3FileSystem(region='ap-south-1')  # credentials from environment
pq.write_to_dataset(
    table,
    root_path='freshmart-lake/silver/orders',  # bucket/prefix — no s3:// when a filesystem is passed
    partition_cols=['date'],                   # creates date=YYYY-MM-DD/ dirs
    filesystem=s3_filesystem,
    existing_data_behavior='overwrite_or_ignore',
    compression='snappy',
)
# Writing in PySpark:
(df.write
    .mode('overwrite')
    .partitionBy('date')
    .parquet('s3://freshmart-lake/silver/orders'))
Choosing the right partition key — the most important decision
The partition key must match the most common query filter. If analysts almost always filter by date, partition by date. If they filter by store, partition by store. The wrong partition key means partition pruning never fires and you get no benefit from the overhead of managing partitions.
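Before committing to a key, it also helps to check its cardinality: too many distinct values produce tiny partitions, too few rows per partition and pruning barely pays for itself. A minimal sketch with pandas (column names and thresholds are illustrative):

```python
import pandas as pd

def assess_partition_key(df: pd.DataFrame, col: str, min_rows_per_partition: int = 1_000_000) -> dict:
    """Report how many partitions a column would create and how full they would be."""
    n_partitions = df[col].nunique()
    rows_per_partition = len(df) / max(n_partitions, 1)
    return {
        'column': col,
        'distinct_values': n_partitions,
        'avg_rows_per_partition': int(rows_per_partition),
        # Too many partitions → small files; too few rows each → overhead, no pruning benefit
        'verdict': 'ok' if rows_per_partition >= min_rows_per_partition else 'too granular',
    }

df = pd.DataFrame({
    'date': ['2026-03-15'] * 3 + ['2026-03-16'] * 3,
    'order_id': range(6),
})
print(assess_partition_key(df, 'date', min_rows_per_partition=2))      # verdict: 'ok'
print(assess_partition_key(df, 'order_id', min_rows_per_partition=2))  # verdict: 'too granular'
```

Run this on a representative sample before building the partition layout; re-partitioning an existing lake is far more expensive than checking first.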
Compression Codecs — Choosing the Right One for Each Situation
Every Parquet and Avro file is compressed. The codec choice affects storage cost, read speed, write speed, and CPU usage during compression/decompression. There is no universal best choice — different workloads have different optimal codecs.
| Codec | Compression ratio | Compress speed | Decompress speed | Splittable? | Best for |
|---|---|---|---|---|---|
| SNAPPY | 2–3× (moderate) | Very fast | Very fast | Yes (Parquet/Avro blocks) | Default for data lake Parquet — best balance of speed and ratio |
| GZIP / DEFLATE | 4–6× (good) | Slow | Moderate | No (as raw .gz file) | Archival storage, landing zone CSVs, when size matters more than speed |
| ZSTD | 3–5× (very good) | Fast (tunable levels) | Very fast | Yes (Parquet/Avro blocks) | Modern default — better ratio than Snappy at similar speed; increasingly the default in newer engines |
| LZ4 | 1.5–2× (low) | Extremely fast | Extremely fast | Yes | Real-time streaming, Kafka messages — CPU cost matters more than ratio |
| BROTLI | 5–7× (excellent) | Very slow | Moderate | Yes | Cold archival storage, rarely read files |
| UNCOMPRESSED | 1× (none) | N/A | N/A | Yes | Development/testing only — never use in production data lake |
Splittability — why it matters for Spark performance
A splittable format allows multiple Spark executors to read different parts of the same file in parallel. A non-splittable format forces a single executor to read the entire file before splitting the data — a bottleneck that eliminates the parallelism that makes Spark fast.
# SPLITTABLE: Parquet/Avro with Snappy, ZSTD, or LZ4
# Each row group (Parquet) or data block (Avro) is compressed independently.
# Spark assigns one row group per task — true parallel reading.
s3://freshmart-lake/silver/orders/date=2026-03-17/part-00001.parquet
# This single 500 MB Parquet file with 10 row groups:
# → Spark creates 10 tasks, each reading one 50 MB row group in parallel
# → All 10 executors work simultaneously
# → Read time: ~5 seconds with 10 executors
# NON-SPLITTABLE: plain .gz CSV files
s3://freshmart-lake/landing/orders_20260317.csv.gz
# This single 500 MB gzip file:
# → One executor must decompress the ENTIRE file before splitting
# → Other executors wait idle
# → Read time: ~50 seconds (10× slower)
# HOW TO CHECK IF A FILE IS SPLITTABLE:
# Parquet with Snappy/ZSTD → always splittable (row group level)
# Parquet with GZIP → splittable (row group level) in modern Parquet
# .csv.gz → NOT splittable
# .csv.bz2 → splittable (bz2 supports block splitting but still slow)
# Avro with any codec → splittable (block level)
# .json.gz → NOT splittable
# RULE: for data lake storage, always use Parquet (inherently splittable
# regardless of codec) or Avro. Never store large raw .gz CSV files
# in the analytical layer.
Codec selection by use case
# DATA LAKE — Parquet files (Silver, Gold, Bronze)
pq.write_table(table, path, compression='zstd') # best all-rounder for 2026
pq.write_table(table, path, compression='snappy') # safe default, widely supported
# Per-column compression (Parquet supports different codecs per column):
pq.write_table(
    table, path,
    compression={
        'order_id': 'zstd',     # numeric ID — compresses well with delta encoding
        'order_text': 'snappy', # free text — fast decompression matters for queries
        'image_url': 'gzip',    # URL strings — ratio more important than speed
    },
)
# KAFKA / STREAMING (Avro messages)
# LZ4 — lowest latency, minimal CPU overhead
# Producer config: compression.type=lz4
# LANDING ZONE (vendor CSV/JSON files — as received, do not re-compress)
# Accept whatever the vendor sends, land it as-is
# Convert to Parquet/Snappy at Bronze layer
# ARCHIVAL (data older than 2 years, rarely accessed)
# GZIP for CSV archives, BROTLI or ZSTD level 19 for Parquet
# Trade CPU time at write for storage savings on cold data
# COMPRESSION LEVEL TUNING (ZSTD):
pq.write_table(table, path,
    compression='zstd',
    compression_level=1,    # fastest, moderate ratio (default)
    # compression_level=3,  # good ratio, still fast
    # compression_level=9,  # high ratio, slower — for archival
    # compression_level=19, # max ratio, very slow — cold storage only
)
The Small File Problem — The Silent Performance Killer in Every Data Lake
The small file problem is one of the most common and most impactful performance issues in data lakes. It occurs when a data lake accumulates millions of tiny files — each valid, each correct — but collectively making every operation slow: listing, querying, reading, and writing.
The root cause is almost always streaming or micro-batch pipelines that write many small files over time, or highly partitioned tables where each partition gets very few rows per pipeline run. The diagnosis and solution are both well-understood — but many teams do not apply them until performance has already degraded severely.
Why small files are slow
# HEALTHY file structure: few large files per partition
s3://freshmart-lake/silver/orders/date=2026-03-17/
part-00001.parquet (480 MB)
part-00002.parquet (520 MB)
part-00003.parquet (495 MB)
# 3 files × ~500 MB each = 1.5 GB total
# Spark creates 3 tasks, each reads one large file efficiently
# SMALL FILE PROBLEM: many tiny files per partition
s3://freshmart-lake/silver/orders/date=2026-03-17/
part-00001.parquet (2.1 KB) ← written by 5-minute micro-batch 00:05
part-00002.parquet (1.8 KB) ← written by 5-minute micro-batch 00:10
part-00003.parquet (2.4 KB) ← written by 5-minute micro-batch 00:15
... (287 more files)
part-00288.parquet (1.9 KB) ← written by 5-minute micro-batch 23:55
# 288 files × ~2 KB each = 576 KB total data (same data!)
# Spark creates 288 tasks — 288× task scheduling overhead
# S3 LIST request returns 288 file metadata entries
# Each file read requires a separate S3 GET request
# WHAT SMALL FILES DO TO PERFORMANCE:
# 1. S3/ADLS LIST calls:
# Listing 3 files: <10ms
# Listing 288 files: ~100ms (and S3 paginates at 1,000 objects)
# Listing 50,000 files: 5-10 seconds just for the directory listing
# 2. Spark task overhead:
# Each Spark task has ~100ms JVM overhead for scheduling
# 288 tasks × 100ms = ~29 seconds of pure overhead
# 3 tasks × 100ms = ~300ms overhead
# Same data, 97× more overhead
# 3. Parquet footer reads:
# Each Parquet file requires reading its footer to get schema and statistics
# 288 footer reads = 288 S3 GET requests for metadata
# Most of these tiny files have no useful statistics anyway
# 4. Hive Metastore / Glue catalog:
# Every file is a separate entry in partition metadata
# MSCK REPAIR TABLE (rediscover partitions) takes minutes on lakes
# with millions of files instead of seconds
# SCALE OF THE PROBLEM at large companies:
# A pipeline writing every 5 minutes produces 288 files/day per partition
# With 10 store partitions: 2,880 files/day
# After 1 year: 1,051,200 files
# After 3 years: 3,153,600 files — most systems start struggling around 1M files
Solutions — compaction, coalescing, and preventing accumulation
# ── APPROACH 1: COMPACTION (merging small files into large ones) ──────────────
# Run a compaction job after each pipeline run or on a schedule
from pyspark.sql import SparkSession
def compact_partition(
    spark: SparkSession,
    path: str,
    date: str,
    target_file_size_mb: int = 512,
) -> None:
    """
    Read all small files in a partition and rewrite as fewer large files.
    Target: 1 file per 512 MB of compressed data (128 MB min, 1 GB max).
    """
    partition_path = f"{path}/date={date}"
    # Cache before overwriting: Spark cannot lazily read from the path it is
    # replacing. In production, write to a temp prefix and swap atomically.
    df = spark.read.parquet(partition_path).cache()
    row_count = df.count()
    # Calculate target number of files
    # Rough estimate: 1M rows ≈ 100 MB compressed Parquet
    estimated_mb = row_count / 10_000
    target_files = max(1, int(estimated_mb / target_file_size_mb))
    (df.coalesce(target_files)
        .write
        .mode('overwrite')
        .parquet(partition_path))
    print(f"Compacted date={date}: {row_count:,} rows → {target_files} files")
# ── APPROACH 2: COALESCE BEFORE WRITE ─────────────────────────────────────────
# Prevent small files at write time by coalescing the output DataFrame
def write_compact_parquet(df, output_path: str, partition_col: str = 'date') -> None:
    """Write Parquet with controlled file count per partition."""
    # Count rows to estimate file count needed
    row_count = df.count()
    target_mb = 512       # target file size in MB
    rows_per_mb = 10_000  # ~10k rows per MB of compressed Parquet (adjust to your data)
    target_files = max(1, int(row_count / (target_mb * rows_per_mb)))
    (df.repartition(target_files, partition_col)
        .write
        .mode('overwrite')
        .partitionBy(partition_col)
        .parquet(output_path))
# ── APPROACH 3: DELTA LAKE AUTO-OPTIMISE ──────────────────────────────────────
# Delta Lake (and Iceberg) have built-in compaction via OPTIMIZE
# Delta Lake:
# OPTIMIZE silver.orders WHERE date = '2026-03-17';
# → Reads all small files in that partition, rewrites as ~1 GB files
# → Does NOT change table content — only file organisation
# → After OPTIMIZE, run VACUUM to remove old small files
# Python API:
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, 's3://freshmart-lake/silver/orders')
delta_table.optimize().where("date = '2026-03-17'").executeCompaction()
# Schedule OPTIMIZE to run after every batch write:
# OPTIMIZE runs in ~seconds for small tables, minutes for large ones
# Run daily: OPTIMIZE silver.orders WHERE date >= current_date - 7
# ── APPROACH 4: PREVENT AT STREAM WRITE TIME ──────────────────────────────────
# Spark Structured Streaming: control output file size
(streamDf.writeStream
    .trigger(processingTime='1 hour')
    .option('maxRecordsPerFile', 500_000)
    .option('checkpointLocation', 's3://freshmart-lake/_checkpoints/orders')  # path illustrative
    .partitionBy('date')
    .format('parquet')
    .option('path', 's3://freshmart-lake/silver/orders')
    .start())
# processingTime='1 hour' batches 1 hour of data before writing
# → 24 files/day instead of 288 files/day (12× improvement)
# maxRecordsPerFile limits maximum records per output file
Target File Sizes — What the Numbers Actually Mean
Every modern storage and compute guide says files should be "128 MB to 1 GB." Where does this range come from? Understanding the reasoning behind it lets you tune for your specific workload rather than applying a rule blindly.
# WHY TOO-SMALL IS BAD (< 32 MB):
# ── Overhead dominates:
# S3 GET request latency: ~10ms
# Parquet footer read: ~5ms
# Spark task scheduling: ~100ms
# For a 1 KB file: overhead (115ms) >> actual read time (<1ms)
# For a 512 MB file: overhead (115ms) << actual read time (~500ms)
# ── Metadata catalog grows large (each file = one catalog entry)
# ── S3 LIST paginated at 1,000 objects — large counts require many API calls
# WHY TOO-LARGE IS BAD (> 2 GB):
# ── Spark cannot split a single file across multiple tasks
# (within a file, tasks map to row groups, not to bytes)
# One 4 GB file → one executor reads all 4 GB serially
# Four 1 GB files → four executors read 1 GB each in parallel
# ── Partial failures waste more work:
# A failed write of 4 GB wastes 4 GB of work
# A failed write of 512 MB wastes only 512 MB
# ── Schema discovery and footer reads take longer on very large files
# THE SWEET SPOT:
# Target: 256 MB to 1 GB per file (compressed Parquet)
# Minimum: 128 MB (below this, overhead ratio becomes significant)
# Maximum: 1–2 GB (above this, single-executor bottleneck emerges)
# PRACTICAL CALIBRATION:
# Run this on a sample partition to understand your row size:
import pyarrow.parquet as pq
metadata = pq.read_metadata('s3://freshmart-lake/silver/orders/date=2026-03-17/part-00001.parquet')
# metadata.serialized_size is only the footer size — sum the compressed
# column chunk sizes to approximate the data size on disk:
file_size_mb = sum(
    metadata.row_group(rg).column(col).total_compressed_size
    for rg in range(metadata.num_row_groups)
    for col in range(metadata.row_group(rg).num_columns)
) / (1024 * 1024)
row_count = metadata.num_rows
num_row_groups = metadata.num_row_groups
print(f"File size: {file_size_mb:.1f} MB")
print(f"Row count: {row_count:,}")
print(f"Row groups: {num_row_groups}")
print(f"MB per million rows: {file_size_mb / row_count * 1_000_000:.1f}")
# Use this last number to calibrate target_files calculations
# ROW GROUP SIZE WITHIN PARQUET FILES:
# Each Parquet file is divided into row groups (typically 100k-1M rows each)
# Row group size affects:
# - Predicate pushdown granularity (each row group has min/max statistics)
# - Memory during write (one row group is buffered in memory during write)
# - Parallelism within file (Spark assigns one task per row group)
# Tuning row groups in PyArrow:
pq.write_table(
    table, path,
    row_group_size=500_000, # 500k rows per row group
    # Smaller row groups: better predicate pushdown, more metadata overhead
    # Larger row groups: less metadata overhead, coarser pruning
)
Bloom filters — accelerating point lookups in Parquet
# Parquet supports bloom filters on specific columns.
# A bloom filter is a probabilistic data structure that answers:
# "Is value X definitely NOT in this row group?"
# If yes → skip the row group entirely (zero reads)
# If no → maybe (the row group might have it) → read and check
# Without bloom filter:
# "Find order_id = 9284751 in a 500 MB Parquet file"
# Must read every row group and check every value: slow
# With bloom filter:
# Bloom filter answers "9284751 is NOT in row groups 1,3,4,5,7,8,9,10"
# Only row groups 2 and 6 need to be read: ~10× faster
# WHEN TO ADD BLOOM FILTERS:
# ✓ UUID or string primary key columns (high cardinality, point lookups common)
# ✓ External ID columns (payment_id, order_id from source systems)
# ✓ Any column where queries do "WHERE column = specific_value"
# ✗ Date columns (already pruned by partition pruning)
# ✗ Low-cardinality columns (status, boolean — min/max stats already handle these)
# Adding bloom filters with PyArrow — note: bloom filter writing requires a
# recent PyArrow release; check the pyarrow.parquet.write_table docs for the
# exact option names supported by your version:
import pyarrow.parquet as pq
pq.write_table(
    table,
    'output.parquet',
    compression='zstd',
    write_bloom_filter=True,                          # enable for all columns
    bloom_filter_columns=['payment_id', 'order_id'],  # or specify columns
    bloom_filter_false_positive_rate=0.05,            # 5% false positive rate
    # Lower rate → larger bloom filter → better pruning but more memory
)
# Snowflake / BigQuery achieve a similar effect via table clustering:
# ALTER TABLE silver.orders CLUSTER BY (date, store_id);  -- Snowflake syntax
# Snowflake physically co-locates rows with the same cluster key values → fast point lookups
Format Conversion Pipelines — CSV/JSON to Parquet in Production
The most common file operation in data lake Bronze layer is format conversion: raw CSV and JSON files from vendors and APIs become typed, compressed, partitioned Parquet files. This operation seems simple but has dozens of real-world edge cases that must be handled to produce reliable, resumable, production-safe pipelines.
"""
Bronze layer format conversion: CSV landing → Parquet Bronze
Handles: encoding detection, schema inference, type casting,
bad row logging, resumability, S3 output.
"""
import logging
from pathlib import Path
from datetime import date
from typing import Iterator
import chardet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs as pafs
log = logging.getLogger('csv_to_parquet')
# ── Schema definition (explicit is better than inferred) ──────────────────────
ORDERS_SCHEMA = pa.schema([
    pa.field('order_id', pa.int64(), nullable=False),
    pa.field('store_id', pa.string(), nullable=False),
    pa.field('customer_id', pa.int64(), nullable=True),
    pa.field('amount', pa.decimal128(10, 2), nullable=False),
    pa.field('status', pa.string(), nullable=False),
    pa.field('created_at', pa.timestamp('us', tz='UTC'), nullable=False),
    pa.field('ingested_at', pa.timestamp('us', tz='UTC'), nullable=False),
    pa.field('source_date', pa.string(), nullable=False),  # partition column, added in cast_chunk
])
def detect_encoding(filepath: str, sample_bytes: int = 100_000) -> str:
    """Detect file encoding from first N bytes."""
    with open(filepath, 'rb') as f:
        raw = f.read(sample_bytes)
    result = chardet.detect(raw)
    encoding = result.get('encoding') or 'utf-8'
    confidence = result.get('confidence', 0)
    log.info('Detected encoding: %s (confidence: %.0f%%)', encoding, confidence * 100)
    return encoding if confidence > 0.7 else 'utf-8'
def read_csv_chunked(filepath: str, chunk_size: int = 200_000) -> Iterator[pd.DataFrame]:
    """Read CSV in chunks, detecting encoding and handling bad lines."""
    encoding = detect_encoding(filepath)
    try:
        for chunk in pd.read_csv(
            filepath,
            chunksize=chunk_size,
            encoding=encoding,
            encoding_errors='replace',  # replace undecodable bytes with U+FFFD
            dtype=str,                  # read all as string first, cast later
            na_values=['', 'NULL', 'null', 'N/A', 'n/a', 'NA', '-'],
            keep_default_na=True,
            on_bad_lines='warn',        # log bad lines, do not crash
        ):
            yield chunk
    except Exception as e:
        log.error('Failed to read CSV %s: %s', filepath, e)
        raise
def cast_chunk(chunk: pd.DataFrame, source_date: date) -> pd.DataFrame:
    """Apply type casting and add pipeline metadata columns."""
    df = chunk.copy()
    # Cast numeric columns:
    for col in ['order_id', 'customer_id']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
    if 'amount' in df.columns:
        df['amount'] = pd.to_numeric(
            df['amount'].str.replace(',', '', regex=False),  # remove thousands sep
            errors='coerce',
        ).round(2)
    # Cast timestamps:
    if 'created_at' in df.columns:
        df['created_at'] = pd.to_datetime(
            df['created_at'], utc=True, errors='coerce'
        )
    # Add pipeline metadata:
    df['ingested_at'] = pd.Timestamp.now(tz='UTC')
    df['source_date'] = source_date.isoformat()  # partition column
    # Drop rows with NULL primary key (unrecoverable):
    bad_mask = df['order_id'].isna()
    if bad_mask.any():
        log.warning('Dropping %d rows with NULL order_id', bad_mask.sum())
        df = df[~bad_mask]
    return df
def convert_csv_to_parquet(
    input_path: str,
    output_root: str,
    source_date: date,
    s3_bucket: str | None = None,
) -> dict:
    """
    Convert a CSV file to partitioned Parquet on S3.
    Returns stats: {'rows_written', 'rows_rejected', 'files_written'}
    """
    stats = {'rows_written': 0, 'rows_rejected': 0, 'files_written': 0}
    all_tables = []
    for chunk_idx, chunk in enumerate(read_csv_chunked(input_path)):
        original_rows = len(chunk)
        cast = cast_chunk(chunk, source_date)
        dropped = original_rows - len(cast)
        if dropped:
            stats['rows_rejected'] += dropped
            log.warning('Chunk %d: dropped %d invalid rows', chunk_idx, dropped)
        if len(cast) > 0:
            try:
                table = pa.Table.from_pandas(cast, schema=ORDERS_SCHEMA, preserve_index=False, safe=False)
                all_tables.append(table)
            except Exception as e:
                log.error('Schema cast failed on chunk %d: %s', chunk_idx, e)
                raise
            stats['rows_written'] += len(cast)
    if not all_tables:
        log.warning('No valid rows to write')
        return stats
    full_table = pa.concat_tables(all_tables)
    # Write partitioned Parquet:
    if s3_bucket:
        filesystem = pafs.S3FileSystem(region='ap-south-1')
        output_path = f"{s3_bucket}/bronze/orders"
    else:
        filesystem = pafs.LocalFileSystem()
        output_path = f"{output_root}/orders"
        Path(output_path).mkdir(parents=True, exist_ok=True)
    pq.write_to_dataset(
        full_table,
        root_path=output_path,
        partition_cols=['source_date'],
        filesystem=filesystem,
        compression='zstd',
        row_group_size=500_000,
        write_statistics=True,
        existing_data_behavior='overwrite_or_ignore',
    )
    log.info(
        'Conversion complete: %s rows written, %s rejected, source_date=%s',
        f"{stats['rows_written']:,}", f"{stats['rows_rejected']:,}", source_date.isoformat(),
    )
    return stats
JSON-to-Parquet conversion with schema normalisation
import json
from typing import Iterator
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
def read_ndjson(filepath: str) -> Iterator[dict]:
    """Stream records from NDJSON file one at a time."""
    with open(filepath, encoding='utf-8') as f:
        for line_num, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                log.warning('Invalid JSON on line %d: %s', line_num, e)
def flatten_order(raw: dict) -> dict:
    """Flatten nested order JSON to a flat dict for Parquet storage."""
    return {
        'order_id': raw.get('order_id') or raw.get('id'),
        'customer_id': raw.get('customer', {}).get('id'),
        'customer_city': raw.get('customer', {}).get('address', {}).get('city'),
        'restaurant_id': raw.get('restaurant', {}).get('id'),
        'restaurant_name': raw.get('restaurant', {}).get('name'),
        'order_amount': raw.get('payment', {}).get('amount'),
        'payment_method': raw.get('payment', {}).get('method'),
        'item_count': len(raw.get('items', [])),
        'status': raw.get('status'),
        'created_at': raw.get('created_at'),
        'promo_code': raw.get('promo_code'),
        # Keep full payload for reference (rarely needed fields):
        '_raw_items': json.dumps(raw.get('items', [])),
    }
def json_to_parquet(
    input_path: str,
    output_path: str,
    batch_size: int = 100_000,
) -> int:
    """Convert NDJSON to Parquet with flattening. Returns total rows written."""
    total = 0
    batch: list[dict] = []
    writer = None
    for record in read_ndjson(input_path):
        batch.append(flatten_order(record))
        if len(batch) >= batch_size:
            table = pa.Table.from_pandas(pd.DataFrame(batch))
            if writer is None:
                # NOTE: schema is fixed by the first batch; later batches with
                # drifted types will fail — normalise upstream if needed
                writer = pq.ParquetWriter(output_path, table.schema, compression='zstd')
            writer.write_table(table)
            total += len(batch)
            batch = []
    # Write final batch:
    if batch:
        table = pa.Table.from_pandas(pd.DataFrame(batch))
        if writer is None:
            writer = pq.ParquetWriter(output_path, table.schema, compression='zstd')
        writer.write_table(table)
        total += len(batch)
    if writer:
        writer.close()
    log.info('Wrote %d rows to %s', total, output_path)
    return total
File Lifecycle — Retention, Archival, and Cleanup
A data lake without a lifecycle policy is a storage cost that grows indefinitely. Every file written to S3 costs $0.023 per GB per month forever, unless explicitly deleted or transitioned to cheaper storage. Lifecycle management is the operational discipline that keeps storage costs under control without deleting data that is still needed.
# S3 Lifecycle policy (set via AWS Console or Terraform)
# Automatically moves/deletes files based on age
# Terraform example:
resource "aws_s3_bucket_lifecycle_configuration" "freshmart_lake" {
  bucket = "freshmart-data-lake"

  # Rule 1: Landing zone — raw files only kept 30 days
  # After conversion to Bronze, raw files are not needed
  rule {
    id     = "landing-zone-cleanup"
    status = "Enabled"
    filter { prefix = "landing/" }
    # Move to Infrequent Access after 7 days:
    transition {
      days          = 7
      storage_class = "STANDARD_IA" # 45% cheaper than Standard
    }
    # Delete after 30 days:
    expiration { days = 30 }
  }

  # Rule 2: Bronze layer — keep 1 year in Standard, then archive
  rule {
    id     = "bronze-archive"
    status = "Enabled"
    filter { prefix = "bronze/" }
    transition {
      days          = 90
      storage_class = "STANDARD_IA"
    }
    transition {
      days          = 365
      storage_class = "GLACIER_IR" # Glacier Instant Retrieval — ~83% cheaper
    }
  }

  # Rule 3: Silver layer — keep 2 years Standard, then long-term archive
  rule {
    id     = "silver-lifecycle"
    status = "Enabled"
    filter { prefix = "silver/" }
    transition {
      days          = 180
      storage_class = "STANDARD_IA"
    }
    transition {
      days          = 730 # 2 years
      storage_class = "DEEP_ARCHIVE" # Glacier Deep Archive — cheapest, 12+ hour retrieval
    }
  }

  # Rule 4: Gold layer — keep hot forever (small, frequently accessed)
  # No lifecycle rule needed for Gold — it is usually small
}
# STORAGE COST COMPARISON (per GB/month, US East):
# S3 Standard: $0.023
# S3 Standard-IA: $0.0125 (45% cheaper, 30-day minimum)
# S3 Glacier Instant: $0.004 (83% cheaper, millisecond retrieval)
# S3 Glacier Flexible: $0.0036 (84% cheaper, 3-5 hour retrieval)
# S3 Glacier Deep Archive: $0.00099 (96% cheaper, 12+ hour retrieval)
Delta Lake VACUUM — cleaning up old file versions
# Delta Lake writes new Parquet files for every UPDATE, DELETE, and OPTIMIZE.
# Old files are kept (for time travel) but accumulate space costs.
# VACUUM removes files that are no longer needed for time travel.
# Default retention: 7 days (safe minimum — ensures no active transactions use old files)
# VACUUM silently skips files newer than retention threshold
# Run VACUUM after OPTIMIZE to reclaim space from compaction:
-- SQL:
VACUUM silver.orders RETAIN 168 HOURS; -- 168h = 7 days
VACUUM silver.orders RETAIN 720 HOURS; -- 30 days (if time travel is needed that far back)
-- Python API:
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, 's3://freshmart-lake/silver/orders')
dt.vacuum(retentionHours=168)
# VACUUM dry run (shows what WOULD be deleted without deleting):
-- VACUUM silver.orders RETAIN 168 HOURS DRY RUN;  -- SQL only; the Python API has no dry-run flag
# WARNING: setting retention to < 7 days requires explicitly disabling the safety check:
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false')
dt.vacuum(retentionHours=0) # removes ALL old files — only for dev/testing
# SCHEDULE: run VACUUM weekly on all Silver and Gold tables
# Automate with Airflow DAG or Databricks Job:
# 0 3 * * 0 (every Sunday at 3 AM)
# ICEBERG equivalent:
# CALL catalog.system.expire_snapshots(
# table => 'silver.orders',
# older_than => TIMESTAMP '2026-02-17 00:00:00',
# retain_last => 10
# );
Practical file inventory — knowing what you have
# Regular file inventory helps catch small file problems early
# and verifies lifecycle policies are working as expected.
import boto3
from collections import defaultdict

def audit_s3_prefix(bucket: str, prefix: str) -> dict:
    """
    Audit an S3 prefix: count files, total size, average size, min/max size.
    Returns a health report for identifying small file problems.
    """
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    sizes = []
    partition_files = defaultdict(list)
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            size_mb = obj['Size'] / (1024 * 1024)
            sizes.append(size_mb)
            # Group by partition (everything up to the last directory level)
            key_parts = obj['Key'].split('/')
            partition = '/'.join(key_parts[:-1])
            partition_files[partition].append(size_mb)
    if not sizes:
        return {'error': 'No files found'}
    total_gb = sum(sizes) / 1024
    avg_mb = sum(sizes) / len(sizes)
    small_files = sum(1 for s in sizes if s < 10)  # files < 10 MB
    report = {
        'total_files': len(sizes),
        'total_gb': round(total_gb, 2),
        'avg_mb': round(avg_mb, 1),
        'min_mb': round(min(sizes), 3),
        'max_mb': round(max(sizes), 1),
        'small_files': small_files,
        'small_pct': round(small_files / len(sizes) * 100, 1),
        'partitions': len(partition_files),
    }
    # Flag health issues (collected, so one warning cannot overwrite another):
    warnings = []
    if small_files / len(sizes) > 0.5:
        warnings.append('More than 50% of files are < 10 MB — compaction needed')
    if len(sizes) > 100_000:
        warnings.append('More than 100k files — metadata listing will be slow')
    if warnings:
        report['warning'] = '; '.join(warnings)
    return report
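The same health check works against a local directory tree with pathlib instead of boto3 — useful for spot-checking pipeline output before it lands in S3. A sketch mirroring the report above (the 10 MB small-file threshold is kept the same):

```python
from pathlib import Path

def audit_local_dir(root: str, small_mb: float = 10.0) -> dict:
    """Condensed version of the S3 audit report, for a local directory tree."""
    sizes = [p.stat().st_size / (1024 * 1024)
             for p in Path(root).rglob('*') if p.is_file()]
    if not sizes:
        return {'error': 'No files found'}
    small = sum(1 for s in sizes if s < small_mb)
    return {
        'total_files': len(sizes),
        'avg_mb': round(sum(sizes) / len(sizes), 1),
        'small_pct': round(small / len(sizes) * 100, 1),
    }
```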
# Example output:
# {
# 'total_files': 48234,
# 'total_gb': 142.7,
# 'avg_mb': 3.0, ← WAY too small (should be 128–512 MB)
# 'min_mb': 0.002,
# 'max_mb': 12.4,
# 'small_files': 45891,
# 'small_pct': 95.1, ← 95% of files are tiny
# 'warning': 'More than 50% of files are < 10 MB — compaction needed'
# }
Diagnosing a Slow Athena Query — and Fixing It with File Engineering
An analyst runs a monthly revenue report every Monday morning. It used to take 90 seconds. This week it took 18 minutes. Nothing changed in the query. You investigate.
# Step 1: Check the Athena query execution details
# In Athena console, click the query → "Execution detail"
# Data scanned: 4.2 TB ← this is the problem signal
# Expected for a 90-day query: ~400 GB
# Step 2: Check partition count in the catalog
aws glue get-partitions --database-name freshmart_silver --table-name events --query 'Partitions | length(@)'
# Returns: 156,420 partitions
# Expected for 3 years of daily data: ~1,095 partitions
# Something created 155,000 extra partitions!
# Step 3: Find the culprit partition key
aws glue get-partitions --database-name freshmart_silver --table-name events --query 'Partitions[0:5].StorageDescriptor.Location'
# s3://freshmart-lake/silver/events/date=2026-03-17/hour=20/minute=14/
# s3://freshmart-lake/silver/events/date=2026-03-17/hour=20/minute=15/
# Found it: the pipeline was accidentally partitioning by MINUTE!
# Counting files:
aws s3 ls s3://freshmart-lake/silver/events/ --recursive | awk '{print $4}' | wc -l
# 4,847,293 files — each file is ~1 KB (one minute of events)
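The arithmetic behind the explosion is worth checking directly — every extra partition level multiplies the count. A quick sketch using the three-year window from the scenario:

```python
DAYS = 3 * 365  # 1,095 daily partitions over three years

# Partition count for each candidate scheme — each level is a multiplier
schemes = {
    'date': DAYS,
    'date/hour': DAYS * 24,
    'date/hour/minute': DAYS * 24 * 60,
}
for scheme, count in schemes.items():
    print(f'{scheme}: {count:,} partitions')
# date: 1,095 partitions
# date/hour: 26,280 partitions
# date/hour/minute: 1,576,800 partitions
```

One stray partition column turned a catalog of ~1,100 entries into one of ~1.6 million — a 1,440× multiplier from a single config line.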
# Step 4: Understand the impact
# Athena: to list all partitions for MSCK REPAIR TABLE → tens of thousands of API calls
# Athena: to execute query → reads 4.2 TB instead of 400 GB (no minute-level pruning benefit)
# Glue catalog: 156k partition entries → slow SHOW PARTITIONS, slow query planning
# Step 5: Fix — rewrite with date-only partitioning
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('s3://freshmart-lake/silver/events/')
# Rewrite with correct partition: date only
df.write.mode('overwrite').partitionBy('date').parquet('s3://freshmart-lake/silver/events_v2/')
# Step 6: Run MSCK REPAIR TABLE to update Glue catalog
spark.sql('MSCK REPAIR TABLE freshmart_silver.events_v2')
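MSCK REPAIR TABLE scans the entire prefix to discover partitions; for a known date range it can be cheaper to register partitions explicitly with ALTER TABLE ADD PARTITION. A sketch that generates those statements (the table and bucket names follow the scenario, but the helper itself is hypothetical):

```python
from datetime import date, timedelta

def add_partition_sql(table: str, base: str, start: date, end: date) -> list:
    """One ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement per day."""
    stmts = []
    d = start
    while d <= end:
        stmts.append(
            f"ALTER TABLE {table} ADD IF NOT EXISTS "
            f"PARTITION (date='{d.isoformat()}') LOCATION '{base}/date={d.isoformat()}/'"
        )
        d += timedelta(days=1)
    return stmts

stmts = add_partition_sql(
    'freshmart_silver.events_v2',
    's3://freshmart-lake/silver/events_v2',
    date(2026, 3, 15), date(2026, 3, 17),
)
print(len(stmts))  # → 3
```

Each statement is idempotent (IF NOT EXISTS), so the generator can safely re-run after every batch load instead of a full repair.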
# After fix:
# Partitions: 1,095 (date-only)
# Files: ~3,285 (3 per day × 1,095 days)
# Avg file size: ~130 MB
# Query time: 94 seconds (was 18 minutes — 11× improvement)
# Data scanned: 412 GB (was 4.2 TB — 10× reduction)
The root cause was a single line in the Spark write configuration that added minute as a partition column alongside date. This turned 1,095 daily partitions into 1,576,800 minute partitions — all valid, all correct data, but completely unusable for analytics. Two hours of investigation and a Spark rewrite job fixed it permanently.
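A cheap guard in the write path would have caught this before it shipped: validate partition columns against a team-approved allow-list and a maximum nesting depth. A hypothetical helper (not part of Spark — the allow-list contents are an assumption):

```python
# Assumption: the team has agreed on which columns may ever be partition keys
ALLOWED_PARTITION_COLS = {'date', 'store_id', 'region'}

def validate_partition_cols(cols: list, max_levels: int = 2) -> None:
    """Fail fast if a write uses unapproved or too many partition columns."""
    bad = [c for c in cols if c not in ALLOWED_PARTITION_COLS]
    if bad:
        raise ValueError(f'Unapproved partition columns: {bad}')
    if len(cols) > max_levels:
        raise ValueError(f'Too many partition levels ({len(cols)} > {max_levels})')

validate_partition_cols(['date'])              # OK — passes silently
# validate_partition_cols(['date', 'minute'])  # raises ValueError: minute not approved
```

Called just before `df.write.partitionBy(*cols)`, this turns a silent catalog explosion into an immediate pipeline failure.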
🎯 Key Takeaways
- ✓ File naming must encode source, entity, date (ISO 8601 UTC), and a unique run identifier. ISO 8601 dates (YYYYMMDD) sort lexicographically in chronological order. Include the pipeline run ID in filenames to trace any file back to the run that created it. Never use mutable names like "latest" or "final."
- ✓ Hive-style partitioning (date=2026-03-17/ directories) enables partition pruning — query engines skip entire directories that cannot contain matching rows. A date-filtered query on a date-partitioned table reads 0.3% of data instead of 100%. Partition pruning is the single biggest performance lever in a data lake.
- ✓ Choose partition columns with low-to-medium cardinality (date: 365/year, store_id: 10–1,000). Never partition by high-cardinality columns like customer_id — 10 million customers creates 10 million directories, making every operation slower. Each partition should hold at least 100 MB of data.
- ✓ For data lake Parquet files, choose ZSTD (better ratio than Snappy at similar speed, modern default) or Snappy (widely supported, fast). Use GZIP only for archival and landing zone files where storage cost matters more than read speed. Never store uncompressed files in production.
- ✓ Splittability matters for Spark parallelism. Parquet files are splittable at the row group level regardless of codec. Plain .gz CSV files are not splittable — one executor reads the whole file. Always use Parquet (not gzip CSV) in the analytical layer.
- ✓ The small file problem occurs when streaming or micro-batch pipelines create millions of tiny files. Performance impact: S3 LIST API overhead, Spark task scheduling waste, and Parquet footer read overhead dominate actual read time. Target 256 MB to 1 GB per Parquet file.
- ✓ Fix small files with compaction: Delta Lake OPTIMIZE, Spark coalesce and overwrite, or PyArrow dataset rewrite. Schedule compaction after every batch write or daily. Prevent small files by batching micro-batches (write hourly, not every 5 minutes) and using coalesce before writing.
- ✓ Bloom filters on high-cardinality string columns (payment_id, order_id) enable fast point lookups in Parquet by allowing the query engine to skip row groups that definitely do not contain a specific value. Add bloom filters to UUID and external ID columns used in WHERE column = value queries.
- ✓ File lifecycle management prevents unbounded storage cost growth. Landing zone files delete after 30 days. Bronze and Silver files transition to Infrequent Access after 90 days and Glacier after 2 years. Delta Lake VACUUM removes old file versions after the time travel retention period (7 days default).
- ✓ When an Athena or Spark query is suddenly slow: check data scanned (should match partition size), count files per partition (thousands of tiny files = small file problem), verify format is Parquet not CSV/JSON, confirm partition pruning is firing (no functions on partition columns in WHERE), and check if MSCK REPAIR TABLE needs to be run to register new partitions.
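The compaction sizing rule in the takeaways can be sketched as a greedy bin-packing pass: group small files into output groups near a target size (file sizes here are hypothetical, stdlib only):

```python
def plan_compaction(sizes_mb: list, target_mb: float = 256.0) -> list:
    """Greedily group files so each output group lands near target_mb."""
    groups, current, current_size = [], [], 0.0
    for size in sorted(sizes_mb, reverse=True):
        # Start a new output group when adding this file would exceed the target
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0.0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups

tiny_files = [3.0] * 200  # 200 files of 3 MB — classic streaming output
plan = plan_compaction(tiny_files)
print(len(plan))  # → 3 output files instead of 200
```

Delta Lake OPTIMIZE performs the real version of this (with parallel rewrites and transaction-log updates), but the core idea is exactly this: many inputs in, a handful of target-sized outputs out.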