LLM Caching Strategies for Production: Performance & Cost Optimization
When we built Vidmation’s AI video automation pipeline, our initial Claude API costs were burning through budget at an alarming rate. The system generates scripts, voiceovers, and visual content — often making 20+ API calls per video. Without proper caching, we were paying for redundant computations and watching response times crawl.
Implementing effective LLM caching strategies for production systems isn’t just about performance — it’s about making AI applications economically viable. The right caching approach can reduce API costs by 60-80% while dramatically improving user experience.
Why LLM Caching Matters in Production
Large language model APIs are expensive and relatively slow compared to traditional web services. GPT-4 can cost $0.03 per 1K input tokens and $0.06 per 1K output tokens. Claude-3.5-Sonnet runs $0.003 per 1K input and $0.015 per 1K output. When you’re processing thousands of requests daily, costs escalate quickly.
Response latency is equally critical. Even fast LLM APIs typically take 2-5 seconds for substantial responses. Users expect sub-second interactions, especially in conversational interfaces or real-time applications.
Caching solves both problems by storing and reusing previous LLM responses when appropriate. The challenge lies in determining when responses can be reused and implementing cache invalidation strategies that maintain response quality.
Cache Key Design Patterns
The foundation of any effective caching strategy is proper cache key design. LLM responses depend on multiple variables: the prompt, model parameters, conversation context, and system instructions.
Deterministic Cache Keys
For stateless LLM operations, create deterministic cache keys from all input parameters:
import hashlib
import json
def create_cache_key(prompt: str, model: str, temperature: float,
max_tokens: int, system_prompt: str = "") -> str:
cache_input = {
"prompt": prompt,
"model": model,
"temperature": temperature,
"max_tokens": max_tokens,
"system_prompt": system_prompt
}
# Normalize and hash all parameters
normalized = json.dumps(cache_input, sort_keys=True)
return hashlib.sha256(normalized.encode()).hexdigest()
This approach works well for tasks like content generation, code analysis, or data extraction where identical inputs should produce identical outputs.
Semantic Cache Keys
For more nuanced caching, consider semantic similarity. Two prompts might be worded differently but request the same information:
from sentence_transformers import SentenceTransformer
class SemanticCache:
def __init__(self, similarity_threshold=0.85):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.threshold = similarity_threshold
self.cache = {}
self.embeddings = {}
def get_similar_key(self, prompt: str):
prompt_embedding = self.model.encode(prompt)
for cached_key, cached_embedding in self.embeddings.items():
similarity = cosine_similarity(prompt_embedding, cached_embedding)
if similarity > self.threshold:
return cached_key
return None
def set(self, prompt: str, response: str):
key = hashlib.sha256(prompt.encode()).hexdigest()
self.cache[key] = response
self.embeddings[key] = self.model.encode(prompt)
We used a variant of this approach in ClawdHub’s agent orchestration system, where different users might phrase similar agent instructions in various ways.
Cache Storage Options
Redis for Distributed Caching
Redis is our go-to choice for production LLM caching. It provides fast access, automatic expiration, and works across multiple application instances:
import redis
import json
from typing import Optional
class LLMRedisCache:
def __init__(self, redis_url: str, default_ttl: int = 3600):
self.redis = redis.from_url(redis_url)
self.default_ttl = default_ttl
def get(self, key: str) -> Optional[str]:
try:
cached = self.redis.get(f"llm_cache:{key}")
return cached.decode() if cached else None
except redis.RedisError:
return None
def set(self, key: str, value: str, ttl: Optional[int] = None):
try:
actual_ttl = ttl or self.default_ttl
self.redis.setex(f"llm_cache:{key}", actual_ttl, value)
except redis.RedisError:
pass # Fail silently for cache operations
def invalidate_pattern(self, pattern: str):
keys = self.redis.keys(f"llm_cache:{pattern}")
if keys:
self.redis.delete(*keys)
PostgreSQL for Persistent Caching
For applications requiring longer-term cache persistence or complex querying, PostgreSQL with proper indexing works well:
import asyncpg
from typing import Optional, Dict, Any
class PostgresLLMCache:
def __init__(self, connection_string: str):
self.connection_string = connection_string
async def get(self, cache_key: str) -> Optional[Dict[str, Any]]:
conn = await asyncpg.connect(self.connection_string)
try:
row = await conn.fetchrow("""
SELECT response, metadata, created_at
FROM llm_cache
WHERE cache_key = $1 AND expires_at > NOW()
""", cache_key)
return dict(row) if row else None
finally:
await conn.close()
async def set(self, cache_key: str, response: str,
metadata: Dict[str, Any], ttl_seconds: int = 3600):
conn = await asyncpg.connect(self.connection_string)
try:
await conn.execute("""
INSERT INTO llm_cache (cache_key, response, metadata, expires_at)
VALUES ($1, $2, $3, NOW() + INTERVAL '%s seconds')
ON CONFLICT (cache_key) DO UPDATE SET
response = EXCLUDED.response,
metadata = EXCLUDED.metadata,
expires_at = EXCLUDED.expires_at
""" % ttl_seconds, cache_key, response, metadata)
finally:
await conn.close()
Create the table with proper indexing:
CREATE TABLE llm_cache (
id SERIAL PRIMARY KEY,
cache_key VARCHAR(64) UNIQUE NOT NULL,
response TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT NOW(),
expires_at TIMESTAMP NOT NULL
);
CREATE INDEX idx_llm_cache_key ON llm_cache (cache_key);
CREATE INDEX idx_llm_cache_expires ON llm_cache (expires_at);
Multi-Layered Cache Architecture
Production systems benefit from multiple cache layers with different characteristics:
class MultiLayerLLMCache:
def __init__(self, memory_cache, redis_cache, postgres_cache):
self.memory = memory_cache # L1: Fast, small, process-local
self.redis = redis_cache # L2: Fast, shared, temporary
self.postgres = postgres_cache # L3: Slow, persistent, searchable
async def get(self, key: str) -> Optional[str]:
# Check L1 cache first
result = self.memory.get(key)
if result:
return result
# Check L2 cache
result = await self.redis.get(key)
if result:
self.memory.set(key, result) # Populate L1
return result
# Check L3 cache
result = await self.postgres.get(key)
if result:
self.memory.set(key, result['response'])
await self.redis.set(key, result['response'])
return result['response']
return None
async def set(self, key: str, value: str):
# Write to all layers
self.memory.set(key, value)
await self.redis.set(key, value)
await self.postgres.set(key, value, {})
Context-Aware Cache Invalidation
Simple TTL-based expiration works for basic cases, but production systems need smarter invalidation strategies.
Version-Based Invalidation
Track versions of underlying data that influences LLM responses:
class VersionedLLMCache:
def __init__(self, cache_backend):
self.cache = cache_backend
self.versions = {}
def create_versioned_key(self, base_key: str, dependencies: List[str]) -> str:
version_str = "_".join([
f"{dep}:{self.versions.get(dep, 0)}"
for dep in dependencies
])
return f"{base_key}:{version_str}"
def invalidate_dependency(self, dependency: str):
self.versions[dependency] = self.versions.get(dependency, 0) + 1
# All cache keys depending on this will now miss
Tag-Based Invalidation
Group related cache entries with tags for bulk invalidation:
class TaggedLLMCache:
def __init__(self, redis_client):
self.redis = redis_client
def set_with_tags(self, key: str, value: str, tags: List[str], ttl: int = 3600):
# Store the cached value
self.redis.setex(f"cache:{key}", ttl, value)
# Store tag associations
for tag in tags:
self.redis.sadd(f"tag:{tag}", key)
self.redis.expire(f"tag:{tag}", ttl + 300) # Tags live slightly longer
def invalidate_by_tag(self, tag: str):
# Get all keys with this tag
keys = self.redis.smembers(f"tag:{tag}")
if keys:
# Delete cache entries
cache_keys = [f"cache:{key.decode()}" for key in keys]
self.redis.delete(*cache_keys)
# Clean up the tag set
self.redis.delete(f"tag:{tag}")
Prompt-Specific Caching Strategies
Different types of LLM operations require different caching approaches.
Content Generation Caching
For content generation tasks, cache based on topic and style parameters:
async def generate_content_with_cache(topic: str, style: str, length: int):
cache_key = f"content:{hashlib.sha256(f'{topic}:{style}:{length}'.encode()).hexdigest()}"
cached = await cache.get(cache_key)
if cached:
return json.loads(cached)
prompt = f"Write a {length}-word {style} article about {topic}"
response = await llm_client.generate(prompt)
# Cache for 24 hours - content doesn't change frequently
await cache.set(cache_key, json.dumps(response), ttl=86400)
return response
Code Analysis Caching
Code analysis can be cached by file content hash:
import hashlib
async def analyze_code_with_cache(code: str, analysis_type: str):
code_hash = hashlib.sha256(code.encode()).hexdigest()
cache_key = f"code_analysis:{analysis_type}:{code_hash}"
cached = await cache.get(cache_key)
if cached:
return json.loads(cached)
prompt = f"Analyze this {analysis_type}:\n\n{code}"
response = await llm_client.generate(prompt)
# Cache indefinitely - code analysis for specific content won't change
await cache.set(cache_key, json.dumps(response), ttl=None)
return response
We implement this pattern in our AI Schematic Generator, where circuit analysis for identical component configurations can be safely cached long-term.
Conversational Context Caching
For chat applications, cache conversation states to avoid reprocessing entire conversation histories:
class ConversationCache:
def __init__(self, cache_backend):
self.cache = cache_backend
async def get_conversation_state(self, conversation_id: str,
message_count: int) -> Optional[Dict]:
key = f"conv:{conversation_id}:state:{message_count}"
cached = await self.cache.get(key)
return json.loads(cached) if cached else None
async def cache_conversation_state(self, conversation_id: str,
message_count: int, state: Dict):
key = f"conv:{conversation_id}:state:{message_count}"
# Cache conversation states for 1 hour
await self.cache.set(key, json.dumps(state), ttl=3600)
Cache Warming Strategies
Proactively populate caches with likely-needed responses to minimize cache misses.
Batch Pre-computation
For predictable workloads, pre-compute common responses:
async def warm_content_cache():
common_topics = ["AI trends", "productivity tips", "tech reviews"]
common_styles = ["blog", "social", "email"]
tasks = []
for topic in common_topics:
for style in common_styles:
task = generate_content_with_cache(topic, style, 500)
tasks.append(task)
# Pre-populate cache with common combinations
await asyncio.gather(*tasks, return_exceptions=True)
Usage Pattern Analysis
Monitor cache misses to identify warming opportunities:
class CacheAnalytics:
def __init__(self, cache_backend):
self.cache = cache_backend
self.miss_patterns = defaultdict(int)
def record_miss(self, cache_key: str, prompt_type: str):
pattern = self.extract_pattern(cache_key)
self.miss_patterns[f"{prompt_type}:{pattern}"] += 1
def get_warming_candidates(self, threshold: int = 5) -> List[str]:
return [
pattern for pattern, count in self.miss_patterns.items()
if count >= threshold
]
Performance Monitoring and Optimization
Track cache performance to optimize your strategy over time.
Cache Metrics Collection
from dataclasses import dataclass
from typing import Dict
import time
@dataclass
class CacheMetrics:
hits: int = 0
misses: int = 0
hit_time_total: float = 0.0
miss_time_total: float = 0.0
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
@property
def average_hit_time(self) -> float:
return self.hit_time_total / self.hits if self.hits > 0 else 0.0
class MonitoredLLMCache:
def __init__(self, cache_backend):
self.cache = cache_backend
self.metrics = CacheMetrics()
async def get(self, key: str) -> Optional[str]:
start_time = time.time()
result = await self.cache.get(key)
elapsed = time.time() - start_time
if result:
self.metrics.hits += 1
self.metrics.hit_time_total += elapsed
else:
self.metrics.misses += 1
self.metrics.miss_time_total += elapsed
return result
Cost Impact Analysis
In our experience across projects like Vidmation and ClawdHub, well-implemented
More from the blog
Need help with AI?
We build production AI systems — from strategy and architecture to deployment and evaluation.
Get our AI implementation playbook
A practical guide to evaluating, planning, and deploying AI in your business. Free, no spam.
Check your inbox.
Something went wrong. Please try again.