
MLOps Pipeline Architecture: From Experiment to Production

Build production-grade ML pipelines. Covers experiment tracking, model versioning, CI/CD for ML, feature stores, model monitoring, and the MLOps maturity model.

An oft-cited industry estimate holds that 87% of ML models never make it to production. Whatever the exact figure, the gap isn't in model quality; it's in the engineering infrastructure around the model. MLOps bridges that gap by applying DevOps principles to machine learning: version control, automated testing, CI/CD, monitoring, and reproducibility.

The biggest misconception: teams think the hard part is training the model. In reality, training is 10% of the work. The other 90% is data pipelines, validation, serving, monitoring, and retraining — the infrastructure that keeps a model useful after the initial demo.


The MLOps Architecture

┌───────────────┐    ┌──────────────┐    ┌──────────────┐
│  Data Layer   │    │  Training    │    │  Serving     │
│               │    │  Pipeline    │    │  Layer       │
│ Feature Store │───>│ Experiment   │───>│ Model        │
│ Data Catalog  │    │ Training     │    │ Registry     │
│ Data Quality  │    │ Evaluation   │    │ API Gateway  │
│               │    │ Validation   │    │ Monitoring   │
└───────────────┘    └──────────────┘    └──────────────┘
        │                   │                   │
        └───────────────────┼───────────────────┘
                            │
                     ┌──────────────┐
                     │ Orchestrator │
                     │  (Airflow /  │
                     │  Kubeflow)   │
                     └──────────────┘

Tool Selection Guide

| Component | Open Source | Managed Service | When to Choose Managed |
|---|---|---|---|
| Experiment tracking | MLflow | W&B, Neptune | Team > 5 data scientists |
| Feature store | Feast | Tecton, Databricks FS | Need real-time features |
| Orchestration | Airflow, Prefect | SageMaker Pipelines | Already on AWS |
| Model serving | FastAPI, Seldon | SageMaker, Vertex AI | Need auto-scaling |
| Monitoring | Evidently, Alibi | Arize, WhyLabs | Need alerting out of the box |

Component 1: Experiment Tracking

Track every experiment — parameters, metrics, artifacts, code version.

MLflow Implementation

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("customer-churn-prediction")

with mlflow.start_run(run_name="rf-v3-balanced"):
    # Log parameters
    params = {
        "n_estimators": 200,
        "max_depth": 15,
        "min_samples_split": 5,
        "class_weight": "balanced",
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train (assumes X_train, X_test, y_train, y_test from an earlier split)
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test)[:, 1]

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "auc_roc": roc_auc_score(y_test, y_proba),
        "train_size": len(X_train),
        "test_size": len(X_test)
    }
    mlflow.log_metrics(metrics)

    # Log model artifact
    mlflow.sklearn.log_model(model, "model",
        registered_model_name="churn-predictor")

    # Log feature importance plot (feature_names comes from the training data)
    import matplotlib.pyplot as plt
    importances = model.feature_importances_
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.barh(feature_names, importances)
    mlflow.log_figure(fig, "feature_importance.png")
    plt.close(fig)

What to Track

| Category | Items | Why |
|---|---|---|
| Parameters | Hyperparams, data version, feature set | Reproducibility |
| Metrics | Accuracy, F1, AUC, latency | Comparison |
| Artifacts | Model file, feature importance plot, confusion matrix | Debugging |
| Tags | Team, use case, data version hash | Organization |
| Environment | Python version, pip freeze, CUDA version | Reproducibility |
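The Environment row is the one teams skip most often, and the one that breaks reproducibility a year later. A minimal stdlib sketch of capturing it (the function name and dict layout here are illustrative, not a tracker API):

```python
import sys
import platform
from importlib import metadata

def capture_environment() -> dict:
    """Snapshot interpreter, OS, and installed packages for reproducibility."""
    packages = sorted(
        f"{dist.metadata['Name']}=={dist.version}"
        for dist in metadata.distributions()
    )
    return {
        "python_version": sys.version.split()[0],
        "platform": platform.platform(),
        "packages": packages,  # rough equivalent of `pip freeze`
    }

env = capture_environment()
print(env["python_version"])
```

With MLflow, a snapshot like this could be attached to the run via `mlflow.log_dict(env, "environment.json")`, with team and use-case tags set through `mlflow.set_tags`.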

Component 2: Feature Store

Centralized repository for ML features — ensures consistency between training and inference.

# Feature definition (using Feast)
# feature_repo/features.py

from feast import Entity, Feature, FeatureView, FileSource, ValueType
from datetime import timedelta

customer = Entity(
    name="customer_id",
    value_type=ValueType.STRING,
    description="Unique customer identifier"
)

customer_features = FeatureView(
    name="customer_features",
    entities=["customer_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="total_orders_30d", dtype=ValueType.INT64),
        Feature(name="avg_order_value", dtype=ValueType.DOUBLE),
        Feature(name="days_since_last_order", dtype=ValueType.INT64),
        Feature(name="support_tickets_90d", dtype=ValueType.INT64),
        Feature(name="login_frequency_7d", dtype=ValueType.DOUBLE),
    ],
    online=True,
    source=FileSource(
        path="s3://features/customer_features.parquet",
        event_timestamp_column="event_timestamp"
    )
)

# Training: get historical features (point-in-time correct join)
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["customer_features:total_orders_30d",
              "customer_features:avg_order_value",
              "customer_features:days_since_last_order"]
).to_df()

# Inference: get online features (real-time)
online_features = store.get_online_features(
    features=["customer_features:total_orders_30d",
              "customer_features:avg_order_value"],
    entity_rows=[{"customer_id": "cust_123"}]
).to_dict()
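What distinguishes `get_historical_features` from a plain join is point-in-time correctness: each training row sees only feature values recorded at or before its own timestamp, which prevents future data leaking into training. A toy pandas sketch of the same idea (illustrative data, not Feast internals):

```python
import pandas as pd

# Entity rows: the (customer, timestamp) pairs we want training features for
entity_df = pd.DataFrame({
    "customer_id": ["c1", "c1"],
    "event_timestamp": pd.to_datetime(["2024-01-10", "2024-01-20"]),
})

# Feature values as they were recorded over time
features = pd.DataFrame({
    "customer_id": ["c1", "c1"],
    "event_timestamp": pd.to_datetime(["2024-01-05", "2024-01-15"]),
    "total_orders_30d": [3, 7],
})

# Point-in-time join: for each entity row, take the latest feature value
# recorded at or before that row's timestamp (never a future value)
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="customer_id",
    direction="backward",
)
print(joined["total_orders_30d"].tolist())  # [3, 7]
```

The 2024-01-10 row gets the value recorded on 2024-01-05, not the later one: that is exactly the leakage guarantee a feature store gives you for free.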

Component 3: Model Registry & Versioning

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Promote model through stages
client.transition_model_version_stage(
    name="churn-predictor",
    version=5,
    stage="Staging"
)

# After validation passes
client.transition_model_version_stage(
    name="churn-predictor",
    version=5,
    stage="Production",
    archive_existing_versions=True  # Auto-archive previous Production version
)

# Load production model
import mlflow.pyfunc
model = mlflow.pyfunc.load_model("models:/churn-predictor/Production")
prediction = model.predict(input_data)

Model Lifecycle Stages

| Stage | Purpose | Who Promotes | Guard Rails |
|---|---|---|---|
| None | Initial experiment, not tested | Automatic on log | None |
| Staging | Passed automated tests | CI pipeline | Min accuracy, max latency |
| Production | Serving live traffic | Team lead / approval | A/B test results, rollback plan |
| Archived | Previous version replaced | Automatic | Retained for rollback (30 days) |

Component 4: CI/CD for ML

Training Pipeline (GitHub Actions)

# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  schedule:
    - cron: '0 6 * * 1'  # Weekly Monday 6 AM
  workflow_dispatch:
    inputs:
      retrain_reason:
        description: 'Reason for retraining'
        required: true

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data quality
        run: |
          python scripts/validate_data.py \
            --min-rows 10000 \
            --max-null-ratio 0.05 \
            --schema config/schema.json

  train:
    needs: data-validation
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config config/production.yaml
      - name: Register model
        run: python scripts/register_model.py --stage staging

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run evaluation suite
        run: |
          python scripts/evaluate.py \
            --model-stage staging \
            --min-accuracy 0.85 \
            --min-auc 0.90 \
            --max-latency-ms 50

  deploy:
    needs: evaluate
    if: success()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Promote to production
        run: python scripts/promote_model.py --stage production
      - name: Deploy serving endpoint
        run: |
          kubectl apply -f k8s/model-serving.yaml
          kubectl rollout status deployment/churn-model-server
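The evaluate job is the quality gate: if any threshold fails, promotion stops. A minimal sketch of the check a script like `evaluate.py` might run (metric names and threshold values mirror the workflow above; the function name and sample values are illustrative):

```python
def passes_quality_gates(metrics: dict, thresholds: dict) -> list:
    """Return the list of failed gates; an empty list means the model may promote."""
    failures = []
    if metrics["accuracy"] < thresholds["min_accuracy"]:
        failures.append(f"accuracy {metrics['accuracy']:.3f} < {thresholds['min_accuracy']}")
    if metrics["auc_roc"] < thresholds["min_auc"]:
        failures.append(f"auc_roc {metrics['auc_roc']:.3f} < {thresholds['min_auc']}")
    if metrics["p95_latency_ms"] > thresholds["max_latency_ms"]:
        failures.append(f"p95 latency {metrics['p95_latency_ms']}ms > {thresholds['max_latency_ms']}ms")
    return failures

thresholds = {"min_accuracy": 0.85, "min_auc": 0.90, "max_latency_ms": 50}
metrics = {"accuracy": 0.88, "auc_roc": 0.92, "p95_latency_ms": 34}
print(passes_quality_gates(metrics, thresholds))  # [] -> safe to promote
```

In CI, a non-empty list would translate into a non-zero exit code, which is what makes the `needs: evaluate` dependency on the deploy job an actual gate.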

Component 5: Model Serving

FastAPI Model Server

from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np

app = FastAPI(title="Churn Prediction API")
# Load the sklearn flavor directly: the generic pyfunc wrapper exposes only
# predict(), but this endpoint needs predict_proba() for churn probabilities.
model = mlflow.sklearn.load_model("models:/churn-predictor/Production")

class PredictionRequest(BaseModel):
    customer_id: str
    total_orders_30d: int
    avg_order_value: float
    days_since_last_order: int
    support_tickets_90d: int
    login_frequency_7d: float

class PredictionResponse(BaseModel):
    customer_id: str
    churn_probability: float
    risk_level: str
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    features = np.array([[
        request.total_orders_30d,
        request.avg_order_value,
        request.days_since_last_order,
        request.support_tickets_90d,
        request.login_frequency_7d
    ]])

    probability = float(model.predict_proba(features)[0][1])
    risk_level = "high" if probability > 0.7 else "medium" if probability > 0.4 else "low"

    return PredictionResponse(
        customer_id=request.customer_id,
        churn_probability=round(probability, 4),
        risk_level=risk_level,
        model_version="v5"
    )

@app.get("/health")
async def health():
    return {"status": "healthy", "model": "churn-predictor", "version": "v5"}

Component 6: Model Monitoring

Data Drift Detection

from scipy import stats
import numpy as np

def detect_drift(reference_data: np.ndarray, production_data: np.ndarray,
                 threshold: float = 0.05) -> dict:
    """Kolmogorov-Smirnov test for distribution drift."""
    statistic, p_value = stats.ks_2samp(reference_data, production_data)

    return {
        "statistic": float(statistic),
        "p_value": float(p_value),
        "drift_detected": p_value < threshold,
        "severity": "high" if statistic > 0.2 else "medium" if statistic > 0.1 else "low"
    }

# Monitor each feature (training_data / last_week_data are DataFrames of
# reference and recent rows; alert() is whatever notification hook you use)
for feature in feature_names:
    result = detect_drift(
        reference_data=training_data[feature].values,
        production_data=last_week_data[feature].values
    )
    if result["drift_detected"]:
        alert(f"Data drift detected in {feature}: {result}")

Performance Monitoring Dashboard Metrics

| Metric | Description | Alert Threshold | Check Frequency |
|---|---|---|---|
| Prediction latency (p95) | 95th percentile response time | > 100 ms | Real-time |
| Prediction volume | Requests per minute | < 50% of baseline | Hourly |
| Feature drift (KS statistic) | Distribution shift per feature | > 0.15 | Daily |
| Prediction distribution | Output probability distribution | Mean shift > 0.1 | Daily |
| Model accuracy (delayed) | Accuracy on labeled production data | < 0.80 | Weekly |
| Error rate | Failed predictions / total | > 1% | Real-time |
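The latency and error-rate rows can be computed straight from a prediction log; a small numpy sketch (the log format here is made up for illustration):

```python
import numpy as np

# Illustrative prediction log: (latency_ms, succeeded) per request
log = [(12.0, True), (18.5, True), (9.3, True), (240.0, False), (15.1, True)]

latencies = np.array([lat for lat, _ in log])
p95_latency_ms = float(np.percentile(latencies, 95))

total = len(log)
errors = sum(1 for _, ok in log if not ok)
error_rate = errors / total

# Compare against the thresholds from the table above
alerts = []
if p95_latency_ms > 100:
    alerts.append("p95_latency")
if error_rate > 0.01:
    alerts.append("error_rate")
print(alerts)  # ['p95_latency', 'error_rate']
```

The p95 (rather than mean) matters because a single slow outlier, like the 240 ms request here, is exactly what an average would hide.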

Retraining Triggers

| Trigger | Condition | Action |
|---|---|---|
| Scheduled | Weekly/monthly cron | Retrain on latest data |
| Data drift | KS statistic > 0.15 for any feature | Alert + auto-retrain |
| Performance drop | Accuracy < threshold on labeled data | Alert + manual investigation |
| New data available | Significant new labeled data | Retrain with expanded dataset |
| Concept drift | Business rules or environment changed | Feature re-engineering + retrain |
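The drift and performance triggers compose naturally into one decision function; a sketch using the thresholds from the table (the function and action names are illustrative):

```python
from typing import Optional

def retraining_decision(drift_ks: dict, accuracy: Optional[float],
                        min_accuracy: float = 0.80, ks_limit: float = 0.15) -> str:
    """Map monitoring signals to an action from the trigger table."""
    if accuracy is not None and accuracy < min_accuracy:
        # Performance drop: keep a human in the loop before retraining
        return "alert_and_investigate"
    if any(ks > ks_limit for ks in drift_ks.values()):
        # Data drift on any feature: safe to retrain automatically
        return "alert_and_auto_retrain"
    return "no_action"

print(retraining_decision({"avg_order_value": 0.22}, accuracy=0.86))
# alert_and_auto_retrain
```

Note the ordering: a performance drop outranks drift, because retraining on drifted data without understanding the drop can bake the problem in.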

MLOps Maturity Model

| Level | Description | Capabilities | Time to Reach |
|---|---|---|---|
| 0: Manual | Jupyter notebooks, manual deployment | No versioning, no monitoring | Starting point |
| 1: Automated Training | Scripted training, experiment tracking | MLflow, DVC, manual deployment | 1-2 months |
| 2: CI/CD for ML | Automated testing, model registry | Auto-validation, staged deployment | 3-6 months |
| 3: Full MLOps | Feature store, monitoring, auto-retraining | Data drift detection, A/B testing | 6-12 months |
| 4: Self-Healing | Auto-retraining triggers, auto-rollback | Closed-loop ML lifecycle | 12-18 months |

Implementation Checklist

  • Experiment tracking set up (MLflow / W&B / Neptune)
  • What to track defined (params, metrics, artifacts, env, tags)
  • Model artifacts versioned and registered
  • Model lifecycle stages defined (None → Staging → Production → Archived)
  • Training pipeline automated (Airflow / Kubeflow / GitHub Actions)
  • Data validation gates before training
  • Model evaluation suite with minimum quality thresholds
  • Feature store implemented (Feast / Tecton / custom)
  • Model serving deployed (FastAPI / Seldon / SageMaker)
  • Data drift monitoring active (daily checks per feature)
  • Prediction monitoring dashboards built
  • Retraining triggers configured (scheduled + drift-based)
  • Rollback procedure documented and tested
  • Team aligned on MLOps maturity roadmap

:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For MLOps consulting, visit garnetgrid.com.
:::

Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.
