# Pipeline Orchestration: Airflow, Dagster & Prefect
Choose and implement data pipeline orchestration. Covers Airflow, Dagster, Prefect, DAG design, task dependencies, error handling, scheduling, and operational best practices.
Pipeline orchestration is the coordination layer that manages when data pipelines run, in what order, how they handle failures, and how you monitor them. Without orchestration, you’re running cron jobs that fail silently, manually retrying scripts, and debugging pipelines by SSH-ing into production servers.
## Orchestrator Comparison
| Feature | Airflow | Dagster | Prefect |
|---|---|---|---|
| Architecture | Scheduler + workers + metadata DB | Dagit UI + daemon + user code | Server + agents + UI |
| DAG Definition | Python (operator-based) | Python (asset/op-based) | Python (flow/task) |
| Paradigm | Task-centric (run this, then that) | Asset-centric (produce this data) | Flow-centric (define workflow) |
| Testing | Difficult (tightly coupled) | First-class (isolated) | Good (isolated flows) |
| UI | Functional, complex | Modern, intuitive | Modern, clean |
| Community | Largest | Growing fast | Medium |
| Best for | Established data teams | Data asset management | Developer experience |
## Airflow DAG Patterns
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(
    schedule="0 6 * * *",  # Daily at 6 AM UTC (schedule_interval is deprecated in Airflow 2.4+)
    start_date=datetime(2025, 1, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
        "on_failure_callback": slack_alert,  # your alerting callback
    },
    tags=["commerce", "daily"],
)
def daily_order_pipeline():
    @task()
    def extract_orders(ds=None):
        """Extract orders from the source database.

        Jinja templates like {{ ds }} are not rendered inside TaskFlow
        function bodies; declare `ds` as a parameter and Airflow injects
        it from the task context.
        """
        return query_source_db(
            f"SELECT * FROM orders WHERE date = '{ds}'")

    @task()
    def validate_data(orders):
        """Run data quality checks."""
        assert len(orders) > 0, "No orders found for date"
        assert all(o["amount"] > 0 for o in orders), "Non-positive amounts detected"
        return orders

    @task()
    def transform_orders(orders):
        """Apply business logic transformations."""
        return [enrich_order(o) for o in orders]

    @task()
    def load_to_warehouse(transformed):
        """Load to the data warehouse."""
        write_to_bigquery(transformed, table="analytics.orders_daily")

    raw = extract_orders()
    validated = validate_data(raw)
    transformed = transform_orders(validated)
    load_to_warehouse(transformed)


daily_order_pipeline()
```
## Dagster Asset-Centric Model
```python
import pandas as pd
from dagster import asset, AssetIn, DailyPartitionsDefinition

partitions = DailyPartitionsDefinition(start_date="2025-01-01")


@asset(
    partitions_def=partitions,
    group_name="commerce",
    description="Raw orders extracted from source database",
)
def raw_orders(context) -> pd.DataFrame:
    partition_date = context.partition_key
    return extract_from_source(date=partition_date)


@asset(
    ins={"raw_orders": AssetIn("raw_orders")},
    partitions_def=partitions,
    group_name="commerce",
)
def cleaned_orders(context, raw_orders: pd.DataFrame) -> pd.DataFrame:
    return (raw_orders
            .dropna(subset=["order_id", "amount"])
            .drop_duplicates(subset=["order_id"]))


@asset(
    ins={"cleaned_orders": AssetIn("cleaned_orders")},
    partitions_def=partitions,
    group_name="commerce",
)
def daily_revenue(context, cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    return (cleaned_orders
            .groupby("category")
            .agg(revenue=("amount", "sum"), orders=("order_id", "count"))
            .reset_index())
```
## Error Handling Patterns
| Pattern | When to Use | Implementation |
|---|---|---|
| Retry with backoff | Transient failures (API timeouts) | retries=3, retry_delay=timedelta(minutes=5) |
| Dead letter queue | Bad records that shouldn’t block pipeline | Route failed records to DLQ, continue processing |
| Circuit breaker | Upstream system overwhelmed | Stop retrying after N failures, alert |
| Idempotent tasks | Any task that might be re-run | Design tasks so re-running produces same result |
| Partial success | Some partitions succeed, others fail | Partition-aware retry (only reprocess failed) |
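The dead-letter-queue row above can be sketched library-free: failed records are quarantined with their error instead of aborting the whole run. The function name `process_with_dlq` is illustrative; in production the dead letters would land in a queue or table for inspection:

```python
def process_with_dlq(records, transform):
    """Apply `transform` to each record; quarantine failures instead of aborting."""
    processed, dead_letters = [], []
    for record in records:
        try:
            processed.append(transform(record))
        except Exception as exc:  # explicit capture and routing, never a silent swallow
            dead_letters.append({"record": record, "error": repr(exc)})
    return processed, dead_letters
```

Pairing this with alerting on the dead-letter count gives you the "continue processing, but never lose sight of failures" behavior the table describes.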
## Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| God DAG | One 200-task DAG that does everything | Split into domain-specific DAGs |
| No idempotency | Re-running creates duplicates | Idempotent write patterns (upsert, replace partition) |
| Catch-all exception handling | Errors silently swallowed | Explicit error handling, alerting, dead letter queues |
| No partitioning | Can’t reprocess a single day without re-running all | Partition by date, process incrementally |
| Manual triggering | Rely on humans to start pipelines | Schedule-driven with manual override option |
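The "replace partition" fix for non-idempotent writes can be sketched with a plain dict standing in for the warehouse (a toy model, not a real client): delete the partition, then write it fresh, so re-running a day's load can never create duplicates:

```python
def replace_partition(warehouse: dict, partition: str, rows: list) -> None:
    """Idempotent load: drop the partition, then write it in one step.

    Re-running the load for the same partition produces the same final
    state, unlike an append, which duplicates rows on every retry.
    """
    warehouse.pop(partition, None)   # remove any previous attempt's output
    warehouse[partition] = list(rows)
```

The same delete-then-insert (or `MERGE`/upsert) shape applies to real warehouses such as BigQuery or Snowflake.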
## Checklist
- Orchestrator selected (Airflow, Dagster, Prefect)
- DAGs designed with clear task boundaries and dependencies
- Idempotent tasks: safe to re-run without side effects
- Retry strategy: exponential backoff with max retries
- Error handling: alerts, dead letter queues, circuit breakers
- Partitioning: incremental processing by date/key
- Monitoring: task duration, success rate, SLA tracking
- Testing: DAG validation, task unit tests
- Documentation: each DAG has description and owner
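The "DAG validation" item in the checklist boils down to a structural check: every task's dependencies resolve and the graph has no cycles. With Airflow you would typically load a `DagBag` and assert `import_errors` is empty; a library-free sketch of the cycle check itself (names are illustrative):

```python
def has_cycle(deps: dict) -> bool:
    """Return True if the task dependency graph contains a cycle.

    `deps` maps each task name to the list of tasks it depends on.
    Uses depth-first search with three colors: unvisited (0),
    in-progress (1), and done (2).
    """
    color = {task: 0 for task in deps}

    def visit(task):
        color[task] = 1
        for upstream in deps.get(task, []):
            state = color.get(upstream, 0)
            if state == 1:          # back edge: we are still inside this path
                return True
            if state == 0 and visit(upstream):
                return True
        color[task] = 2
        return False

    return any(color[task] == 0 and visit(task) for task in deps)
```

Running a check like this (plus per-task unit tests on the transformation logic) in CI catches broken DAGs before they reach the scheduler.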
:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For pipeline orchestration consulting, visit garnetgrid.com.
:::