
Data Observability

Monitor data pipelines and data quality with the same rigor as application observability. Covers data freshness, volume, schema, lineage, anomaly detection, data SLOs, and the patterns that prevent bad data from reaching downstream consumers.

Data observability applies the principles of application observability — monitoring, alerting, and root cause analysis — to data systems. When a dashboard shows wrong data or a model trains on stale data, data observability tells you what broke, where, and when. It answers: “Is the data fresh? Is it complete? Is it correct?”


Five Pillars

1. Freshness:
   "Is the data up to date?"
   Expected: Table updated every hour
   Alert: No new rows in 3 hours
   
2. Volume:
   "Is the expected amount of data arriving?"
   Expected: ~100,000 rows per day
   Alert: Only 50,000 rows today (50% drop)
   
3. Distribution:
   "Are the values within expected ranges?"
   Expected: order_total between $1 and $10,000
   Alert: 500 orders with total = $0.00
   
4. Schema:
   "Has the structure changed unexpectedly?"
   Expected: 15 columns, specific types
   Alert: New column added, column type changed
   
5. Lineage:
   "Where did this data come from, and what depends on it?"
   Impact: If source_table breaks, which dashboards are affected? (see the sketch below)
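
That impact question maps directly to a graph traversal. Below is a minimal sketch, assuming the lineage graph is already available as an adjacency list; the asset names are made up, and in practice the graph would come from a lineage tool such as dbt or OpenLineage.

from collections import deque

# Illustrative lineage graph: edges point from a source to whatever reads it.
# A real graph would be exported from dbt, OpenLineage, Atlan, etc.
lineage = {
    "source_table": ["orders_clean"],
    "orders_clean": ["daily_revenue_report", "orders_mart"],
    "orders_mart": ["exec_dashboard"],
}

def blast_radius(broken, graph):
    """Return every downstream asset reachable from the broken node."""
    affected, queue = set(), deque([broken])
    while queue:
        node = queue.popleft()
        for child in graph.get(node, []):
            if child not in affected:
                affected.add(child)
                queue.append(child)
    return affected

print(blast_radius("source_table", lineage))
# -> {'orders_clean', 'daily_revenue_report', 'orders_mart', 'exec_dashboard'}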

Implementation

class DataObserverAgent:
    """Runs freshness, volume, and distribution checks against a warehouse.

    Assumes `self.db.query()` executes SQL and returns a result row with named
    fields, and `self.alert()` forwards to the team's alerting system.
    """
    def check_freshness(self, table, max_delay_hours=3):
        """Check if table has been updated recently."""
        result = self.db.query(f"""
            SELECT 
                MAX(updated_at) as last_update,
                EXTRACT(EPOCH FROM NOW() - MAX(updated_at)) / 3600 as hours_stale
            FROM {table}
        """)
        
        if result.hours_stale > max_delay_hours:
            self.alert(
                severity="HIGH",
                message=f"{table} is {result.hours_stale:.1f}h stale (max: {max_delay_hours}h)",
                table=table,
            )
    
    def check_volume(self, table, expected_daily_rows, tolerance=0.3):
        """Check if row count is within expected range."""
        result = self.db.query(f"""
            SELECT COUNT(*) as row_count
            FROM {table}
            WHERE created_at >= CURRENT_DATE
        """)
        
        lower_bound = expected_daily_rows * (1 - tolerance)
        upper_bound = expected_daily_rows * (1 + tolerance)
        
        if not (lower_bound <= result.row_count <= upper_bound):
            self.alert(
                severity="HIGH",
                message=f"{table} has {result.row_count} rows today "
                        f"(expected {expected_daily_rows} ±{tolerance*100}%)",
                table=table,
            )
    
    def check_distribution(self, table, column, min_val=None, max_val=None, 
                           max_null_pct=0.05):
        """Check value distribution for anomalies."""
        result = self.db.query(f"""
            SELECT 
                MIN({column}) as min_value,
                MAX({column}) as max_value,
                AVG({column}) as avg_value,
                COUNT(*) FILTER (WHERE {column} IS NULL) * 1.0 / COUNT(*) as null_pct
            FROM {table}
            WHERE created_at >= CURRENT_DATE
        """)
        
        # Explicit None checks so a bound of 0 is still enforced.
        if min_val is not None and result.min_value < min_val:
            self.alert(severity="MEDIUM",
                       message=f"{table}.{column} min={result.min_value} (expected >={min_val})")

        if max_val is not None and result.max_value > max_val:
            self.alert(severity="MEDIUM",
                       message=f"{table}.{column} max={result.max_value} (expected <={max_val})")
        
        if result.null_pct > max_null_pct:
            self.alert(severity="HIGH",
                      message=f"{table}.{column} null rate={result.null_pct:.1%} (max: {max_null_pct:.1%})")

Data SLOs

# Data SLOs: The data equivalent of service SLOs

data_slos:
  orders_table:
    freshness:
      target: "Updated within 1 hour of source"
      measurement: "MAX(updated_at) lag vs source system"
    completeness:
      target: "99.5% of source orders present"
      measurement: "COUNT(orders) / COUNT(source_orders)"
    accuracy:
      target: "99.9% of totals match source"
      measurement: "SUM(our_total) vs SUM(source_total)"
    
  daily_revenue_report:
    freshness:
      target: "Available by 06:00 UTC"
      measurement: "Report generation timestamp"
    completeness:
      target: "All regions represented"
      measurement: "DISTINCT(region) count"

Anti-Patterns

Anti-Pattern                      | Consequence                             | Fix
Monitor only pipelines, not data  | Pipeline succeeds but produces garbage  | Monitor data quality, not just pipeline status
Alert on every anomaly            | Alert fatigue, alerts ignored           | Data SLOs with burn rate alerts
No data lineage                   | Cannot determine blast radius           | Lineage tracking (dbt, Atlan, OpenLineage)
Manual data quality checks        | Inconsistent, forgotten                 | Automated observability framework
No schema change detection        | Upstream breaks downstream silently     | Schema registry, contract testing
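
The "burn rate alerts" fix in the second row works the same way it does for service SLOs: rather than paging on every failed check, page when the error budget is being consumed much faster than the SLO allows. A minimal sketch; the check cadence and threshold are illustrative (14.4x is the commonly cited fast-burn threshold for a one-hour window against a 30-day SLO period):

def burn_rate(bad_checks, total_checks, slo_target):
    """How many times faster than sustainable the error budget is being spent.

    bad_checks/total_checks is the observed failure rate over a window;
    (1 - slo_target) is the failure rate the SLO permits. A value of 1.0 means
    the budget is being spent exactly at the rate that lasts the full SLO period.
    """
    allowed_failure_rate = 1 - slo_target            # e.g. 0.005 for a 99.5% SLO
    observed_failure_rate = bad_checks / total_checks
    return observed_failure_rate / allowed_failure_rate

# Example: completeness checks run every 5 minutes (12 per hour) against a 99.5% SLO.
# 3 failures in the last hour -> 0.25 / 0.005 = 50x burn, well past fast-burn.
if burn_rate(bad_checks=3, total_checks=12, slo_target=0.995) > 14.4:
    print("fast-burn alert: page the on-call data engineer")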

Data observability is the difference between “the pipeline ran” and “the data is correct.” One is an operational metric; the other is a quality guarantee.

Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.
