Verified by Garnet Grid

LLM Guardrails & Safety Architecture

Build production-grade LLM safety systems. Covers input validation, output filtering, content classifiers, PII detection, prompt injection defense, rate limiting, and incident response.

Every LLM deployed in production is a potential liability. Without guardrails, models will happily generate harmful content, leak private information, follow injected instructions, and produce outputs that violate your company’s policies. Guardrails are not optional safety theater — they’re the engineering controls that make the difference between a useful AI system and a lawsuit waiting to happen.

This guide covers the architecture of production-grade LLM safety: input validation, output filtering, content classification, PII detection, prompt injection defense, and the operational infrastructure needed to monitor and respond to safety incidents in real time.


Guardrail Architecture Overview

User Input

┌──────────────────────┐
│  INPUT GUARDRAILS     │
│  • PII Detection      │
│  • Prompt Injection   │
│  • Topic Restriction  │
│  • Rate Limiting      │
│  • Input Sanitization │
└──────────────────────┘
    ↓ (pass/block)
┌──────────────────────┐
│  LLM PROCESSING       │
│  • System Prompt       │
│  • Context Injection   │
│  • Model Call          │
└──────────────────────┘

┌──────────────────────┐
│  OUTPUT GUARDRAILS     │
│  • Content Safety      │
│  • Factual Grounding   │
│  • PII Scrubbing       │
│  • Format Validation   │
│  • Toxicity Detection  │
└──────────────────────┘
    ↓ (pass/block/modify)
User Output

Input Guardrails

PII Detection

Stop sensitive data from reaching the LLM in the first place:

import re
from presidio_analyzer import AnalyzerEngine

analyzer = AnalyzerEngine()

def detect_and_redact_pii(text):
    """Detect PII and replace with placeholders before sending to LLM."""
    results = analyzer.analyze(
        text=text,
        language="en",
        entities=[
            "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
            "CREDIT_CARD", "US_SSN", "IBAN_CODE",
            "IP_ADDRESS", "LOCATION", "DATE_TIME",
        ],
        score_threshold=0.7,
    )
    
    # Sort by start position (reverse) to preserve indices during replacement
    results.sort(key=lambda x: x.start, reverse=True)
    
    redacted = text
    pii_map = {}  # Store for re-insertion after LLM response
    
    for result in results:
        placeholder = f"[{result.entity_type}_{result.start}]"
        original = text[result.start:result.end]
        pii_map[placeholder] = original
        redacted = redacted[:result.start] + placeholder + redacted[result.end:]
    
    return redacted, pii_map

def restore_pii(response, pii_map):
    """Re-insert original PII into the response (for internal use only)."""
    restored = response
    for placeholder, original in pii_map.items():
        restored = restored.replace(placeholder, original)
    return restored

Prompt Injection Detection

INJECTION_PATTERNS = [
    r"ignore\s+(previous|all|above)\s+(instructions|prompts|rules)",
    r"forget\s+(everything|your\s+instructions|what\s+you\s+were\s+told)",
    r"you\s+are\s+now\s+(a|an|in)\s+",
    r"system\s*prompt\s*[:=]",
    r"act\s+as\s+(if\s+you\s+are|a|an)\s+",
    r"pretend\s+(to\s+be|you\s+are)",
    r"override\s+(your|the)\s+(instructions|system|rules)",
    r"new\s+instructions?\s*[:=]",
    r"\]\s*\}\s*\{",  # JSON injection
    r"<\/?system>",   # XML tag injection
]

def detect_prompt_injection(user_input):
    """Multi-layer prompt injection detection."""
    risk_score = 0.0
    detections = []
    
    # Layer 1: Regex pattern matching
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, user_input, re.IGNORECASE):
            risk_score += 0.3
            detections.append(f"pattern_match: {pattern}")
    
    # Layer 2: Structural analysis
    if len(user_input) > 2000:
        risk_score += 0.1  # Unusually long inputs are suspicious
    
    if user_input.count('\n') > 20:
        risk_score += 0.1  # Many newlines may indicate instruction embedding
    
    # Layer 3: LLM-based classifier (most accurate, highest latency)
    if risk_score > 0.2:  # Only run expensive check if regex flags something
        classifier_result = classify_injection(user_input)
        risk_score = max(risk_score, classifier_result["score"])
        detections.append(f"classifier: {classifier_result['reason']}")
    
    return {
        "risk_score": min(risk_score, 1.0),
        "is_injection": risk_score >= 0.5,
        "detections": detections,
        "action": "block" if risk_score >= 0.7 else "warn" if risk_score >= 0.5 else "allow",
    }

