MLOps Pipeline Architecture: From Experiment to Production
Build production-grade ML pipelines. Covers experiment tracking, model versioning, CI/CD for ML, feature stores, model monitoring, and the MLOps maturity model.
An oft-cited industry estimate holds that 87% of ML models never make it to production. The gap isn’t in model quality — it’s in the engineering infrastructure around the model. MLOps bridges that gap by applying DevOps principles to machine learning: version control, automated testing, CI/CD, monitoring, and reproducibility.
The biggest misconception: teams think the hard part is training the model. In reality, training is 10% of the work. The other 90% is data pipelines, validation, serving, monitoring, and retraining — the infrastructure that keeps a model useful after the initial demo.
The MLOps Architecture
┌───────────────┐    ┌──────────────┐    ┌──────────────┐
│  Data Layer   │    │   Training   │    │   Serving    │
│               │    │   Pipeline   │    │    Layer     │
│ Feature Store │───>│  Experiment  │───>│    Model     │
│ Data Catalog  │    │   Training   │    │   Registry   │
│ Data Quality  │    │  Evaluation  │    │ API Gateway  │
│               │    │  Validation  │    │  Monitoring  │
└───────────────┘    └──────────────┘    └──────────────┘
        │                   │                   │
        └───────────────────┼───────────────────┘
                            │
                    ┌──────────────┐
                    │ Orchestrator │
                    │  (Airflow /  │
                    │   Kubeflow)  │
                    └──────────────┘
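The orchestrator's job is to run the three layers as a dependency graph: a stage starts only after everything it depends on has finished. A minimal, framework-free sketch of that wiring (the stage names and bodies are illustrative placeholders, not a real Airflow or Kubeflow DAG):

```python
# Minimal DAG runner illustrating the dependency wiring in the diagram above.
from graphlib import TopologicalSorter


def build_features(ctx):
    ctx["features"] = "features-v1"


def train_model(ctx):
    ctx["model"] = f"model trained on {ctx['features']}"


def deploy_model(ctx):
    ctx["endpoint"] = f"serving {ctx['model']}"


# Each stage maps to the set of stages it depends on
dag = {
    "data_layer": set(),
    "training_pipeline": {"data_layer"},
    "serving_layer": {"training_pipeline"},
}
tasks = {
    "data_layer": build_features,
    "training_pipeline": train_model,
    "serving_layer": deploy_model,
}


def run_pipeline():
    ctx = {}
    # static_order() yields stages so that dependencies always run first
    for stage in TopologicalSorter(dag).static_order():
        tasks[stage](ctx)
    return ctx
```

Real orchestrators add scheduling, retries, and per-task isolation on top of exactly this topological-order execution.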
Tool Selection Guide
| Component | Open Source | Managed Service | When to Choose Managed |
|---|---|---|---|
| Experiment tracking | MLflow | W&B, Neptune | Team > 5 data scientists |
| Feature store | Feast | Tecton, Databricks FS | Need real-time features |
| Orchestration | Airflow, Prefect | SageMaker Pipelines | Already on AWS |
| Model serving | FastAPI, Seldon | SageMaker, Vertex AI | Need auto-scaling |
| Monitoring | Evidently, Alibi | Arize, WhyLabs | Need alerting out of box |
Component 1: Experiment Tracking
Track every experiment — parameters, metrics, artifacts, code version.
MLflow Implementation
import matplotlib.pyplot as plt
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

# Assumes X_train, y_train, X_test, y_test, feature_names are already loaded
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("customer-churn-prediction")

with mlflow.start_run(run_name="rf-v3-balanced"):
    # Log parameters
    params = {
        "n_estimators": 200,
        "max_depth": 15,
        "min_samples_split": 5,
        "class_weight": "balanced",
        "random_state": 42,
    }
    mlflow.log_params(params)

    # Train
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test)[:, 1]
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "auc_roc": roc_auc_score(y_test, y_proba),
        "train_size": len(X_train),
        "test_size": len(X_test),
    }
    mlflow.log_metrics(metrics)

    # Log the model artifact and register it in one step
    mlflow.sklearn.log_model(model, "model",
                             registered_model_name="churn-predictor")

    # Log feature importance plot
    importances = model.feature_importances_
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.barh(feature_names, importances)
    mlflow.log_figure(fig, "feature_importance.png")
What to Track
| Category | Items | Why |
|---|---|---|
| Parameters | Hyperparams, data version, feature set | Reproducibility |
| Metrics | Accuracy, F1, AUC, latency | Comparison |
| Artifacts | Model file, feature importance plot, confusion matrix | Debugging |
| Tags | Team, use case, data version hash | Organization |
| Environment | Python version, pip freeze, CUDA version | Reproducibility |
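The Environment row is the easiest to forget, because nothing fails when it is missing — until you try to reproduce a run months later. A stdlib-only sketch of capturing it at run time (the function name is illustrative; log the resulting dict as run tags or a JSON artifact):

```python
import platform
import sys
from importlib import metadata


def capture_environment() -> dict:
    """Snapshot of the runtime environment for reproducibility logging."""
    return {
        "python_version": sys.version.split()[0],
        "platform": platform.platform(),
        # Installed packages, pip-freeze style
        "packages": sorted(
            f"{dist.metadata['Name']}=={dist.version}"
            for dist in metadata.distributions()
            if dist.metadata["Name"]  # skip entries with broken metadata
        ),
    }
```

CUDA and driver versions are not exposed by the stdlib; capture those separately (e.g. from `nvidia-smi` output) if you train on GPU.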
Component 2: Feature Store
Centralized repository for ML features — ensures consistency between training and inference.
# Feature definitions (Feast; note that newer Feast releases rename
# Feature/features= to Field/schema= and event_timestamp_column to timestamp_field)
# feature_repo/features.py
from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

customer = Entity(
    name="customer_id",
    value_type=ValueType.STRING,
    description="Unique customer identifier",
)

customer_features = FeatureView(
    name="customer_features",
    entities=["customer_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="total_orders_30d", dtype=ValueType.INT64),
        Feature(name="avg_order_value", dtype=ValueType.DOUBLE),
        Feature(name="days_since_last_order", dtype=ValueType.INT64),
        Feature(name="support_tickets_90d", dtype=ValueType.INT64),
        Feature(name="login_frequency_7d", dtype=ValueType.DOUBLE),
    ],
    online=True,
    source=FileSource(
        path="s3://features/customer_features.parquet",
        event_timestamp_column="event_timestamp",
    ),
)
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")

# Training: point-in-time-correct historical features
training_df = store.get_historical_features(
    entity_df=entity_df,  # DataFrame of customer_id + event_timestamp
    features=[
        "customer_features:total_orders_30d",
        "customer_features:avg_order_value",
        "customer_features:days_since_last_order",
    ],
).to_df()

# Inference: low-latency online features (real-time)
online_features = store.get_online_features(
    features=[
        "customer_features:total_orders_30d",
        "customer_features:avg_order_value",
    ],
    entity_rows=[{"customer_id": "cust_123"}],
).to_dict()
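Under the hood, `get_historical_features` performs a point-in-time join: for each entity and event timestamp, it picks the latest feature value recorded at or before that timestamp, never after — which is what prevents label leakage in training data. A simplified, pure-Python illustration of that rule:

```python
from bisect import bisect_right


def point_in_time_value(history, event_ts):
    """history: list of (timestamp, value) pairs sorted by timestamp.

    Return the latest value observed at or before event_ts,
    or None if nothing was observed yet (no peeking into the future).
    """
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, event_ts)  # first entry strictly after event_ts
    return history[idx - 1][1] if idx > 0 else None
```

For a training row with `event_timestamp=6` and observations at t=1, 5, 9, the join must return the t=5 value; returning the t=9 value would leak post-event information into the features.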
Component 3: Model Registry & Versioning
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Promote model through stages
# (stage transitions; recent MLflow versions favor registered-model aliases)
client.transition_model_version_stage(
    name="churn-predictor",
    version=5,
    stage="Staging",
)

# After validation passes
client.transition_model_version_stage(
    name="churn-predictor",
    version=5,
    stage="Production",
    archive_existing_versions=True,  # Auto-archive the previous Production version
)

# Load the current production model
import mlflow.pyfunc

model = mlflow.pyfunc.load_model("models:/churn-predictor/Production")
prediction = model.predict(input_data)
Model Lifecycle Stages
| Stage | Purpose | Who Promotes | Guard Rails |
|---|---|---|---|
| None | Initial experiment, not tested | Automatic on log | None |
| Staging | Passed automated tests | CI pipeline | Min accuracy, max latency |
| Production | Serving live traffic | Team lead / approval | A/B test results, rollback plan |
| Archived | Previous version replaced | Automatic | Retained for rollback (30 days) |
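The Staging guard rails in the table can be enforced as a simple gate that CI runs before any stage transition; the thresholds and metric names below are illustrative:

```python
def passes_staging_gates(metrics: dict, gates: dict) -> tuple[bool, list]:
    """Check a candidate model's metrics against promotion gates.

    gates maps metric name -> (direction, threshold), where direction is
    'min' (metric must be >= threshold) or 'max' (metric must be <= threshold).
    Returns (passed, list of human-readable failures).
    """
    failures = []
    for name, (direction, threshold) in gates.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing")
        elif direction == "min" and value < threshold:
            failures.append(f"{name}: {value} < {threshold}")
        elif direction == "max" and value > threshold:
            failures.append(f"{name}: {value} > {threshold}")
    return (not failures, failures)


# Example gate set mirroring the table above
STAGING_GATES = {
    "accuracy": ("min", 0.85),
    "auc_roc": ("min", 0.90),
    "p95_latency_ms": ("max", 50),
}
```

A failed gate should fail the CI job loudly (with the failure list in the log) rather than silently skip promotion.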
Component 4: CI/CD for ML
Training Pipeline (GitHub Actions)
# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  schedule:
    - cron: '0 6 * * 1'  # Weekly, Monday 06:00 UTC
  workflow_dispatch:
    inputs:
      retrain_reason:
        description: 'Reason for retraining'
        required: true

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data quality
        run: |
          python scripts/validate_data.py \
            --min-rows 10000 \
            --max-null-ratio 0.05 \
            --schema config/schema.json

  train:
    needs: data-validation
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config config/production.yaml
      - name: Register model
        run: python scripts/register_model.py --stage staging

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run evaluation suite
        run: |
          python scripts/evaluate.py \
            --model-stage staging \
            --min-accuracy 0.85 \
            --min-auc 0.90 \
            --max-latency-ms 50

  deploy:
    needs: evaluate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Promote to production
        run: python scripts/promote_model.py --stage production
      - name: Deploy serving endpoint
        run: |
          kubectl apply -f k8s/model-serving.yaml
          kubectl rollout status deployment/churn-model-server
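The `scripts/validate_data.py` referenced in the data-validation job is not shown in this guide; a minimal sketch of the row-count and null-ratio checks its flags imply (schema validation and the argparse wiring are omitted, and the function shape is an assumption):

```python
def validate(rows: list, min_rows: int, max_null_ratio: float) -> list:
    """Validate tabular data loaded as a list of dicts (e.g. from csv.DictReader).

    Returns a list of human-readable errors; an empty list means the data passes.
    """
    errors = []
    if len(rows) < min_rows:
        errors.append(f"row count {len(rows)} < minimum {min_rows}")
    if rows:
        total = len(rows)
        for col in rows[0].keys():
            # Treat empty strings and None as nulls
            nulls = sum(1 for r in rows if r[col] in ("", None))
            ratio = nulls / total
            if ratio > max_null_ratio:
                errors.append(
                    f"column {col}: null ratio {ratio:.3f} > {max_null_ratio}"
                )
    return errors
```

In CI the script would print each error and exit non-zero, which is what makes the downstream `train` job skip when validation fails.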
Component 5: Model Serving
FastAPI Model Server
from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np

app = FastAPI(title="Churn Prediction API")

# Load the native sklearn flavor: the generic pyfunc wrapper only exposes
# predict(), but we need predict_proba() for a churn probability
model = mlflow.sklearn.load_model("models:/churn-predictor/Production")


class PredictionRequest(BaseModel):
    customer_id: str
    total_orders_30d: int
    avg_order_value: float
    days_since_last_order: int
    support_tickets_90d: int
    login_frequency_7d: float


class PredictionResponse(BaseModel):
    customer_id: str
    churn_probability: float
    risk_level: str
    model_version: str


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    # Feature order must match the training feature order exactly
    features = np.array([[
        request.total_orders_30d,
        request.avg_order_value,
        request.days_since_last_order,
        request.support_tickets_90d,
        request.login_frequency_7d,
    ]])
    probability = float(model.predict_proba(features)[0][1])
    risk_level = "high" if probability > 0.7 else "medium" if probability > 0.4 else "low"
    return PredictionResponse(
        customer_id=request.customer_id,
        churn_probability=round(probability, 4),
        risk_level=risk_level,
        model_version="v5",
    )


@app.get("/health")
async def health():
    return {"status": "healthy", "model": "churn-predictor", "version": "v5"}
Component 6: Model Monitoring
Data Drift Detection
from scipy import stats
import numpy as np


def detect_drift(reference_data: np.ndarray, production_data: np.ndarray,
                 threshold: float = 0.05) -> dict:
    """Two-sample Kolmogorov-Smirnov test for distribution drift."""
    statistic, p_value = stats.ks_2samp(reference_data, production_data)
    return {
        "statistic": float(statistic),
        "p_value": float(p_value),
        "drift_detected": p_value < threshold,
        "severity": "high" if statistic > 0.2 else "medium" if statistic > 0.1 else "low",
    }


# Monitor each feature: compare the training distribution against recent
# production traffic (alert() is your paging/notification hook)
for feature in feature_names:
    result = detect_drift(
        reference_data=training_data[feature].values,
        production_data=last_week_data[feature].values,
    )
    if result["drift_detected"]:
        alert(f"Data drift detected in {feature}: {result}")
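One caveat with the KS test: at high request volumes the p-value flags even negligible shifts as significant, which is why the severity field above keys off the statistic rather than the p-value. The Population Stability Index is a common effect-size complement; a stdlib-only sketch (the 0.1 "watch" and 0.25 "act" cut-offs are conventional rules of thumb, not part of any library API):

```python
import math


def psi(reference, production, bins=10):
    """Population Stability Index between two numeric samples.

    Bin edges are taken from the reference distribution's quantiles,
    so each reference bin holds roughly equal mass.
    """
    ref = sorted(reference)
    edges = [ref[int(len(ref) * i / bins)] for i in range(1, bins)]

    def proportions(sample):
        counts = [0] * bins
        for x in sample:
            idx = sum(1 for e in edges if x > e)  # which bin x falls into
            counts[idx] += 1
        # Small floor avoids log(0) for empty bins
        return [max(c / len(sample), 1e-6) for c in counts]

    p = proportions(reference)
    q = proportions(production)
    return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))
```

Unlike the KS p-value, PSI does not shrink toward "significant" as sample size grows, so its thresholds stay meaningful at production volume.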
Performance Monitoring Dashboard Metrics
| Metric | Description | Alert Threshold | Check Frequency |
|---|---|---|---|
| Prediction latency (p95) | 95th percentile response time | > 100ms | Real-time |
| Prediction volume | Requests per minute | < 50% of baseline | Hourly |
| Feature drift (KS statistic) | Distribution shift per feature | > 0.15 | Daily |
| Prediction distribution | Output probability distribution | Mean shift > 0.1 | Daily |
| Model accuracy (delayed) | Accuracy on labeled production data | < 0.80 | Weekly |
| Error rate | Failed predictions / total | > 1% | Real-time |
Retraining Triggers
| Trigger | Condition | Action |
|---|---|---|
| Scheduled | Weekly/monthly cron | Retrain on latest data |
| Data drift | KS statistic > 0.15 for any feature | Alert + auto-retrain |
| Performance drop | Accuracy < threshold on labeled data | Alert + manual investigation |
| New data available | Significant new labeled data | Retrain with expanded dataset |
| Concept drift | Business rules or environment changed | Feature re-engineering + retrain |
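The triggers in the table compose naturally into one decision function that the monitoring job runs on a schedule; the signal names, priorities, and thresholds below are illustrative:

```python
def retraining_decision(signals: dict) -> str:
    """Map monitoring signals to an action from the trigger table.

    signals may contain: accuracy (on labeled production data),
    max_ks_statistic (worst per-feature drift), new_labels (count),
    days_since_training. Missing signals fall back to healthy defaults.
    Checks are ordered by urgency: a performance drop outranks drift,
    which outranks routine scheduling.
    """
    if signals.get("accuracy", 1.0) < 0.80:
        return "alert_and_investigate"      # performance drop: human in the loop
    if signals.get("max_ks_statistic", 0.0) > 0.15:
        return "alert_and_retrain"          # data drift past threshold
    if signals.get("new_labels", 0) >= 10_000:
        return "retrain_expanded_dataset"   # significant new labeled data
    if signals.get("days_since_training", 0) >= 7:
        return "scheduled_retrain"          # weekly cadence
    return "no_action"
```

Concept drift is deliberately absent: it is detected by humans noticing changed business rules, not by a metric, so it enters as a manual trigger.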
MLOps Maturity Model
| Level | Description | Capabilities | Time to Reach |
|---|---|---|---|
| 0: Manual | Jupyter notebooks, manual deployment | No versioning, no monitoring | Starting point |
| 1: Automated Training | Scripted training, experiment tracking | MLflow, DVC, manual deployment | 1-2 months |
| 2: CI/CD for ML | Automated testing, model registry | Auto-validation, staged deployment | 3-6 months |
| 3: Full MLOps | Feature store, monitoring, auto-retraining | Data drift detection, A/B testing | 6-12 months |
| 4: Self-Healing | Auto-retraining triggers, auto-rollback | Closed-loop ML lifecycle | 12-18 months |
Implementation Checklist
- [ ] Experiment tracking set up (MLflow / W&B / Neptune)
- [ ] What to track defined (params, metrics, artifacts, env, tags)
- [ ] Model artifacts versioned and registered
- [ ] Model lifecycle stages defined (None → Staging → Production → Archived)
- [ ] Training pipeline automated (Airflow / Kubeflow / GitHub Actions)
- [ ] Data validation gates before training
- [ ] Model evaluation suite with minimum quality thresholds
- [ ] Feature store implemented (Feast / Tecton / custom)
- [ ] Model serving deployed (FastAPI / Seldon / SageMaker)
- [ ] Data drift monitoring active (daily checks per feature)
- [ ] Prediction monitoring dashboards built
- [ ] Retraining triggers configured (scheduled + drift-based)
- [ ] Rollback procedure documented and tested
- [ ] Team aligned on MLOps maturity roadmap
:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For MLOps consulting, visit garnetgrid.com.
:::