Redis Caching Patterns for Enterprise Applications
Production-ready Redis caching strategies. Covers cache-aside, write-through, read-through, cache invalidation, TTL management, and distributed locking patterns.
Redis isn’t just a cache — it’s a data structure server that happens to be incredibly fast at caching. But using it effectively requires understanding caching patterns, invalidation strategies, and failure modes. Most teams implement cache-aside and stop there. This guide covers the patterns that separate amateur caching from production-grade architectures.
The number one caching mistake: treating Redis as a durable data store when it’s configured as an eviction-based cache. If your application breaks when Redis loses a key, your caching layer is actually a storage layer — and you need to redesign.
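One way to keep that failure mode out of the request path is to wrap cache reads so a Redis outage degrades to the source of truth. Below is a minimal sketch, assuming a redis-py-compatible `client` and a hypothetical `fetch_fn` that loads from the real data store; production code would catch `redis.exceptions.RedisError` rather than bare `Exception`:

```python
import json
import logging

def cached_fetch(client, key: str, fetch_fn, ttl: int = 300):
    """Cache-aside read that degrades gracefully when the cache is down."""
    try:
        cached = client.get(key)
        if cached is not None:
            return json.loads(cached)
    except Exception:  # in production: redis.exceptions.RedisError
        logging.warning("cache read failed for %s; serving from source", key)

    value = fetch_fn()  # the source of truth is always consulted on failure
    try:
        client.setex(key, ttl, json.dumps(value))
    except Exception:
        pass  # a failed cache write must never fail the request
    return value
```

If Redis disappears entirely, every call falls through to `fetch_fn` and the application keeps working, just slower.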
Pattern Selection Guide
| Pattern | Consistency | Write Speed | Read Speed | Complexity | Best For |
|---|---|---|---|---|---|
| Cache-Aside | Eventual | Normal | Fast (on hit) | Low | Most read-heavy workloads |
| Write-Through | Strong | Slower | Fast | Medium | Data that must always be fresh |
| Write-Behind | Eventual | Very fast | Fast | High | High-volume writes (metrics, logs) |
| Read-Through | Eventual | Normal | Fast | Medium | ORM-integrated caching |
Core Caching Patterns
1. Cache-Aside (Lazy Loading)
The most common pattern. Application checks cache first, falls back to database on miss.
```python
import json

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id: str) -> dict | None:
    cache_key = f"user:{user_id}"

    # 1. Check cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Cache miss — query database (db is your application's database client)
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    if user is None:
        return None

    # 3. Populate cache with TTL
    r.setex(cache_key, 3600, json.dumps(user))  # 1 hour TTL
    return user

def update_user(user_id: str, data: dict):
    # Update database first
    db.execute("UPDATE users SET ... WHERE id = %s", user_id)

    # Invalidate cache (don't update — delete)
    r.delete(f"user:{user_id}")
```
**Pros:** Simple; the cache only contains requested data.

**Cons:** Cache miss penalty, a stale-data window, and cache stampede risk.
2. Write-Through
Every write updates both the cache and the database in the same request path, keeping the two in sync (note this is not truly atomic: no transaction spans both Redis and the database).
```python
def update_product(product_id: str, data: dict):
    cache_key = f"product:{product_id}"

    # Write to database
    db.execute("UPDATE products SET ... WHERE id = %s", product_id)

    # Write to cache in the same request
    r.setex(cache_key, 7200, json.dumps(data))
    return data

def get_product(product_id: str) -> dict | None:
    cached = r.get(f"product:{product_id}")
    if cached:
        return json.loads(cached)

    # Fallback for cold cache
    product = db.query("SELECT * FROM products WHERE id = %s", product_id)
    if product is not None:  # never cache a missing row as null
        r.setex(f"product:{product_id}", 7200, json.dumps(product))
    return product
```
**Pros:** Cache always has the latest data; no stale reads.

**Cons:** Higher write latency; the cache may hold data that is never read.
3. Write-Behind (Write-Back)
Writes go to cache immediately, then asynchronously flushed to database.
```python
import time
from threading import Thread

def update_metric(key: str, value: float):
    # Write to cache immediately (fast)
    r.hset(f"metrics:{key}", mapping={
        "value": value,
        "updated_at": time.time(),
        "dirty": 1,  # Mark as needing database sync
    })

# Background worker flushes dirty entries to database
def flush_worker():
    while True:
        # SCAN instead of KEYS — KEYS blocks Redis on large keyspaces
        for key in r.scan_iter("metrics:*"):
            data = r.hgetall(key)
            if data.get("dirty") == "1":
                db.execute("UPSERT INTO metrics ...", data)
                # Note: a write landing between hgetall and this hset can be
                # lost; production code would compare timestamps or use a queue
                r.hset(key, "dirty", 0)
        time.sleep(5)  # Flush every 5 seconds

Thread(target=flush_worker, daemon=True).start()
```
**Pros:** Extremely fast writes; batched database operations.

**Cons:** Risk of data loss if Redis crashes before a flush; eventual consistency.
Cache Invalidation Strategies
Time-Based (TTL)
```python
import random

# Simple TTL — good for data that changes infrequently
r.setex("config:feature_flags", 300, json.dumps(flags))  # 5 min TTL

# Staggered TTL — prevent thundering herd
base_ttl = 3600
jitter = random.randint(0, 300)  # 0-5 min jitter
r.setex(f"product:{pid}", base_ttl + jitter, data)
```
TTL Guidelines by Data Type
| Data Type | Recommended TTL | Rationale |
|---|---|---|
| User sessions | 30 minutes | Security + activity-based |
| Product catalog | 1-6 hours | Changes infrequently |
| Feature flags | 5 minutes | Need fast rollout |
| Search results | 15-30 minutes | Freshness vs performance |
| API rate limit counters | 60 seconds | Sliding window accuracy |
| Dashboard aggregations | 5-15 minutes | Computationally expensive |
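These guidelines can be centralized in a small helper so TTLs stay consistent across the codebase. A sketch with illustrative type names, adding random jitter as in the staggered-TTL example above:

```python
import random

# Illustrative base TTLs (seconds), mirroring the table above
TTL_BY_TYPE = {
    "session": 1800,       # 30 minutes
    "catalog": 3 * 3600,   # middle of the 1-6 hour range
    "feature_flags": 300,  # 5 minutes
    "search": 20 * 60,     # 20 minutes
}

def ttl_for(data_type: str, jitter_frac: float = 0.1) -> int:
    """Look up a base TTL and add up to 10% random jitter so keys
    written at the same time don't all expire at the same time."""
    base = TTL_BY_TYPE.get(data_type, 3600)  # default: 1 hour
    return base + random.randint(0, int(base * jitter_frac))
```

A call like `r.setex(key, ttl_for("session"), value)` then replaces hard-coded TTL constants scattered through the code.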
Event-Based Invalidation
```python
# When an order is placed, invalidate all related caches
def on_order_placed(order):
    pipe = r.pipeline()
    pipe.delete(f"user:{order.user_id}:orders")
    pipe.delete(f"product:{order.product_id}:stock")
    pipe.delete(f"analytics:daily_revenue:{order.date}")
    pipe.delete("analytics:total_orders")
    pipe.execute()  # Atomic batch delete (redis-py pipelines use MULTI/EXEC by default)
```
Tag-Based Invalidation
```python
# Tag cache entries for group invalidation
def cache_with_tags(key: str, value: str, tags: list, ttl: int = 3600):
    pipe = r.pipeline()
    pipe.setex(key, ttl, value)
    for tag in tags:
        pipe.sadd(f"tag:{tag}", key)
        pipe.expire(f"tag:{tag}", ttl + 60)
    pipe.execute()

def invalidate_by_tag(tag: str):
    keys = r.smembers(f"tag:{tag}")
    if keys:
        pipe = r.pipeline()
        pipe.delete(*keys)
        pipe.delete(f"tag:{tag}")
        pipe.execute()

# Usage
cache_with_tags("user:123:profile", data, ["user:123", "profiles"])
cache_with_tags("user:123:orders", data, ["user:123", "orders"])

# Invalidate all caches for user 123
invalidate_by_tag("user:123")
```
Advanced Patterns
Cache Stampede Prevention
When a popular cache key expires, hundreds of concurrent requests all miss the cache and hit the database simultaneously.
```python
import time

def get_with_lock(key: str, fetch_fn, ttl: int = 3600):
    """Cache-aside with a distributed lock to prevent stampede."""
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    lock_key = f"lock:{key}"

    # Try to acquire lock (only one request populates the cache)
    if r.set(lock_key, "1", nx=True, ex=10):  # 10s lock timeout
        try:
            data = fetch_fn()
            r.setex(key, ttl, json.dumps(data))
            return data
        finally:
            r.delete(lock_key)
    else:
        # Another request is populating — wait and retry
        for _ in range(50):  # Wait up to 5 seconds
            time.sleep(0.1)
            cached = r.get(key)
            if cached:
                return json.loads(cached)

        # Fallback: query database directly
        return fetch_fn()
```
Distributed Locking
```python
import uuid

class RedisLock:
    def __init__(self, redis_client, name, timeout=10):
        self.redis = redis_client
        self.name = f"lock:{name}"
        self.timeout = timeout
        self.token = str(uuid.uuid4())

    def acquire(self):
        return self.redis.set(self.name, self.token, nx=True, ex=self.timeout)

    def release(self):
        # Atomic: only delete if we still own the lock
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        end
        return 0
        """
        self.redis.eval(script, 1, self.name, self.token)

# Usage
lock = RedisLock(r, "process_payment:order_123")
if lock.acquire():
    try:
        process_payment(order_id="order_123")
    finally:
        lock.release()
```
Redis Data Structures for Caching
| Structure | Use Case | Example |
|---|---|---|
| `STRING` | Simple key-value | User sessions, JSON objects |
| `HASH` | Object with fields | User profiles, product details |
| `LIST` | Ordered collections | Recent activity, job queues |
| `SET` | Unique members | Online users, tags |
| `SORTED SET` | Ranked data | Leaderboards, rate limiting |
| `STREAM` | Event log | Activity feeds, audit trails |
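As one example from the table, a HASH lets you read or update a single field of a cached object without deserializing the whole value, which a JSON-in-a-STRING cache cannot do. A minimal sketch with illustrative key names, where `client` is any redis-py-compatible client:

```python
def cache_profile(client, user_id: str, profile: dict, ttl: int = 3600):
    """Cache an object as a HASH so individual fields can be read or
    updated without touching the rest of the value."""
    key = f"user:{user_id}:profile"
    client.hset(key, mapping=profile)
    client.expire(key, ttl)  # TTL applies to the whole hash, not per field

def get_profile_field(client, user_id: str, field: str):
    # Fetch one field instead of deserializing the whole object
    return client.hget(f"user:{user_id}:profile", field)
```

One design caveat: Redis expiration is per key, so all fields of the hash share one TTL.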
Sorted Set for Rate Limiting
```python
def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # Remove old entries
    pipe.zadd(key, {str(now): now})              # Add current request
    pipe.zcard(key)                              # Count requests in window
    pipe.expire(key, window)                     # Auto-cleanup
    _, _, count, _ = pipe.execute()

    return count > limit
```
Production Configuration
```conf
# redis.conf for production caching
maxmemory 4gb
maxmemory-policy allkeys-lru
# Options: volatile-lru, allkeys-lru, volatile-lfu, allkeys-lfu, volatile-ttl

# Persistence (if cache data is valuable)
# Save if 1 key changed in 15 min:
save 900 1
# Save if 10 keys changed in 5 min:
save 300 10

# Connection limits
maxclients 10000
timeout 300

# Slow query logging: log queries slower than 10ms (unit is microseconds)
slowlog-log-slower-than 10000
slowlog-max-len 128
```
Eviction Policy Selection
| Policy | Behavior | Use When |
|---|---|---|
| `allkeys-lru` | Evict least recently used key | General-purpose cache |
| `allkeys-lfu` | Evict least frequently used key | Hot/cold data with clear frequency patterns |
| `volatile-lru` | LRU, but only keys with a TTL | Mix of cache + permanent data |
| `volatile-ttl` | Evict keys closest to expiring | TTL-driven lifecycle management |
| `noeviction` | Return errors when memory is full | When data loss is unacceptable |
Monitoring Commands
```bash
# Memory usage by key pattern
redis-cli --bigkeys

# Real-time command monitoring (expensive; avoid on busy production nodes)
redis-cli monitor

# Cache hit ratio
redis-cli INFO stats | grep keyspace
# keyspace_hits / (keyspace_hits + keyspace_misses) = hit ratio
# Target: > 95%

# Memory fragmentation
redis-cli INFO memory | grep mem_fragmentation_ratio
# Target: 1.0 - 1.5 (above 1.5 = fragmented, consider restart)
```
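The same hit-ratio check can be automated from application code: redis-py's `info("stats")` returns these counters as a dict. A small sketch (the alerting call in the comment is illustrative):

```python
def hit_ratio(stats: dict) -> float:
    """Compute the cache hit ratio from INFO stats counters
    (redis-py's client.info("stats") returns such a dict)."""
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0

# With a live client, a periodic check might look like:
#   if hit_ratio(r.info("stats")) < 0.95:
#       alert("cache hit ratio below target")
```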
Cache Warming Strategies
Cold caches after deployments or restarts cause latency spikes. Use these warming strategies:
| Strategy | How It Works | Best For |
|---|---|---|
| Eager warming | Pre-load popular keys on startup | Known hot keys (top products, user sessions) |
| Lazy warming | Cache on first miss, let traffic populate | Long-tail content, unpredictable access |
| Snapshot restore | Load RDB or AOF snapshot on restart | Full cache restoration after planned maintenance |
| Background refresh | Refresh expiring keys before TTL hits | High-traffic keys where stale data is unacceptable |
| Peer replication | Warm from another Redis node | Multi-region or multi-AZ deployments |
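Eager warming is straightforward to sketch: bulk-load the known hot records and write them through a pipeline in one round trip. The `loader` callable and key names below are illustrative:

```python
import json

def warm_cache(client, loader, hot_ids, ttl: int = 3600):
    """Eagerly pre-load known hot keys on startup.

    loader(ids) is a hypothetical bulk fetch from the database that
    returns {id: record}; a pipeline batches all writes into one round trip.
    """
    records = loader(hot_ids)
    pipe = client.pipeline()
    for pid, record in records.items():
        pipe.setex(f"product:{pid}", ttl, json.dumps(record))
    pipe.execute()
```

On startup, something like `warm_cache(r, fetch_top_products, top_product_ids)` would run before the node starts accepting traffic.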
Redis Memory Optimization
- **Use appropriate data types** — a Hash with 100 small fields uses less memory than 100 separate String keys
- **Set a `maxmemory-policy`** — use `allkeys-lru` for general caching, `volatile-lru` when mixing persistent and cache data
- **Enable compression** — use client-side compression (zstd, lz4) for values over 1KB
- **Monitor memory fragmentation** — if the fragmentation ratio exceeds 1.5, consider a restart or `MEMORY PURGE`
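Client-side compression can be sketched with stdlib `zlib` standing in for zstd or lz4 (the real libraries expose analogous compress/decompress calls); the magic-byte prefix is an illustrative convention for telling compressed payloads apart. Note that compressed values must be stored as raw bytes, i.e. with a client configured with `decode_responses=False`:

```python
import json
import zlib

COMPRESS_THRESHOLD = 1024  # bytes: only compress values over ~1KB
MAGIC = b"\x01z"           # illustrative prefix marking compressed payloads

def encode_value(obj) -> bytes:
    raw = json.dumps(obj).encode()
    if len(raw) > COMPRESS_THRESHOLD:
        return MAGIC + zlib.compress(raw)
    return raw  # JSON text never starts with byte 0x01, so no ambiguity

def decode_value(blob: bytes):
    if blob.startswith(MAGIC):
        blob = zlib.decompress(blob[len(MAGIC):])
    return json.loads(blob)
```

Values would then be written with `r.set(key, encode_value(obj))` and read back through `decode_value`.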
Caching Checklist
- Pattern selected (cache-aside, write-through, write-behind) based on consistency needs
- Cache stampede prevention implemented (locking or probabilistic refresh)
- TTLs set per data type with random jitter to prevent thundering herd
- Write operations invalidate (delete), not update, cache entries
- `maxmemory-policy` configured (LRU or LFU based on access patterns)
- Eviction policy tested under memory pressure
- Cache hit ratio monitored (target >95%)
- Graceful degradation when Redis is unavailable (application still works)
- Connection pooling configured
- Serialization format chosen (JSON, MessagePack, or Protocol Buffers)
- Key naming convention documented and enforced (e.g., `{entity}:{id}:{field}`)
- Tag-based invalidation set up for related cache entries
:::note[Source] This guide is derived from operational intelligence at Garnet Grid Consulting. For caching architecture consulting, visit garnetgrid.com. :::