LLM Guardrails & Safety Architecture
Build production-grade LLM safety systems. Covers input validation, output filtering, content classifiers, PII detection, prompt injection defense, rate limiting, and incident response.
Every LLM deployed in production is a potential liability. Without guardrails, models will happily generate harmful content, leak private information, follow injected instructions, and produce outputs that violate your company’s policies. Guardrails are not optional safety theater — they’re the engineering controls that make the difference between a useful AI system and a lawsuit waiting to happen.
This guide covers the architecture of production-grade LLM safety: input validation, output filtering, content classification, PII detection, prompt injection defense, and the operational infrastructure needed to monitor and respond to safety incidents in real time.
Guardrail Architecture Overview
User Input
↓
┌──────────────────────┐
│ INPUT GUARDRAILS │
│ • PII Detection │
│ • Prompt Injection │
│ • Topic Restriction │
│ • Rate Limiting │
│ • Input Sanitization │
└──────────────────────┘
↓ (pass/block)
┌──────────────────────┐
│ LLM PROCESSING │
│ • System Prompt │
│ • Context Injection │
│ • Model Call │
└──────────────────────┘
↓
┌──────────────────────┐
│ OUTPUT GUARDRAILS │
│ • Content Safety │
│ • Factual Grounding │
│ • PII Scrubbing │
│ • Format Validation │
│ • Toxicity Detection │
└──────────────────────┘
↓ (pass/block/modify)
User Output
Input Guardrails
PII Detection
Stop sensitive data from reaching the LLM in the first place:
import re
from presidio_analyzer import AnalyzerEngine
analyzer = AnalyzerEngine()
def detect_and_redact_pii(text):
"""Detect PII and replace with placeholders before sending to LLM."""
results = analyzer.analyze(
text=text,
language="en",
entities=[
"PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
"CREDIT_CARD", "US_SSN", "IBAN_CODE",
"IP_ADDRESS", "LOCATION", "DATE_TIME",
],
score_threshold=0.7,
)
# Sort by start position (reverse) to preserve indices during replacement
results.sort(key=lambda x: x.start, reverse=True)
redacted = text
pii_map = {} # Store for re-insertion after LLM response
for result in results:
placeholder = f"[{result.entity_type}_{result.start}]"
original = text[result.start:result.end]
pii_map[placeholder] = original
redacted = redacted[:result.start] + placeholder + redacted[result.end:]
return redacted, pii_map
def restore_pii(response, pii_map):
"""Re-insert original PII into the response (for internal use only)."""
restored = response
for placeholder, original in pii_map.items():
restored = restored.replace(placeholder, original)
return restored
Prompt Injection Detection
INJECTION_PATTERNS = [
r"ignore\s+(previous|all|above)\s+(instructions|prompts|rules)",
r"forget\s+(everything|your\s+instructions|what\s+you\s+were\s+told)",
r"you\s+are\s+now\s+(a|an|in)\s+",
r"system\s*prompt\s*[:=]",
r"act\s+as\s+(if\s+you\s+are|a|an)\s+",
r"pretend\s+(to\s+be|you\s+are)",
r"override\s+(your|the)\s+(instructions|system|rules)",
r"new\s+instructions?\s*[:=]",
r"\]\s*\}\s*\{", # JSON injection
r"<\/?system>", # XML tag injection
]
def detect_prompt_injection(user_input):
"""Multi-layer prompt injection detection."""
risk_score = 0.0
detections = []
# Layer 1: Regex pattern matching
for pattern in INJECTION_PATTERNS:
if re.search(pattern, user_input, re.IGNORECASE):
risk_score += 0.3
detections.append(f"pattern_match: {pattern}")
# Layer 2: Structural analysis
if len(user_input) > 2000:
risk_score += 0.1 # Unusually long inputs are suspicious
if user_input.count('\n') > 20:
risk_score += 0.1 # Many newlines may indicate instruction embedding
# Layer 3: LLM-based classifier (most accurate, highest latency)
if risk_score > 0.2: # Only run expensive check if regex flags something
classifier_result = classify_injection(user_input)
risk_score = max(risk_score, classifier_result["score"])
detections.append(f"classifier: {classifier_result['reason']}")
return {
"risk_score": min(risk_score, 1.0),
"is_injection": risk_score >= 0.5,
"detections": detections,
"action": "block" if risk_score >= 0.7 else "warn" if risk_score >= 0.5 else "allow",
}
Topic Restriction
ALLOWED_TOPICS = [
"product_questions", "technical_support", "billing",
"account_management", "feature_requests",
]
BLOCKED_TOPICS = [
"medical_advice", "legal_advice", "financial_advice",
"weapons", "drugs", "self_harm", "politics", "religion",
"competitor_comparisons", "internal_company_info",
]
def classify_topic(user_input, model="gpt-4o-mini"):
"""Classify input topic and block restricted categories."""
prompt = f"""Classify this user input into exactly one category.
Allowed: {', '.join(ALLOWED_TOPICS)}
Blocked: {', '.join(BLOCKED_TOPICS)}
If the input doesn't fit any category, classify as "other".
Input: {user_input}
Category:"""
category = model.generate(prompt, temperature=0).strip().lower()
return {
"category": category,
"allowed": category in ALLOWED_TOPICS or category == "other",
"blocked": category in BLOCKED_TOPICS,
}
Output Guardrails
Content Safety Classification
from openai import OpenAI
client = OpenAI()
def check_content_safety(text):
"""Use OpenAI Moderation API for content safety check."""
response = client.moderations.create(input=text)
result = response.results[0]
flagged_categories = {
cat: score
for cat, score in result.category_scores.dict().items()
if score > 0.5
}
return {
"flagged": result.flagged,
"categories": flagged_categories,
"action": "block" if result.flagged else "allow",
}
def toxicity_filter(text, threshold=0.7):
"""Multi-provider toxicity detection for defense in depth."""
scores = []
# Provider 1: OpenAI Moderation
openai_result = check_content_safety(text)
scores.append(max(openai_result["categories"].values(), default=0))
# Provider 2: Perspective API (Google)
perspective_score = perspective_api.analyze(text, ["TOXICITY"])["TOXICITY"]
scores.append(perspective_score)
max_score = max(scores)
return {
"toxicity_score": max_score,
"toxic": max_score >= threshold,
"action": "block" if max_score >= threshold else "allow",
}
Factual Grounding Check
For RAG systems, verify the response is grounded in retrieved context:
def check_grounding(response, retrieved_context):
"""Verify LLM response is grounded in retrieved context."""
prompt = f"""Analyze whether the RESPONSE is fully supported by the CONTEXT.
CONTEXT:
{retrieved_context}
RESPONSE:
{response}
For each claim in the response, determine if it is:
- SUPPORTED: Directly stated or clearly implied by the context
- UNSUPPORTED: Not found in the context (potential hallucination)
- CONTRADICTED: Directly contradicts the context
Return JSON:
{{
"grounding_score": 0.0-1.0,
"claims": [
{{"claim": "...", "verdict": "SUPPORTED|UNSUPPORTED|CONTRADICTED"}}
],
"ungrounded_claims": ["..."]
}}"""
result = json.loads(llm.generate(prompt, temperature=0))
return {
"grounded": result["grounding_score"] > 0.8,
"score": result["grounding_score"],
"issues": result["ungrounded_claims"],
"action": "allow" if result["grounding_score"] > 0.8 else "flag",
}
Rate Limiting & Abuse Prevention
from collections import defaultdict
from time import time
class RateLimiter:
def __init__(self):
self.user_requests = defaultdict(list)
self.user_tokens = defaultdict(int)
LIMITS = {
"requests_per_minute": 20,
"requests_per_hour": 200,
"tokens_per_day": 100_000,
"max_input_length": 4000,
"max_concurrent": 3,
}
def check(self, user_id, input_tokens):
now = time()
# Clean old entries
self.user_requests[user_id] = [
t for t in self.user_requests[user_id] if now - t < 3600
]
# Check request rate
recent = [t for t in self.user_requests[user_id] if now - t < 60]
if len(recent) >= self.LIMITS["requests_per_minute"]:
return {"allowed": False, "reason": "Rate limit: too many requests per minute"}
hourly = self.user_requests[user_id]
if len(hourly) >= self.LIMITS["requests_per_hour"]:
return {"allowed": False, "reason": "Rate limit: hourly quota exceeded"}
# Check token budget
if self.user_tokens[user_id] + input_tokens > self.LIMITS["tokens_per_day"]:
return {"allowed": False, "reason": "Rate limit: daily token budget exceeded"}
# Track
self.user_requests[user_id].append(now)
self.user_tokens[user_id] += input_tokens
return {"allowed": True}
Incident Response
Safety Incident Severity Levels
| Level | Description | Example | Response SLA |
|---|---|---|---|
| SEV-1 | Harmful content reaches user at scale | Jailbreak vector discovered and exploited | 15 minutes: block endpoint |
| SEV-2 | PII leak in output | Model surfaces customer data from training | 1 hour: patch guardrail |
| SEV-3 | Factual error at scale | RAG returns outdated policy information | 4 hours: update knowledge base |
| SEV-4 | Off-topic behavior | Model discusses competitors when asked | 24 hours: update topic restrictions |
Automated Incident Detection
class SafetyMonitor:
def __init__(self):
self.blocked_count = defaultdict(int)
self.alert_thresholds = {
"injection_blocks_per_hour": 10,
"toxicity_blocks_per_hour": 5,
"pii_detections_per_hour": 20,
}
def log_event(self, event_type, user_id, details):
self.blocked_count[event_type] += 1
# Check for coordinated attacks
if self.blocked_count[event_type] > self.alert_thresholds.get(
f"{event_type}_per_hour", 100
):
self.escalate(
severity="SEV-1" if event_type == "injection" else "SEV-2",
message=f"Unusual {event_type} volume: {self.blocked_count[event_type]}/hour",
details=details,
)
def escalate(self, severity, message, details):
send_pagerduty(severity=severity, message=message)
send_slack(channel="#ai-safety", message=f"🚨 {severity}: {message}")
log_to_audit_trail(severity=severity, details=details)
Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Guardrails only on output | Sensitive input reaches the LLM and may be logged/leaked | Add input guardrails (PII, injection, topic) before the model call |
| Single-layer defense | One detection method has blind spots | Layer multiple methods: regex + classifier + LLM judge |
| Blocking without explanation | Users get frustrated by opaque rejections | Return clear, helpful messages: “I can’t help with X, but I can help with Y” |
| No monitoring | Attacks and failures go undetected | Real-time dashboards + automated alerting on anomalies |
| Static guardrails | New attack vectors bypass fixed rules | Update detection patterns continuously, red-team quarterly |
| Over-blocking | Guardrails too aggressive, blocking legitimate requests | Track false positive rate, tune thresholds based on production data |
LLM Safety Checklist
- Input guardrails: PII detection, injection defense, topic restriction
- Output guardrails: toxicity filter, grounding check, PII scrubbing
- Rate limiting configured per user, per token, per endpoint
- Content safety classification with multi-provider defense
- Prompt injection detection using layered approach (regex + ML classifier)
- Audit logging for all blocked/flagged interactions
- Monitoring dashboard with real-time alerting
- Incident response playbook with severity levels and SLAs
- Red teaming conducted quarterly with updated attack vectors
- False positive tracking and threshold optimization
- User-facing error messages are helpful, not opaque
- Model outputs never contain internal system prompts or tool definitions
:::note[Source] This guide is derived from operational intelligence at Garnet Grid Consulting. For LLM safety architecture consulting, visit garnetgrid.com. :::