
# Data Lineage & Observability

Track data lineage across pipelines. Covers column-level lineage, OpenLineage, data catalogs, impact analysis, root cause analysis, and building lineage into your data stack.

Data lineage answers the question every data team dreads: “This dashboard number looks wrong — where did this data come from?” Without lineage, debugging data issues requires manually tracing SQL queries, checking pipeline logs, and calling five different teams. With lineage, you can click on a metric and see every table, transformation, and source that contributed to it.


## Lineage Types

| Type | What It Tracks | Use Case |
| --- | --- | --- |
| Table-level | Table A → Table B | "What tables feed this dashboard?" |
| Column-level | Column A.x → Column B.y | "Where does the revenue number come from?" |
| Job-level | Pipeline/DAG dependencies | "What breaks if this job fails?" |
| Row-level | Specific record provenance | "Which source records produced this output?" |
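Column-level lineage is, at its core, an edge mapping from output columns to source columns. A minimal sketch of that representation and a traversal over it (the table and column names are illustrative, not from a real catalog):

```python
# Column-level lineage as an adjacency map: output column -> source columns.
# All names below are illustrative.
COLUMN_LINEAGE = {
    "reports.revenue": ["warehouse.orders.amount", "warehouse.orders.currency"],
    "warehouse.orders.amount": ["postgres.orders.amount"],
    "warehouse.orders.currency": ["postgres.orders.currency"],
}

def trace_upstream(column, lineage):
    """Walk the lineage map to find every source column feeding `column`."""
    sources = set()
    stack = [column]
    while stack:
        current = stack.pop()
        for parent in lineage.get(current, []):
            if parent not in sources:
                sources.add(parent)
                stack.append(parent)
    return sources
```

`trace_upstream("reports.revenue", COLUMN_LINEAGE)` answers the "where does the revenue number come from?" question directly: it returns every warehouse and source-system column on the path.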

## Architecture

```text
Data Sources          Transformation         Consumption
┌──────────┐         ┌──────────────┐       ┌──────────┐
│ Postgres  ├────┐    │ dbt          ├──┐    │Dashboard │
│ MySQL     ├────┼───▶│ Spark        │  ├───▶│ML Model  │
│ APIs      ├────┘    │ Airflow      │  │    │Reports   │
└──────────┘         └──────┬───────┘  │    └──────────┘
                            │          │
                    ┌───────▼──────────▼────┐
                    │   Lineage Metadata     │
                    │   (OpenLineage/        │
                    │    DataHub/Atlan)       │
                    └────────────────────────┘
```

## OpenLineage Integration

```python
from uuid import uuid4
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset
from openlineage.client.facet import SqlJobFacet

client = OpenLineageClient(url="http://lineage-api:5000")

# Emit lineage events from your pipeline
def emit_lineage(job_name, inputs, outputs, sql_query=None):
    run = Run(runId=str(uuid4()))

    # Attach the SQL as a job facet so the query is searchable in the catalog
    facets = {"sql": SqlJobFacet(query=sql_query)} if sql_query else {}
    job = Job(namespace="commerce-pipeline", name=job_name, facets=facets)

    event = RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=run,
        job=job,
        producer="commerce-pipeline",
        inputs=[
            Dataset(namespace="postgres", name=table)
            for table in inputs
        ],
        outputs=[
            Dataset(namespace="warehouse", name=table)
            for table in outputs
        ],
    )
    client.emit(event)
```
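Under the hood, the client POSTs a JSON `RunEvent` to the lineage backend. A minimal sketch of that raw payload can be handy when debugging what an integration actually emits; the producer URL, job name, and dataset names below are illustrative:

```python
import json
from uuid import uuid4
from datetime import datetime, timezone

def build_lineage_event(job_name, inputs, outputs):
    """Assemble a raw OpenLineage RunEvent payload (the JSON the client sends)."""
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid4())},
        "job": {"namespace": "commerce-pipeline", "name": job_name},
        "inputs": [{"namespace": "postgres", "name": t} for t in inputs],
        "outputs": [{"namespace": "warehouse", "name": t} for t in outputs],
        "producer": "https://example.com/commerce-pipeline",  # illustrative
    }

event = build_lineage_event("load_orders", ["public.orders"], ["analytics.orders"])
print(json.dumps(event, indent=2))
```

Because the wire format is plain JSON, any tool that speaks the OpenLineage spec (DataHub, Marquez, OpenMetadata) can consume these events without caring which client produced them.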

## Impact Analysis

```python
def impact_analysis(changed_table, lineage_graph):
    """Find all downstream tables/dashboards affected by a change."""
    affected = set()
    queue = [changed_table]

    # Breadth-first traversal of the downstream lineage graph
    while queue:
        current = queue.pop(0)
        for item in lineage_graph.get_downstream(current):
            if item not in affected:
                affected.add(item)
                queue.append(item)

    return {
        "changed": changed_table,
        "affected_tables": [a for a in affected if a.type == "table"],
        "affected_dashboards": [a for a in affected if a.type == "dashboard"],
        "affected_ml_models": [a for a in affected if a.type == "model"],
        "total_affected": len(affected),
    }
```
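The function above assumes a `lineage_graph` that exposes `get_downstream` and nodes carrying a `type`. A minimal in-memory stand-in, just enough to exercise the traversal; in a real deployment this would be a query against the lineage backend (DataHub, OpenMetadata, etc.), and the node names here are illustrative:

```python
from collections import namedtuple

# Node types mirror the ones impact_analysis filters on.
Node = namedtuple("Node", ["name", "type"])

class LineageGraph:
    """Tiny in-memory lineage graph; a stand-in for a real lineage backend."""

    def __init__(self):
        self._edges = {}  # upstream name -> list of downstream Node objects

    def add_edge(self, upstream_name, downstream_node):
        self._edges.setdefault(upstream_name, []).append(downstream_node)

    def get_downstream(self, node):
        # Accept either a raw table name or a Node reached on a previous hop
        name = node.name if isinstance(node, Node) else node
        return self._edges.get(name, [])

graph = LineageGraph()
graph.add_edge("orders", Node("revenue_summary", "table"))
graph.add_edge("revenue_summary", Node("exec_dashboard", "dashboard"))
graph.add_edge("revenue_summary", Node("churn_model", "model"))
```

With this graph, `impact_analysis("orders", graph)` reports one affected table, one dashboard, and one ML model — exactly the blast radius you want to see before altering the `orders` schema.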

## Data Catalog Integration

| Catalog | Lineage Support | Best For |
| --- | --- | --- |
| DataHub | OpenLineage, SQL parsing | Open source, enterprise-ready |
| Atlan | Automated lineage discovery | Managed, collaboration-focused |
| OpenMetadata | OpenLineage, dbt | Open source, metadata-first |
| Unity Catalog | Databricks ecosystem | Databricks users |
| AWS Glue Catalog | AWS native services | AWS-centric data stacks |

## Anti-Patterns

| Anti-Pattern | Problem | Fix |
| --- | --- | --- |
| Manual lineage documentation | Stale immediately; nobody maintains it | Automated lineage extraction |
| Table-level only | "Revenue comes from orders" isn't enough to debug | Column-level lineage for debugging |
| No impact analysis | Schema changes break unknown downstream consumers | Query lineage before schema changes |
| Lineage as nice-to-have | Only used during incidents | Integrate into daily workflows (PRs, deployments) |
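Integrating lineage into the PR and deployment workflow can be as simple as a gate that blocks schema changes with live downstream consumers. A sketch, assuming the result dict has the shape returned by `impact_analysis` above (the field names are that function's, the gate itself is hypothetical):

```python
def schema_change_gate(changed_table, affected):
    """Fail a PR/deploy check when a schema change reaches dashboards or ML models.

    `affected` is assumed to match the dict returned by impact_analysis:
    lists under "affected_dashboards" and "affected_ml_models".
    """
    blockers = affected["affected_dashboards"] + affected["affected_ml_models"]
    if blockers:
        return False, f"{changed_table}: {len(blockers)} downstream consumer(s) at risk"
    return True, f"{changed_table}: no dashboards or models affected"

# Illustrative result from an impact-analysis query
ok, message = schema_change_gate("orders", {
    "affected_dashboards": ["exec_dashboard"],
    "affected_ml_models": [],
})
```

Wired into CI, the failing message gives the author the list of consumers to notify instead of letting the breakage surface in a dashboard after deploy.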

## Checklist

- [ ] Lineage tool selected (DataHub, Atlan, OpenMetadata)
- [ ] OpenLineage integration for pipelines (Airflow, Spark, dbt)
- [ ] Column-level lineage available for critical datasets
- [ ] Impact analysis run before schema changes
- [ ] Data catalog: all datasets discoverable with lineage
- [ ] Root cause analysis workflow documented
- [ ] Lineage freshness: metadata updated on every pipeline run

:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For data lineage consulting, visit garnetgrid.com.
:::

Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.