Topic Restriction

ALLOWED_TOPICS = [
    "product_questions", "technical_support", "billing",
    "account_management", "feature_requests",
]

BLOCKED_TOPICS = [
    "medical_advice", "legal_advice", "financial_advice",
    "weapons", "drugs", "self_harm", "politics", "religion",
    "competitor_comparisons", "internal_company_info",
]

def classify_topic(user_input, model="gpt-4o-mini"):
    """Classify input topic and block restricted categories."""
    prompt = f"""Classify this user input into exactly one category.

Allowed: {', '.join(ALLOWED_TOPICS)}
Blocked: {', '.join(BLOCKED_TOPICS)}

If the input doesn't fit any category, classify as "other".

Input: {user_input}
Category:"""
    
    category = model.generate(prompt, temperature=0).strip().lower()
    
    return {
        "category": category,
        "allowed": category in ALLOWED_TOPICS or category == "other",
        "blocked": category in BLOCKED_TOPICS,
    }

Output Guardrails

Content Safety Classification

from openai import OpenAI

client = OpenAI()

def check_content_safety(text):
    """Use OpenAI Moderation API for content safety check."""
    response = client.moderations.create(input=text)
    result = response.results[0]
    
    flagged_categories = {
        cat: score 
        for cat, score in result.category_scores.dict().items() 
        if score > 0.5
    }
    
    return {
        "flagged": result.flagged,
        "categories": flagged_categories,
        "action": "block" if result.flagged else "allow",
    }

def toxicity_filter(text, threshold=0.7):
    """Multi-provider toxicity detection for defense in depth."""
    scores = []
    
    # Provider 1: OpenAI Moderation
    openai_result = check_content_safety(text)
    scores.append(max(openai_result["categories"].values(), default=0))
    
    # Provider 2: Perspective API (Google)
    perspective_score = perspective_api.analyze(text, ["TOXICITY"])["TOXICITY"]
    scores.append(perspective_score)
    
    max_score = max(scores)
    
    return {
        "toxicity_score": max_score,
        "toxic": max_score >= threshold,
        "action": "block" if max_score >= threshold else "allow",
    }

Factual Grounding Check

For RAG systems, verify the response is grounded in retrieved context:

def check_grounding(response, retrieved_context):
    """Verify LLM response is grounded in retrieved context."""
    prompt = f"""Analyze whether the RESPONSE is fully supported by the CONTEXT.

CONTEXT:
{retrieved_context}

RESPONSE:
{response}

For each claim in the response, determine if it is:
- SUPPORTED: Directly stated or clearly implied by the context
- UNSUPPORTED: Not found in the context (potential hallucination)
- CONTRADICTED: Directly contradicts the context

Return JSON:
{{
    "grounding_score": 0.0-1.0,
    "claims": [
        {{"claim": "...", "verdict": "SUPPORTED|UNSUPPORTED|CONTRADICTED"}}
    ],
    "ungrounded_claims": ["..."]
}}"""
    
    result = json.loads(llm.generate(prompt, temperature=0))
    
    return {
        "grounded": result["grounding_score"] > 0.8,
        "score": result["grounding_score"],
        "issues": result["ungrounded_claims"],
        "action": "allow" if result["grounding_score"] > 0.8 else "flag",
    }

Rate Limiting & Abuse Prevention

from collections import defaultdict
from time import time

