
Pipeline Orchestration: Airflow, Dagster & Prefect

Choose and implement data pipeline orchestration. Covers Airflow, Dagster, Prefect, DAG design, task dependencies, error handling, scheduling, and operational best practices.

Pipeline orchestration is the coordination layer that manages when data pipelines run, in what order, how they handle failures, and how you monitor them. Without orchestration, you’re running cron jobs that fail silently, manually retrying scripts, and debugging pipelines by SSH-ing into production servers.


Orchestrator Comparison

| Feature | Airflow | Dagster | Prefect |
|---|---|---|---|
| Architecture | Scheduler + workers + metadata DB | Dagit UI + daemon + user code | Server + agents + UI |
| DAG definition | Python (operator-based) | Python (asset/op-based) | Python (flow/task) |
| Paradigm | Task-centric (run this, then that) | Asset-centric (produce this data) | Flow-centric (define workflow) |
| Testing | Difficult (tightly coupled) | First-class (isolated) | Good (isolated flows) |
| UI | Functional, complex | Modern, intuitive | Modern, clean |
| Community | Largest | Growing fast | Medium |
| Best for | Established data teams | Data asset management | Developer experience |

Airflow DAG Patterns

```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task

# slack_alert, query_source_db, enrich_order, and write_to_bigquery
# are user-defined helpers, shown here as placeholders.

@dag(
    schedule="0 6 * * *",  # Daily at 6 AM UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
        "on_failure_callback": slack_alert,
    },
    tags=["commerce", "daily"],
)
def daily_order_pipeline():

    @task()
    def extract_orders(ds=None):
        """Extract orders from the source database.

        Airflow injects the logical date (ds) because it appears in the
        task signature; Jinja templates such as {{ ds }} are not
        rendered inside TaskFlow function bodies.
        """
        return query_source_db(
            f"SELECT * FROM orders WHERE date = '{ds}'")

    @task()
    def validate_data(orders):
        """Run data quality checks before any transformation."""
        assert len(orders) > 0, "No orders found for date"
        assert all(o["amount"] > 0 for o in orders), "Negative amounts detected"
        return orders

    @task()
    def transform_orders(orders):
        """Apply business logic transformations."""
        return [enrich_order(o) for o in orders]

    @task()
    def load_to_warehouse(transformed):
        """Load to the data warehouse."""
        write_to_bigquery(transformed, table="analytics.orders_daily")

    raw = extract_orders()
    validated = validate_data(raw)
    transformed = transform_orders(validated)
    load_to_warehouse(transformed)

daily_order_pipeline()
```

Dagster Asset-Centric Model

```python
import pandas as pd

from dagster import asset, AssetIn, DailyPartitionsDefinition

# extract_from_source is a user-defined helper, shown as a placeholder.

partitions = DailyPartitionsDefinition(start_date="2025-01-01")

@asset(
    partitions_def=partitions,
    group_name="commerce",
    description="Raw orders extracted from source database",
)
def raw_orders(context) -> pd.DataFrame:
    partition_date = context.partition_key
    return extract_from_source(date=partition_date)

@asset(
    ins={"raw_orders": AssetIn("raw_orders")},
    partitions_def=partitions,
    group_name="commerce",
)
def cleaned_orders(context, raw_orders: pd.DataFrame) -> pd.DataFrame:
    return (raw_orders
        .dropna(subset=["order_id", "amount"])
        .drop_duplicates(subset=["order_id"]))

@asset(
    ins={"cleaned_orders": AssetIn("cleaned_orders")},
    partitions_def=partitions,
    group_name="commerce",
)
def daily_revenue(context, cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    return (cleaned_orders
        .groupby(["category"])
        .agg(revenue=("amount", "sum"), orders=("order_id", "count"))
        .reset_index())
```
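One payoff of the asset model is the first-class testing noted in the comparison table: asset bodies are ordinary Python functions, so the transformation logic can be unit-tested without a running orchestrator. A minimal sketch using only pandas, with the cleaning step factored into a plain function mirroring `cleaned_orders`:

```python
import pandas as pd

def clean_orders(raw: pd.DataFrame) -> pd.DataFrame:
    """Same logic as the cleaned_orders asset: drop rows with missing
    keys or amounts, then de-duplicate on order_id."""
    return (raw
        .dropna(subset=["order_id", "amount"])
        .drop_duplicates(subset=["order_id"]))

# Fixture with one valid row, one duplicate, and two null-containing rows.
raw = pd.DataFrame({
    "order_id": [1, 1, 2, None],
    "amount": [10.0, 10.0, None, 5.0],
})
cleaned = clean_orders(raw)
assert len(cleaned) == 1  # only the first valid row survives
```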

Error Handling Patterns

| Pattern | When to use | Implementation |
|---|---|---|
| Retry with backoff | Transient failures (API timeouts) | `retries=3, retry_delay=timedelta(minutes=5)` |
| Dead letter queue | Bad records that shouldn't block the pipeline | Route failed records to a DLQ, continue processing |
| Circuit breaker | Upstream system overwhelmed | Stop retrying after N failures, alert |
| Idempotent tasks | Any task that might be re-run | Design tasks so re-running produces the same result |
| Partial success | Some partitions succeed, others fail | Partition-aware retry (reprocess only failed partitions) |
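Orchestrators implement the first pattern for you, but it also shows up in plain pipeline code. A stdlib-only sketch of retry with exponential backoff; the delays and exception handling are illustrative:

```python
import functools
import time

def retry_with_backoff(retries=3, base_delay=1.0, factor=2.0):
    """Retry a function on exception, multiplying the delay by
    `factor` after each failed attempt."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # retries exhausted: surface the error
                    time.sleep(delay)
                    delay *= factor
        return wrapper
    return decorator

calls = {"n": 0}

@retry_with_backoff(retries=3, base_delay=0.01)
def flaky_fetch():
    """Fails twice, then succeeds: simulates a transient API timeout."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient failure")
    return "ok"

assert flaky_fetch() == "ok"
```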

Anti-Patterns

| Anti-pattern | Problem | Fix |
|---|---|---|
| God DAG | One 200-task DAG that does everything | Split into domain-specific DAGs |
| No idempotency | Re-running creates duplicates | Idempotent write patterns (upsert, replace partition) |
| Catch-all exception handling | Errors silently swallowed | Explicit error handling, alerting, dead letter queues |
| No partitioning | Can't reprocess a single day without re-running everything | Partition by date, process incrementally |
| Manual triggering | Relying on humans to start pipelines | Schedule-driven with a manual override option |
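The "replace partition" fix for non-idempotent writes is typically a delete-then-insert inside one transaction, keyed by the partition date. A sqlite3 sketch of the pattern; the table name and schema are illustrative:

```python
import sqlite3

def load_partition(conn, ds, rows):
    """Idempotent load: replace the entire date partition, so
    re-running for the same date never creates duplicates."""
    with conn:  # one transaction: delete + insert commit together
        conn.execute("DELETE FROM orders_daily WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO orders_daily (ds, order_id, amount) VALUES (?, ?, ?)",
            [(ds, r["order_id"], r["amount"]) for r in rows])

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders_daily (ds TEXT, order_id INTEGER, amount REAL)")

rows = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.0}]
load_partition(conn, "2025-01-01", rows)
load_partition(conn, "2025-01-01", rows)  # re-run: still two rows

count = conn.execute("SELECT COUNT(*) FROM orders_daily").fetchone()[0]
assert count == 2
```

The same shape works in warehouses that support partition replacement (e.g. BigQuery partition decorators or `MERGE`).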

Checklist

  • Orchestrator selected (Airflow, Dagster, Prefect)
  • DAGs designed with clear task boundaries and dependencies
  • Idempotent tasks: safe to re-run without side effects
  • Retry strategy: exponential backoff with max retries
  • Error handling: alerts, dead letter queues, circuit breakers
  • Partitioning: incremental processing by date/key
  • Monitoring: task duration, success rate, SLA tracking
  • Testing: DAG validation, task unit tests
  • Documentation: each DAG has description and owner

:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For pipeline orchestration consulting, visit garnetgrid.com.
:::

Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.
