# Data Observability
Monitor data pipelines and data quality with the same rigor as application observability. Covers data freshness, volume, schema, lineage, anomaly detection, data SLOs, and the patterns that prevent bad data from reaching downstream consumers.
Data observability applies the principles of application observability — monitoring, alerting, and root cause analysis — to data systems. When a dashboard shows wrong data or a model trains on stale data, data observability tells you what broke, where, and when. It answers: “Is the data fresh? Is it complete? Is it correct?”
## Five Pillars
1. **Freshness**: "Is the data up to date?"
   - Expected: table updated every hour
   - Alert: no new rows in 3 hours
2. **Volume**: "Is the expected amount of data arriving?"
   - Expected: ~100,000 rows per day
   - Alert: only 50,000 rows today (a 50% drop)
3. **Distribution**: "Are the values within expected ranges?"
   - Expected: `order_total` between $1 and $10,000
   - Alert: 500 orders with total = $0.00
4. **Schema**: "Has the structure changed unexpectedly?"
   - Expected: 15 columns with specific types
   - Alert: new column added, column type changed
5. **Lineage**: "Where did this data come from, and what depends on it?"
   - Impact: if `source_table` breaks, which dashboards are affected?
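The lineage question is mechanical to answer once dependencies are recorded. A minimal sketch, assuming a hand-maintained edge list (real deployments would pull this graph from dbt, OpenLineage, or a catalog tool; the table names here are illustrative):

```python
from collections import deque

# Hypothetical lineage graph: asset -> direct downstream consumers.
LINEAGE = {
    "source_table": ["orders", "customers"],
    "orders": ["daily_revenue_report", "orders_dashboard"],
    "customers": ["churn_model"],
}

def blast_radius(node, graph=LINEAGE):
    """Return every asset transitively downstream of `node` (BFS)."""
    seen, queue = set(), deque([node])
    while queue:
        current = queue.popleft()
        for child in graph.get(current, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return sorted(seen)

print(blast_radius("source_table"))
# → ['churn_model', 'customers', 'daily_revenue_report', 'orders', 'orders_dashboard']
```

When `source_table` breaks, this is the list of assets to flag as suspect before anyone reads a dashboard built on them.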
## Implementation
```python
class DataObserverAgent:
    """Runs freshness, volume, and distribution checks against a warehouse.

    Table and column names are interpolated into SQL directly, so they
    must come from trusted configuration, never from user input.
    """

    def __init__(self, db, alerter):
        self.db = db          # exposes .query(sql) -> row object
        self.alerter = alerter

    def alert(self, severity, message, **context):
        self.alerter.send(severity=severity, message=message, **context)

    def check_freshness(self, table, max_delay_hours=3):
        """Check if table has been updated recently."""
        result = self.db.query(f"""
            SELECT
                MAX(updated_at) AS last_update,
                EXTRACT(EPOCH FROM NOW() - MAX(updated_at)) / 3600 AS hours_stale
            FROM {table}
        """)
        if result.hours_stale > max_delay_hours:
            self.alert(
                severity="HIGH",
                message=f"{table} is {result.hours_stale:.1f}h stale (max: {max_delay_hours}h)",
                table=table,
            )

    def check_volume(self, table, expected_daily_rows, tolerance=0.3):
        """Check if today's row count is within the expected range."""
        result = self.db.query(f"""
            SELECT COUNT(*) AS row_count
            FROM {table}
            WHERE created_at >= CURRENT_DATE
        """)
        lower_bound = expected_daily_rows * (1 - tolerance)
        upper_bound = expected_daily_rows * (1 + tolerance)
        if not (lower_bound <= result.row_count <= upper_bound):
            self.alert(
                severity="HIGH",
                message=f"{table} has {result.row_count} rows today "
                        f"(expected {expected_daily_rows} ±{tolerance:.0%})",
                table=table,
            )

    def check_distribution(self, table, column, min_val=None, max_val=None,
                           max_null_pct=0.05):
        """Check value distribution for out-of-range values and null spikes."""
        result = self.db.query(f"""
            SELECT
                MIN({column}) AS min_value,
                MAX({column}) AS max_value,
                AVG({column}) AS avg_value,
                COUNT(*) FILTER (WHERE {column} IS NULL) * 1.0 / COUNT(*) AS null_pct
            FROM {table}
            WHERE created_at >= CURRENT_DATE
        """)
        # `is not None` so a legitimate bound of 0 is not skipped as falsy.
        if min_val is not None and result.min_value < min_val:
            self.alert(severity="MEDIUM",
                       message=f"{table}.{column} min={result.min_value} (expected >= {min_val})")
        if max_val is not None and result.max_value > max_val:
            self.alert(severity="MEDIUM",
                       message=f"{table}.{column} max={result.max_value} (expected <= {max_val})")
        if result.null_pct > max_null_pct:
            self.alert(severity="HIGH",
                       message=f"{table}.{column} null rate={result.null_pct:.1%} (max: {max_null_pct:.1%})")
```
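A fixed tolerance works for stable tables, but tables with weekly or seasonal patterns need a learned baseline. A sketch of simple volume anomaly detection against a rolling history of daily counts (the z-score threshold and window contents are illustrative):

```python
import statistics

def volume_anomaly(history, today, z_threshold=3.0):
    """Flag today's count if it deviates from the historical mean
    by more than `z_threshold` standard deviations."""
    if len(history) < 2:
        return False  # not enough data to establish a baseline
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    if stdev == 0:
        return today != mean  # perfectly flat history: any change is anomalous
    return abs(today - mean) / stdev > z_threshold

daily_counts = [100_000, 98_500, 101_200, 99_800, 100_400]
print(volume_anomaly(daily_counts, 50_000))  # True: a 50% drop is flagged
print(volume_anomaly(daily_counts, 99_000))  # False: within normal variation
```

In practice the history window should match the table's seasonality (e.g., compare Mondays to Mondays) so that expected weekly dips are not flagged.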
## Data SLOs
```yaml
# Data SLOs: the data equivalent of service SLOs
data_slos:
  orders_table:
    freshness:
      target: "Updated within 1 hour of source"
      measurement: "MAX(updated_at) lag vs source system"
    completeness:
      target: "99.5% of source orders present"
      measurement: "COUNT(orders) / COUNT(source_orders)"
    accuracy:
      target: "99.9% of totals match source"
      measurement: "SUM(our_total) vs SUM(source_total)"

  daily_revenue_report:
    freshness:
      target: "Available by 06:00 UTC"
      measurement: "Report generation timestamp"
    completeness:
      target: "All regions represented"
      measurement: "DISTINCT(region) count"
```
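Evaluating a freshness SLO like the one above reduces to computing the fraction of measurement samples that met the target, and from that, how fast the error budget is burning. A minimal sketch (the hourly lag samples and the 99.5% objective are illustrative):

```python
def slo_compliance(samples, is_good):
    """Fraction of samples that met the target."""
    good = sum(1 for s in samples if is_good(s))
    return good / len(samples)

# Hourly freshness lag (hours behind source) over one day; one bad sample.
lags = [0.4, 0.7, 0.5, 2.3, 0.6] + [0.5] * 19
compliance = slo_compliance(lags, lambda lag: lag <= 1.0)  # target: <= 1h lag
print(f"{compliance:.1%}")  # 23/24 good samples ≈ 95.8%

# Burn rate: how much faster than allowed we are consuming error budget.
objective = 0.995
burn_rate = (1 - compliance) / (1 - objective)
print(round(burn_rate, 1))  # ≈ 8.3: burning budget ~8x faster than sustainable
```

Alerting on burn rate rather than on every late batch is what keeps data SLO alerts actionable instead of noisy.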
## Anti-Patterns
| Anti-Pattern | Consequence | Fix |
|---|---|---|
| Monitor only pipelines, not data | Pipeline succeeds but produces garbage | Monitor data quality, not just pipeline status |
| Alert on every anomaly | Alert fatigue, alerts ignored | Data SLOs with burn rate alerts |
| No data lineage | Cannot determine blast radius | Lineage tracking (dbt, Atlan, OpenLineage) |
| Manual data quality checks | Inconsistent, forgotten | Automated observability framework |
| No schema change detection | Upstream breaks downstream silently | Schema registry, contract testing |
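The schema-change row above is straightforward to automate: store a known-good snapshot of each table's columns and diff the live schema (e.g., from `information_schema.columns`) against it on every run. A sketch with an in-memory snapshot (the column names and types are illustrative):

```python
def diff_schema(expected, actual):
    """Compare {column: type} snapshots; return a list of detected changes."""
    changes = []
    for col, typ in expected.items():
        if col not in actual:
            changes.append(f"column dropped: {col}")
        elif actual[col] != typ:
            changes.append(f"type changed: {col} {typ} -> {actual[col]}")
    for col in actual:
        if col not in expected:
            changes.append(f"column added: {col}")
    return changes

expected = {"order_id": "bigint", "order_total": "numeric", "created_at": "timestamp"}
actual = {"order_id": "bigint", "order_total": "text",
          "created_at": "timestamp", "coupon": "text"}
print(diff_schema(expected, actual))
# → ['type changed: order_total numeric -> text', 'column added: coupon']
```

An added column is usually safe to note at low severity; a dropped column or type change should page, since it breaks downstream consumers silently.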
Data observability is the difference between “the pipeline ran” and “the data is correct.” One is an operational metric; the other is a quality guarantee.