class RateLimiter:
    def __init__(self):
        self.user_requests = defaultdict(list)
        self.user_tokens = defaultdict(int)
    
    LIMITS = {
        "requests_per_minute": 20,
        "requests_per_hour": 200,
        "tokens_per_day": 100_000,
        "max_input_length": 4000,
        "max_concurrent": 3,
    }
    
    def check(self, user_id, input_tokens):
        now = time()
        
        # Clean old entries
        self.user_requests[user_id] = [
            t for t in self.user_requests[user_id] if now - t < 3600
        ]
        
        # Check request rate
        recent = [t for t in self.user_requests[user_id] if now - t < 60]
        if len(recent) >= self.LIMITS["requests_per_minute"]:
            return {"allowed": False, "reason": "Rate limit: too many requests per minute"}
        
        hourly = self.user_requests[user_id]
        if len(hourly) >= self.LIMITS["requests_per_hour"]:
            return {"allowed": False, "reason": "Rate limit: hourly quota exceeded"}
        
        # Check token budget
        if self.user_tokens[user_id] + input_tokens > self.LIMITS["tokens_per_day"]:
            return {"allowed": False, "reason": "Rate limit: daily token budget exceeded"}
        
        # Track
        self.user_requests[user_id].append(now)
        self.user_tokens[user_id] += input_tokens
        
        return {"allowed": True}

Incident Response

Safety Incident Severity Levels

LevelDescriptionExampleResponse SLA
SEV-1Harmful content reaches user at scaleJailbreak vector discovered and exploited15 minutes: block endpoint
SEV-2PII leak in outputModel surfaces customer data from training1 hour: patch guardrail
SEV-3Factual error at scaleRAG returns outdated policy information4 hours: update knowledge base
SEV-4Off-topic behaviorModel discusses competitors when asked24 hours: update topic restrictions

Automated Incident Detection

class SafetyMonitor:
    def __init__(self):
        self.blocked_count = defaultdict(int)
        self.alert_thresholds = {
            "injection_blocks_per_hour": 10,
            "toxicity_blocks_per_hour": 5,
            "pii_detections_per_hour": 20,
        }
    
    def log_event(self, event_type, user_id, details):
        self.blocked_count[event_type] += 1
        
        # Check for coordinated attacks
        if self.blocked_count[event_type] > self.alert_thresholds.get(
            f"{event_type}_per_hour", 100
        ):
            self.escalate(
                severity="SEV-1" if event_type == "injection" else "SEV-2",
                message=f"Unusual {event_type} volume: {self.blocked_count[event_type]}/hour",
                details=details,
            )
    
    def escalate(self, severity, message, details):
        send_pagerduty(severity=severity, message=message)
        send_slack(channel="#ai-safety", message=f"🚨 {severity}: {message}")
        log_to_audit_trail(severity=severity, details=details)

Anti-Patterns

Anti-PatternProblemFix
Guardrails only on outputSensitive input reaches the LLM and may be logged/leakedAdd input guardrails (PII, injection, topic) before the model call
Single-layer defenseOne detection method has blind spotsLayer multiple methods: regex + classifier + LLM judge
Blocking without explanationUsers get frustrated by opaque rejectionsReturn clear, helpful messages: “I can’t help with X, but I can help with Y”
No monitoringAttacks and failures go undetectedReal-time dashboards + automated alerting on anomalies
Static guardrailsNew attack vectors bypass fixed rulesUpdate detection patterns continuously, red-team quarterly
Over-blockingGuardrails too aggressive, blocking legitimate requestsTrack false positive rate, tune thresholds based on production data

LLM Safety Checklist

  • Input guardrails: PII detection, injection defense, topic restriction
  • Output guardrails: toxicity filter, grounding check, PII scrubbing
  • Rate limiting configured per user, per token, per endpoint
  • Content safety classification with multi-provider defense
  • Prompt injection detection using layered approach (regex + ML classifier)
  • Audit logging for all blocked/flagged interactions
  • Monitoring dashboard with real-time alerting
  • Incident response playbook with severity levels and SLAs
  • Red teaming conducted quarterly with updated attack vectors
  • False positive tracking and threshold optimization
  • User-facing error messages are helpful, not opaque
  • Model outputs never contain internal system prompts or tool definitions

:::note[Source] This guide is derived from operational intelligence at Garnet Grid Consulting. For LLM safety architecture consulting, visit garnetgrid.com. :::

Jakub Dimitri Rezayev
Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.

View Full Profile →