Feature Stores for ML Pipelines
Design and operate feature stores for machine learning. Covers feature engineering, online/offline serving, consistency, versioning, and integration with training and inference pipelines.
Feature stores solve the most persistent engineering problem in ML: the gap between how features are computed in training and how they’re served in production. When a data scientist computes features in a Jupyter notebook using a batch SQL query, and the production system recomputes them with a streaming pipeline, the logic diverges. Predictions drift. Bugs hide. This training-serving skew is the silent killer of ML systems.
A feature store provides a single source of truth for feature definitions, computation, storage, and serving — ensuring that the features used to train a model are exactly the features served in production.
Architecture
Data Sources (Events, DBs, APIs)
↓
┌────────────────────────┐
│ Feature Transformation │ ← Define once, run everywhere
│ (SQL, Python, Spark) │
└────────────────────────┘
↓
┌───────────────┬───────────────┐
│ Offline Store │ Online Store  │
│ (Data Lake)   │ (Redis/DDB)   │
│ Historical    │ Low-latency   │
│ for training  │ for inference │
└───────────────┴───────────────┘
        ↓               ↓
 Model Training   Model Serving
Online vs Offline Serving
| Aspect | Offline Store | Online Store |
|---|---|---|
| Latency | Seconds to minutes | < 10ms |
| Volume | Historical (months/years) | Latest values only |
| Use Case | Training, batch scoring | Real-time inference |
| Storage | S3, BigQuery, Snowflake | Redis, DynamoDB, Cassandra |
| Update | Batch (hourly/daily) | Streaming (real-time) |
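The split in the table above can be sketched with plain Python containers standing in for the two stores (a toy model, not a real backend): the offline store is append-only history, while the online store keeps only the latest value per entity and feature.

```python
# Toy sketch of the dual-store pattern: offline keeps full history,
# online keeps only the latest value (names are illustrative).
offline_store = []   # stand-in for a data-lake table
online_store = {}    # stand-in for Redis/DynamoDB

def ingest(entity_id, feature_name, value, ts):
    # Offline: append-only, preserves every historical value.
    offline_store.append(
        {"entity": entity_id, "feature": feature_name,
         "value": value, "ts": ts})
    # Online: keyed overwrite, only the latest value survives.
    online_store[(entity_id, feature_name)] = value

ingest("C-1", "purchase_count_30d", 3, "2024-01-01")
ingest("C-1", "purchase_count_30d", 4, "2024-01-02")
print(len(offline_store))                           # 2 historical rows
print(online_store[("C-1", "purchase_count_30d")])  # 4 (latest only)
```

A real system replaces the list with a warehouse table and the dict with a key-value store, but the write pattern is the same: every ingest lands in both places.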
Feature Engineering Patterns
Point-in-Time Correctness
The #1 mistake in ML pipelines: using future data to compute features for past events (data leakage).
# WRONG: aggregates over all rows, including events after the prediction time
user_purchase_count = df.groupby("user_id")["amount"].count()

# RIGHT: point-in-time correct
from datetime import timedelta

def compute_features_at_time(events, entity_id, event_time):
    """Only use data available BEFORE the prediction time."""
    historical = events[
        (events["user_id"] == entity_id)
        & (events["timestamp"] < event_time)
    ]
    return {
        "purchase_count_30d": len(historical[
            historical["timestamp"] > event_time - timedelta(days=30)
        ]),
        "avg_order_value_90d": historical[
            historical["timestamp"] > event_time - timedelta(days=90)
        ]["amount"].mean(),
        "days_since_last_purchase": (
            (event_time - historical["timestamp"].max()).days
            if len(historical) > 0 else -1
        ),
    }
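The difference between the two approaches is easy to demonstrate on toy data (the column names mirror the snippet above; the rows themselves are made up). The naive groupby sees a "future" purchase that the point-in-time version correctly excludes:

```python
# Hedged sketch: leaky vs point-in-time-correct counts on toy data.
from datetime import datetime, timedelta
import pandas as pd

df = pd.DataFrame({
    "user_id":   ["u1", "u1", "u1"],
    "timestamp": [datetime(2024, 1, 1), datetime(2024, 1, 20),
                  datetime(2024, 2, 15)],
    "amount":    [50.0, 30.0, 20.0],
})

# WRONG: the label event is on Feb 1, but this count sees all three
# purchases, including the Feb 15 one from the future.
leaky_count = df.groupby("user_id")["amount"].count().loc["u1"]

# RIGHT: restrict to rows strictly before the prediction time, then window.
event_time = datetime(2024, 2, 1)
hist = df[(df["user_id"] == "u1") & (df["timestamp"] < event_time)]
correct_count = len(hist[hist["timestamp"] > event_time - timedelta(days=30)])

print(leaky_count, correct_count)  # 3 vs 1
```

A model trained on the leaky count looks better in backtests than it ever will in production, which is exactly the skew the feature store is meant to prevent.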
Common Feature Types
# Aggregation features
"user_purchase_count_7d": count(purchases, window=7d)
"user_avg_order_value_30d": avg(order_value, window=30d)
"user_max_order_value_90d": max(order_value, window=90d)
# Recency features
"days_since_last_login": now() - max(login_timestamp)
"days_since_first_purchase": now() - min(purchase_timestamp)
# Ratio features
"return_rate_30d": count(returns, 30d) / count(purchases, 30d)
"weekend_purchase_ratio": count(weekend_purchases) / count(all_purchases)
# Categorical encodings
"user_segment": categorical(lifetime_value, bins=[0, 100, 500, float('inf')])
"preferred_category": mode(purchase_category, window=90d)
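Several of these pseudocode features translate directly to pandas. A minimal sketch of a ratio and a recency feature, using made-up timestamps:

```python
# Hedged sketch: two of the feature types above in pandas
# (dates and the fixed "now" are illustrative).
import pandas as pd

# Toy purchase timestamps: Sat Jan 6, Sun Jan 7, Tue Jan 9 (2024).
purchases = pd.Series(
    pd.to_datetime(["2024-01-06", "2024-01-07", "2024-01-09"]))

# Ratio feature: share of purchases made on a weekend (Mon=0 ... Sun=6).
weekend_ratio = (purchases.dt.dayofweek >= 5).mean()

# Recency feature: days since last purchase, against a fixed "now"
# (in production this would be the prediction time, not wall-clock time).
now = pd.Timestamp("2024-01-12")
days_since_last = (now - purchases.max()).days

print(weekend_ratio, days_since_last)  # 2 of 3 on a weekend; 3 days ago
```

Note the recency feature is computed against an explicit timestamp rather than `datetime.now()`, which keeps it point-in-time correct when backfilling training data.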
Feast Implementation
from datetime import timedelta

from feast import BigQuerySource, Entity, FeatureView, Field
from feast.types import Float64, Int64

# Define entity (join_keys maps the entity to its key column)
customer = Entity(
    name="customer",
    join_keys=["customer_id"],
    description="Unique customer identifier",
)

# Define data source
customer_stats_source = BigQuerySource(
    table="ml_features.customer_stats",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Define feature view
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[
        Field(name="purchase_count_30d", dtype=Int64),
        Field(name="avg_order_value_90d", dtype=Float64),
        Field(name="days_since_last_purchase", dtype=Int64),
        Field(name="return_rate_30d", dtype=Float64),
        Field(name="lifetime_value", dtype=Float64),
    ],
    online=True,
    source=customer_stats_source,
)
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # points at the feature repo

# Training: get point-in-time correct features
training_df = store.get_historical_features(
    entity_df=training_entities,  # DataFrame of customer_id + event_timestamp
    features=[
        "customer_features:purchase_count_30d",
        "customer_features:avg_order_value_90d",
        "customer_features:return_rate_30d",
    ],
).to_df()

# Inference: get latest features
online_features = store.get_online_features(
    features=[
        "customer_features:purchase_count_30d",
        "customer_features:avg_order_value_90d",
    ],
    entity_rows=[{"customer_id": "C-12345"}],
).to_dict()
Feature Store Selection
| Platform | Type | Best For |
|---|---|---|
| Feast | Open source | Teams wanting full control |
| Tecton | Managed SaaS | Enterprise, real-time features |
| Databricks Feature Store | Integrated | Databricks-centric teams |
| SageMaker Feature Store | AWS managed | AWS-native ML pipelines |
| Vertex AI Feature Store | GCP managed | GCP-native ML pipelines |
| Hopsworks | Open/managed | Streaming feature engineering |
Feature Monitoring
def monitor_feature_health(feature_name, current_batch, reference_stats):
    """Compare a feature's current batch statistics against reference
    statistics (e.g. from the training window) and collect drift alerts."""
    checks = {
        "null_rate": current_batch[feature_name].isna().mean(),
        "mean": current_batch[feature_name].mean(),
        "std": current_batch[feature_name].std(),
        "min": current_batch[feature_name].min(),
        "max": current_batch[feature_name].max(),
    }
    alerts = []
    # Null-rate spike: more than double the reference rate
    if checks["null_rate"] > reference_stats["null_rate"] * 2:
        alerts.append(f"Null rate spike: {checks['null_rate']:.2%}")
    # Mean shift: more than three reference standard deviations
    if abs(checks["mean"] - reference_stats["mean"]) > 3 * reference_stats["std"]:
        alerts.append(
            f"Mean shifted: {checks['mean']:.2f} vs {reference_stats['mean']:.2f}"
        )
    return {"feature": feature_name, "checks": checks, "alerts": alerts}
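Distribution checks catch drift in values; a separate staleness check catches pipelines that silently stop updating. A minimal sketch (the two-hour budget is an illustrative default, not from the text above):

```python
# Hedged sketch of a feature-staleness check: alert when the most
# recent update is older than an agreed freshness budget.
from datetime import datetime, timedelta

def check_staleness(latest_event_ts, now, max_age=timedelta(hours=2)):
    """Flag a feature whose latest event timestamp lags too far behind."""
    age = now - latest_event_ts
    return {"age_seconds": age.total_seconds(), "stale": age > max_age}

result = check_staleness(datetime(2024, 1, 1, 9, 0),
                         datetime(2024, 1, 1, 12, 0))
print(result)  # three hours old against a two-hour budget -> stale
```

The freshness budget should match the feature's update cadence: an hourly batch feature can tolerate hours of lag, a streaming feature only minutes.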
Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Training/serving skew | Different feature code in notebook vs production | Single feature definition, serve from feature store |
| No point-in-time | Future data leaks into training features | Enforce timestamp-based feature retrieval |
| Feature sprawl | Thousands of unused features stored | Track feature usage, deprecate unused after 90 days |
| No versioning | Feature logic changes break models | Version feature definitions, tie to model versions |
| Compute in serving path | Complex features computed at inference time | Pre-compute and cache in online store |
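The versioning fix from the table can be sketched as a startup check: pin the feature-definition versions a model was trained against in its metadata, and refuse to serve when the store's current versions differ (names and the version scheme here are illustrative, not from any particular platform):

```python
# Hedged sketch: tying feature-definition versions to a model artifact
# so serving can detect training/serving version mismatches.
model_metadata = {
    "model_version": "fraud-v42",
    "feature_versions": {
        "customer_features": "v3",  # version used at training time
    },
}

def validate_feature_versions(model_meta, store_versions):
    """Return {feature_view: (trained_with, currently_served)} for every
    feature view whose serving version differs from the training version."""
    return {
        name: (wanted, store_versions.get(name))
        for name, wanted in model_meta["feature_versions"].items()
        if store_versions.get(name) != wanted
    }

# The store now serves v4 of customer_features -> mismatch detected.
mismatches = validate_feature_versions(
    model_metadata, {"customer_features": "v4"})
print(mismatches)  # {'customer_features': ('v3', 'v4')}
```

Running this check at deploy time turns a silent prediction-quality regression into a loud, actionable failure.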
Checklist
- Feature store selected (Feast, Tecton, managed)
- Entity definitions created for all ML entities
- Feature views defined with TTL and schema
- Point-in-time correctness validated for historical retrieval
- Online store configured for low-latency serving
- Feature monitoring: null rates, distribution shifts, staleness
- Feature versioning and deprecation policy established
- Training/serving parity tested and validated
- Feature documentation: business meaning, computation logic, owner
- Access control: sensitive features restricted by team/role
:::note[Source] This guide is derived from operational intelligence at Garnet Grid Consulting. For ML platform consulting, visit garnetgrid.com. :::