MLOps Pipeline Engineering
Build production ML pipelines that automate training, evaluation, deployment, and monitoring. Covers feature stores, model registries, training pipelines, A/B testing for models, model drift detection, and the patterns that make ML repeatable and reliable.
MLOps is to machine learning what DevOps is to software engineering. Without MLOps, ML models are built in notebooks, deployed manually, monitored rarely, and retrained never. With MLOps, the entire lifecycle — from data to training to deployment to monitoring — is automated, reproducible, and observable.
ML Lifecycle
Traditional ML:
Data Scientist: Trains model in Jupyter notebook
Data Scientist: "Here's model.pkl, please deploy it"
Engineer: Wraps in Flask, deploys manually
Nobody: Monitors model performance
3 months later: Model accuracy has degraded
Nobody: Retrains
With MLOps:
1. Data Pipeline: Automated feature extraction
2. Training Pipeline: Triggered by new data or schedule
3. Evaluation: Automated metrics + fairness checks
4. Registry: Versioned model stored with metadata
5. Deployment: Canary rollout with automated analysis
6. Monitoring: Real-time drift + performance tracking
7. Retraining: Triggered by drift or schedule
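Steps 6 and 7 are the part most setups skip, and they need surprisingly little glue. The sketch below wires a drift report and a schedule check into a retraining trigger; it assumes the TrainingPipeline and DriftDetector classes sketched in the sections that follow, and the scheduling helper and config fields are illustrative placeholders rather than any particular orchestrator's API.
# Hypothetical glue between monitoring (step 6) and retraining (step 7).
# `pipeline` is a TrainingPipeline and `detector` a DriftDetector as sketched
# below; days_since_last_training() and config.max_model_age_days are illustrative.
def retraining_check(pipeline, detector, reference_data, live_data, config):
    drift_report = detector.check_data_drift(reference_data, live_data)
    drift_detected = any(result["drifted"] for result in drift_report.values())
    schedule_due = days_since_last_training() >= config.max_model_age_days

    if drift_detected or schedule_due:
        # Re-run the same automated pipeline used for the original training
        return pipeline.run(config)
    return None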
Feature Store
# Feature Store: Single source of truth for features
class FeatureStore:
    """Serve consistent features for training and inference."""

    def define_feature(self, name, transform, source):
        """Register a feature definition."""
        return FeatureDefinition(
            name=name,
            transform=transform,
            source=source,
            # Offline store: for training (batch)
            offline_store=f"s3://features/{name}/",
            # Online store: for inference (low-latency)
            online_store="redis://features",
        )

    # Training: get historical features (batch)
    def get_training_data(self, features, entity_ids, start_date, end_date):
        """Point-in-time correct feature retrieval for training."""
        # Point-in-time correctness: only use feature values that were
        # available AT each timestamp, preventing data leakage from the future.
        return self.offline_store.query(
            features=features,
            entities=entity_ids,
            timestamp_range=(start_date, end_date),
        )

    # Inference: get latest features (online, response time < 10 ms)
    def get_online_features(self, features, entity_id):
        """Low-latency feature retrieval for real-time inference."""
        return self.online_store.get(
            features=features,
            entity_id=entity_id,
        )
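A brief sketch of how the two access paths line up, assuming a store instance and feature names like the ones below (the feature names, entity IDs, and dates are illustrative):
store = FeatureStore()

# Training: point-in-time correct batch of historical feature values
train_df = store.get_training_data(
    features=["num_orders_30d", "avg_basket_value"],   # illustrative feature names
    entity_ids=customer_ids,                           # illustrative entity list
    start_date="2024-01-01",
    end_date="2024-06-30",
)

# Inference: the same features, served from the low-latency online store
request_features = store.get_online_features(
    features=["num_orders_30d", "avg_basket_value"],
    entity_id="customer_123",
)
Because both paths resolve the same feature definitions, the transform applied at training time and at serving time is identical, which is what removes training/serving skew.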
Training Pipeline
# Automated training pipeline (Kubeflow/Airflow)
class TrainingPipeline:
    def run(self, config):
        """End-to-end automated training."""
        # 1. Fetch training data from the feature store (offline/batch path)
        data = self.feature_store.get_training_data(
            features=config.features,
            entity_ids=config.entity_ids,   # entities to fetch, matching the store's API
            start_date=config.training_window_start,
            end_date=config.training_window_end,
        )

        # 2. Train model
        model = self.train(data, config.hyperparameters)

        # 3. Evaluate on the held-out test split
        metrics = self.evaluate(model, data.test_split)

        # 4. Validate against minimum thresholds
        assert metrics["accuracy"] >= config.min_accuracy, "Accuracy below threshold"
        assert metrics["auc_roc"] >= config.min_auc, "AUC below threshold"

        # 5. Register in model registry with metrics, config, and data lineage
        version = self.model_registry.register(
            model=model,
            metrics=metrics,
            training_data_hash=data.hash,
            config=config,
        )

        # 6. Deploy as a canary if it beats the current production model
        if config.auto_deploy and self.is_better_than_current(metrics):
            self.deploy_canary(version, traffic_pct=5)

        return version
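deploy_canary only starts the rollout; the automated analysis half of the canary step is the comparison that decides whether to promote. A minimal sketch of that decision, assuming a metrics backend that exposes per-version online metrics and a registry with promote/rollback operations (the metrics_for helper, the thresholds, and the registry methods are illustrative):
def analyze_canary(metrics_backend, canary_version, baseline_version, registry):
    """Promote the canary to full traffic only if it holds up against the baseline."""
    canary = metrics_backend.metrics_for(canary_version)      # illustrative helper
    baseline = metrics_backend.metrics_for(baseline_version)

    healthy = (
        canary["error_rate"] <= baseline["error_rate"] * 1.05        # at most 5% worse
        and canary["p99_latency_ms"] <= baseline["p99_latency_ms"] * 1.10
        and canary["online_accuracy"] >= baseline["online_accuracy"] - 0.01
    )

    if healthy:
        registry.promote(canary_version, stage="production")  # shift to 100% of traffic
    else:
        registry.rollback(canary_version)                     # revert to the baseline
    return healthy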
Model Drift Detection
from scipy.stats import ks_2samp


class DriftDetector:
    """Detect when model performance degrades."""

    def check_data_drift(self, reference_data, current_data):
        """Statistical test for input feature distribution change."""
        drift_results = {}
        for feature in reference_data.columns:
            # Two-sample Kolmogorov-Smirnov test: compares the feature's
            # distribution in the reference window vs. the current window.
            stat, p_value = ks_2samp(
                reference_data[feature],
                current_data[feature],
            )
            drift_results[feature] = {
                "statistic": stat,
                "p_value": p_value,
                "drifted": p_value < 0.05,
            }
        return drift_results

    def check_prediction_drift(self, reference_predictions, current_predictions):
        """Monitor whether the prediction distribution has shifted."""
        ref_mean = reference_predictions.mean()
        cur_mean = current_predictions.mean()
        pct_change = abs(cur_mean - ref_mean) / ref_mean * 100
        return {
            "reference_mean": ref_mean,
            "current_mean": cur_mean,
            "pct_change": pct_change,
            "alert": pct_change > 10,  # alert on a >10% shift in the mean prediction
        }
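A sketch of running the detector against production logs, assuming the reference window is a snapshot of the training data and the current window is features logged from live traffic (the paths, feature names, and alert call are illustrative):
import pandas as pd

FEATURES = ["num_orders_30d", "avg_basket_value"]   # illustrative feature names

# Reference window: what the production model was trained on.
reference = pd.read_parquet("s3://features/training_snapshot.parquet")   # illustrative path
# Current window: features logged from live traffic over the last 7 days.
current = pd.read_parquet("s3://features/logged_last_7_days.parquet")    # illustrative path

detector = DriftDetector()
report = detector.check_data_drift(reference[FEATURES], current[FEATURES])

drifted = [name for name, result in report.items() if result["drifted"]]
if drifted:
    # Page the owning team and/or trigger the retraining pipeline
    alert(f"Data drift detected in features: {drifted}")   # alert() is a placeholder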
Anti-Patterns
| Anti-Pattern | Consequence | Fix |
|---|---|---|
| Notebook-to-production | Unreproducible, untestable | Training pipeline with version control |
| No model versioning | Cannot roll back, cannot audit | Model registry with metadata |
| Training/serving skew | Features differ at training vs inference | Feature store for consistency |
| No monitoring | Silent model degradation | Drift detection + performance tracking |
| Manual retraining | Model rots for months | Automated retraining triggers |
MLOps is not about tools — it is about treating ML models as production systems that need the same rigor as any other critical service: automation, monitoring, rollback, and continuous improvement.