# Data Quality Engineering
Build data quality into pipelines. Covers quality dimensions, validation frameworks, Great Expectations, dbt tests, data contracts, anomaly detection, and data quality SLAs.
Bad data costs more than no data. A missing column crashes a dashboard. An incorrect join inflates revenue by 10x. A timezone bug causes every international order to appear on the wrong date. Data quality engineering treats data defects as seriously as application bugs — with testing, monitoring, alerting, and incident response.
This guide covers how to build data quality into every layer of your pipeline, not bolt it on as an afterthought.
## Data Quality Dimensions
| Dimension | Definition | Example Test |
|---|---|---|
| Completeness | No missing values where required | NOT NULL checks, fill rate > 99% |
| Accuracy | Values reflect reality | Revenue matches source system |
| Consistency | Same values across systems | Customer count matches in CRM and warehouse |
| Timeliness | Data arrives on schedule | Daily pipeline completes by 6 AM |
| Uniqueness | No unwanted duplicates | Primary key uniqueness |
| Validity | Values conform to rules | Email matches regex, age > 0 |
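Each dimension maps onto a simple programmatic check. A minimal sketch with pandas (the sample frame and rules are illustrative, not from the guide):

```python
import pandas as pd

# Tiny illustrative batch: one null email, one duplicate key, one bad amount
orders = pd.DataFrame({
    "order_id": [1, 2, 3, 3],
    "email": ["a@example.com", None, "bad-email", "c@example.com"],
    "amount": [19.99, 5.00, -1.00, 42.50],
})

# Completeness: fill rate of a required column
fill_rate = orders["email"].notna().mean()          # 0.75

# Uniqueness: duplicate rate on the primary key
dup_rate = orders["order_id"].duplicated().mean()   # 0.25

# Validity: share of values that conform to a business rule
valid_amount = (orders["amount"] > 0).mean()        # 0.75
```

Rates like these feed directly into the quality scores and SLA thresholds described later in the guide.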
## Validation Frameworks

### Great Expectations
```python
import great_expectations as gx

context = gx.get_context()

# Define expectations for the orders table
validator = context.sources.pandas_default.read_csv("orders.csv")

# Completeness
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("amount")

# Validity
validator.expect_column_values_to_be_between("amount", min_value=0.01, max_value=1000000)
validator.expect_column_values_to_match_regex("email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
validator.expect_column_values_to_be_in_set("status", ["pending", "shipped", "delivered", "cancelled"])

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Statistical
validator.expect_column_mean_to_be_between("amount", min_value=50, max_value=200)
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=100000)

# Run validation
results = validator.validate()
```
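The `validate()` call returns a result object; a pipeline should gate on it rather than log and continue. A minimal sketch of that gate, operating on a plain dict shaped like `results.to_json_dict()` (the dict contents here are illustrative, not real output):

```python
# Illustrative dict mirroring the shape of results.to_json_dict()
results = {
    "success": False,
    "results": [
        {"success": True,
         "expectation_config": {"expectation_type": "expect_column_values_to_not_be_null"}},
        {"success": False,
         "expectation_config": {"expectation_type": "expect_column_values_to_be_unique"}},
    ],
}

# Collect the names of failed expectations for the alert message
failed = [
    r["expectation_config"]["expectation_type"]
    for r in results["results"]
    if not r["success"]
]
if not results["success"]:
    message = f"Quality gate failed: {failed}"
```

In a real DAG the `message` would be raised as an exception or sent to the alerting channel defined in the SLA section below.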
### dbt Tests
```yaml
# schema.yml
version: 2

models:
  - name: orders
    description: "Cleaned orders table"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
      - name: amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0.01
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']
    tests:
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
      - dbt_utils.recency:
          datepart: day
          field: order_date
          interval: 1
```
## Data Quality SLAs
```yaml
data_quality_sla:
  orders_gold:
    freshness:
      max_delay: "2 hours"
      alert_after: "1 hour"
    completeness:
      required_columns: ["order_id", "customer_id", "amount", "order_date"]
      null_rate_threshold: 0.01        # < 1% nulls
    uniqueness:
      primary_key: "order_id"
      duplicate_rate_threshold: 0.0    # zero duplicates
    accuracy:
      revenue_variance: 0.02           # within 2% of source
    volume:
      min_daily_rows: 5000
      max_daily_rows: 50000
      anomaly_detection: "z_score > 3"
```
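An SLA config like this is only useful if something evaluates it on every run. A sketch of such an evaluator, assuming the freshness timestamp, per-column null rates, and daily row count come from warehouse metadata (the `sla` dict and function name are illustrative):

```python
from datetime import datetime, timedelta, timezone

# Illustrative in-code mirror of the YAML SLA above
sla = {
    "freshness": {"max_delay_hours": 2},
    "completeness": {"null_rate_threshold": 0.01},
    "volume": {"min_daily_rows": 5000, "max_daily_rows": 50000},
}

def check_sla(last_loaded_at, null_rates, daily_rows, now=None):
    """Return a list of SLA violations; an empty list means healthy."""
    now = now or datetime.now(timezone.utc)
    violations = []

    # Freshness: how long since the last successful load?
    delay = now - last_loaded_at
    if delay > timedelta(hours=sla["freshness"]["max_delay_hours"]):
        violations.append(f"freshness: {delay} since last load")

    # Completeness: null rate per required column
    for col, rate in null_rates.items():
        if rate > sla["completeness"]["null_rate_threshold"]:
            violations.append(f"completeness: {col} null rate {rate:.1%}")

    # Volume: today's row count within the expected band
    if not sla["volume"]["min_daily_rows"] <= daily_rows <= sla["volume"]["max_daily_rows"]:
        violations.append(f"volume: {daily_rows} rows outside expected range")

    return violations
```

Each violation string maps to one SLA clause, so the alert names the dimension that broke rather than a generic "quality check failed".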
## Anomaly Detection
```python
import numpy as np

def detect_volume_anomaly(table_name, current_count, lookback_days=30):
    """Detect unusual row counts in daily loads.

    Assumes get_daily_counts() returns the table's historical daily
    row counts as a list of ints (implementation not shown here).
    """
    historical = get_daily_counts(table_name, days=lookback_days)
    mean = np.mean(historical)
    std = np.std(historical)
    z_score = (current_count - mean) / std if std > 0 else 0

    alert = None
    if abs(z_score) > 3:
        alert = {
            "severity": "critical",
            "message": f"{table_name}: {current_count:,} rows (expected ~{mean:,.0f} ± {std:,.0f})",
            "z_score": round(z_score, 2),
        }
    elif abs(z_score) > 2:
        alert = {
            "severity": "warning",
            "message": f"{table_name}: {current_count:,} rows (z-score: {z_score:.1f})",
        }
    return alert
```
## Data Quality Pipeline
```text
Source Data
     ↓
┌── Schema Validation ────────────┐
│ Column types, nullable, format  │
└─────────────────────────────────┘
     ↓  (fail → reject batch)
┌── Row-Level Validation ─────────┐
│ Range checks, regex, referential│
└─────────────────────────────────┘
     ↓  (fail → quarantine rows)
┌── Statistical Validation ───────┐
│ Volume, distribution, freshness │
└─────────────────────────────────┘
     ↓  (fail → alert, continue)
┌── Cross-System Reconciliation ──┐
│ Row counts, totals vs source    │
└─────────────────────────────────┘
     ↓
Clean Data (with quality score)
```
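The "quarantine rows" step above is worth sketching: instead of failing the whole batch or letting bad rows through, split the batch and continue with the clean part. A minimal pandas sketch (the sample data and validity rules are illustrative):

```python
import pandas as pd

# Illustrative batch: one negative amount, one unknown status, one null amount
batch = pd.DataFrame({
    "order_id": [1, 2, 3, 4],
    "amount": [19.99, -5.00, 42.50, None],
    "status": ["shipped", "pending", "teleported", "delivered"],
})

# Row-level validity mask: positive non-null amount and a known status
valid = (
    batch["amount"].notna()
    & batch["amount"].gt(0)
    & batch["status"].isin(["pending", "shipped", "delivered", "cancelled"])
)

# Good rows flow downstream; bad rows are isolated with an audit timestamp
clean = batch[valid]
quarantine = batch[~valid].assign(quarantined_at=pd.Timestamp.now(tz="UTC"))
```

The quarantine table then becomes the input to alerting and manual review, while the clean frame proceeds to statistical validation.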
## Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Quality checks only in production | Bad data discovered too late | Test in staging with production-like data |
| Boolean quality (pass/fail) | No nuance about how bad the data is | Quality scores (0-100) per dimension |
| No quarantine | Bad rows either block pipeline or corrupt output | Quarantine bad rows separately, process good ones |
| Testing only structure | Schema is valid but values are wrong | Statistical tests: distribution, volume, freshness |
| Manual investigation | Every alert requires a human to investigate | Automated root cause analysis + runbooks |
| No baseline | Can’t tell if today’s quality is normal | Establish baselines, detect deviations |
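The "Boolean quality" fix above suggests replacing pass/fail with per-dimension scores. One way this could look, using the kinds of rates computed earlier (the equal weighting and function name are illustrative choices, not a standard):

```python
def quality_score(fill_rate, dup_rate, valid_rate, fresh):
    """Combine per-dimension rates into 0-100 scores plus an overall score.

    fill_rate / dup_rate / valid_rate are fractions in [0, 1];
    fresh is a boolean from the freshness SLA check.
    """
    scores = {
        "completeness": fill_rate * 100,
        "uniqueness": (1 - dup_rate) * 100,
        "validity": valid_rate * 100,
        "timeliness": 100.0 if fresh else 0.0,
    }
    # Unweighted mean; real pipelines often weight dimensions differently
    overall = sum(scores.values()) / len(scores)
    return scores, overall
```

A score of 98 with a dipping completeness component tells an on-call engineer far more than a red pass/fail light, and gives dashboards a trend line to baseline against.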
## Checklist
- Quality dimensions defined per dataset
- Validation framework integrated (Great Expectations, dbt tests, Soda)
- Schema validation: column types, nullable, format
- Row-level validation: ranges, referential integrity, business rules
- Statistical validation: volume anomaly detection, distribution shifts
- Freshness monitoring: SLA on data arrival time
- Quality scores: quantitative score per dataset per dimension
- Quarantine: bad rows isolated, not mixed with good data
- Alerting: quality failures trigger PagerDuty/Slack alerts
- Dashboards: data quality trends visible to stakeholders
:::note[Source] This guide is derived from operational intelligence at Garnet Grid Consulting. For data quality consulting, visit garnetgrid.com. :::