AI Engineering

LLM Caching Strategies for Production: Performance & Cost Optimization

Connor O'Dea · · 6 min read
LLM Caching Production Performance Cost-Optimization

When we built Vidmation’s AI video automation pipeline, our initial Claude API costs were burning through budget at an alarming rate. The system generates scripts, voiceovers, and visual content — often making 20+ API calls per video. Without proper caching, we were paying for redundant computations and watching response times crawl.

Implementing effective LLM caching strategies for production systems isn’t just about performance — it’s about making AI applications economically viable. The right caching approach can reduce API costs by 60-80% while dramatically improving user experience.

Why LLM Caching Matters in Production

Large language model APIs are expensive and relatively slow compared to traditional web services. GPT-4 can cost $0.03 per 1K input tokens and $0.06 per 1K output tokens. Claude-3.5-Sonnet runs $0.003 per 1K input and $0.015 per 1K output. When you’re processing thousands of requests daily, costs escalate quickly.

Response latency is equally critical. Even fast LLM APIs typically take 2-5 seconds for substantial responses. Users expect sub-second interactions, especially in conversational interfaces or real-time applications.

Caching solves both problems by storing and reusing previous LLM responses when appropriate. The challenge lies in determining when responses can be reused and implementing cache invalidation strategies that maintain response quality.

Cache Key Design Patterns

The foundation of any effective caching strategy is proper cache key design. LLM responses depend on multiple variables: the prompt, model parameters, conversation context, and system instructions.

Deterministic Cache Keys

For stateless LLM operations, create deterministic cache keys from all input parameters:

import hashlib
import json

def create_cache_key(prompt: str, model: str, temperature: float, 
                    max_tokens: int, system_prompt: str = "") -> str:
    cache_input = {
        "prompt": prompt,
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "system_prompt": system_prompt
    }
    # Normalize and hash all parameters
    normalized = json.dumps(cache_input, sort_keys=True)
    return hashlib.sha256(normalized.encode()).hexdigest()

This approach works well for tasks like content generation, code analysis, or data extraction where identical inputs should produce identical outputs.

Semantic Cache Keys

For more nuanced caching, consider semantic similarity. Two prompts might be worded differently but request the same information:

from sentence_transformers import SentenceTransformer

class SemanticCache:
    def __init__(self, similarity_threshold=0.85):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.threshold = similarity_threshold
        self.cache = {}
        self.embeddings = {}
    
    def get_similar_key(self, prompt: str):
        prompt_embedding = self.model.encode(prompt)
        
        for cached_key, cached_embedding in self.embeddings.items():
            similarity = cosine_similarity(prompt_embedding, cached_embedding)
            if similarity > self.threshold:
                return cached_key
        return None
    
    def set(self, prompt: str, response: str):
        key = hashlib.sha256(prompt.encode()).hexdigest()
        self.cache[key] = response
        self.embeddings[key] = self.model.encode(prompt)

We used a variant of this approach in ClawdHub’s agent orchestration system, where different users might phrase similar agent instructions in various ways.

Cache Storage Options

Redis for Distributed Caching

Redis is our go-to choice for production LLM caching. It provides fast access, automatic expiration, and works across multiple application instances:

import redis
import json
from typing import Optional

class LLMRedisCache:
    def __init__(self, redis_url: str, default_ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = default_ttl
    
    def get(self, key: str) -> Optional[str]:
        try:
            cached = self.redis.get(f"llm_cache:{key}")
            return cached.decode() if cached else None
        except redis.RedisError:
            return None
    
    def set(self, key: str, value: str, ttl: Optional[int] = None):
        try:
            actual_ttl = ttl or self.default_ttl
            self.redis.setex(f"llm_cache:{key}", actual_ttl, value)
        except redis.RedisError:
            pass  # Fail silently for cache operations
    
    def invalidate_pattern(self, pattern: str):
        keys = self.redis.keys(f"llm_cache:{pattern}")
        if keys:
            self.redis.delete(*keys)

PostgreSQL for Persistent Caching

For applications requiring longer-term cache persistence or complex querying, PostgreSQL with proper indexing works well:

import asyncpg
from typing import Optional, Dict, Any

class PostgresLLMCache:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
    
    async def get(self, cache_key: str) -> Optional[Dict[str, Any]]:
        conn = await asyncpg.connect(self.connection_string)
        try:
            row = await conn.fetchrow("""
                SELECT response, metadata, created_at 
                FROM llm_cache 
                WHERE cache_key = $1 AND expires_at > NOW()
            """, cache_key)
            return dict(row) if row else None
        finally:
            await conn.close()
    
    async def set(self, cache_key: str, response: str, 
                  metadata: Dict[str, Any], ttl_seconds: int = 3600):
        conn = await asyncpg.connect(self.connection_string)
        try:
            await conn.execute("""
                INSERT INTO llm_cache (cache_key, response, metadata, expires_at)
                VALUES ($1, $2, $3, NOW() + INTERVAL '%s seconds')
                ON CONFLICT (cache_key) DO UPDATE SET
                    response = EXCLUDED.response,
                    metadata = EXCLUDED.metadata,
                    expires_at = EXCLUDED.expires_at
            """ % ttl_seconds, cache_key, response, metadata)
        finally:
            await conn.close()

Create the table with proper indexing:

CREATE TABLE llm_cache (
    id SERIAL PRIMARY KEY,
    cache_key VARCHAR(64) UNIQUE NOT NULL,
    response TEXT NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT NOW(),
    expires_at TIMESTAMP NOT NULL
);

CREATE INDEX idx_llm_cache_key ON llm_cache (cache_key);
CREATE INDEX idx_llm_cache_expires ON llm_cache (expires_at);

Multi-Layered Cache Architecture

Production systems benefit from multiple cache layers with different characteristics:

class MultiLayerLLMCache:
    def __init__(self, memory_cache, redis_cache, postgres_cache):
        self.memory = memory_cache    # L1: Fast, small, process-local
        self.redis = redis_cache      # L2: Fast, shared, temporary
        self.postgres = postgres_cache # L3: Slow, persistent, searchable
    
    async def get(self, key: str) -> Optional[str]:
        # Check L1 cache first
        result = self.memory.get(key)
        if result:
            return result
        
        # Check L2 cache
        result = await self.redis.get(key)
        if result:
            self.memory.set(key, result)  # Populate L1
            return result
        
        # Check L3 cache
        result = await self.postgres.get(key)
        if result:
            self.memory.set(key, result['response'])
            await self.redis.set(key, result['response'])
            return result['response']
        
        return None
    
    async def set(self, key: str, value: str):
        # Write to all layers
        self.memory.set(key, value)
        await self.redis.set(key, value)
        await self.postgres.set(key, value, {})

Context-Aware Cache Invalidation

Simple TTL-based expiration works for basic cases, but production systems need smarter invalidation strategies.

Version-Based Invalidation

Track versions of underlying data that influences LLM responses:

class VersionedLLMCache:
    def __init__(self, cache_backend):
        self.cache = cache_backend
        self.versions = {}
    
    def create_versioned_key(self, base_key: str, dependencies: List[str]) -> str:
        version_str = "_".join([
            f"{dep}:{self.versions.get(dep, 0)}" 
            for dep in dependencies
        ])
        return f"{base_key}:{version_str}"
    
    def invalidate_dependency(self, dependency: str):
        self.versions[dependency] = self.versions.get(dependency, 0) + 1
        # All cache keys depending on this will now miss

Tag-Based Invalidation

Group related cache entries with tags for bulk invalidation:

class TaggedLLMCache:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def set_with_tags(self, key: str, value: str, tags: List[str], ttl: int = 3600):
        # Store the cached value
        self.redis.setex(f"cache:{key}", ttl, value)
        
        # Store tag associations
        for tag in tags:
            self.redis.sadd(f"tag:{tag}", key)
            self.redis.expire(f"tag:{tag}", ttl + 300)  # Tags live slightly longer
    
    def invalidate_by_tag(self, tag: str):
        # Get all keys with this tag
        keys = self.redis.smembers(f"tag:{tag}")
        if keys:
            # Delete cache entries
            cache_keys = [f"cache:{key.decode()}" for key in keys]
            self.redis.delete(*cache_keys)
            # Clean up the tag set
            self.redis.delete(f"tag:{tag}")

Prompt-Specific Caching Strategies

Different types of LLM operations require different caching approaches.

Content Generation Caching

For content generation tasks, cache based on topic and style parameters:

async def generate_content_with_cache(topic: str, style: str, length: int):
    cache_key = f"content:{hashlib.sha256(f'{topic}:{style}:{length}'.encode()).hexdigest()}"
    
    cached = await cache.get(cache_key)
    if cached:
        return json.loads(cached)
    
    prompt = f"Write a {length}-word {style} article about {topic}"
    response = await llm_client.generate(prompt)
    
    # Cache for 24 hours - content doesn't change frequently
    await cache.set(cache_key, json.dumps(response), ttl=86400)
    return response

Code Analysis Caching

Code analysis can be cached by file content hash:

import hashlib

async def analyze_code_with_cache(code: str, analysis_type: str):
    code_hash = hashlib.sha256(code.encode()).hexdigest()
    cache_key = f"code_analysis:{analysis_type}:{code_hash}"
    
    cached = await cache.get(cache_key)
    if cached:
        return json.loads(cached)
    
    prompt = f"Analyze this {analysis_type}:\n\n{code}"
    response = await llm_client.generate(prompt)
    
    # Cache indefinitely - code analysis for specific content won't change
    await cache.set(cache_key, json.dumps(response), ttl=None)
    return response

We implement this pattern in our AI Schematic Generator, where circuit analysis for identical component configurations can be safely cached long-term.

Conversational Context Caching

For chat applications, cache conversation states to avoid reprocessing entire conversation histories:

class ConversationCache:
    def __init__(self, cache_backend):
        self.cache = cache_backend
    
    async def get_conversation_state(self, conversation_id: str, 
                                   message_count: int) -> Optional[Dict]:
        key = f"conv:{conversation_id}:state:{message_count}"
        cached = await self.cache.get(key)
        return json.loads(cached) if cached else None
    
    async def cache_conversation_state(self, conversation_id: str,
                                     message_count: int, state: Dict):
        key = f"conv:{conversation_id}:state:{message_count}"
        # Cache conversation states for 1 hour
        await self.cache.set(key, json.dumps(state), ttl=3600)

Cache Warming Strategies

Proactively populate caches with likely-needed responses to minimize cache misses.

Batch Pre-computation

For predictable workloads, pre-compute common responses:

async def warm_content_cache():
    common_topics = ["AI trends", "productivity tips", "tech reviews"]
    common_styles = ["blog", "social", "email"]
    
    tasks = []
    for topic in common_topics:
        for style in common_styles:
            task = generate_content_with_cache(topic, style, 500)
            tasks.append(task)
    
    # Pre-populate cache with common combinations
    await asyncio.gather(*tasks, return_exceptions=True)

Usage Pattern Analysis

Monitor cache misses to identify warming opportunities:

class CacheAnalytics:
    def __init__(self, cache_backend):
        self.cache = cache_backend
        self.miss_patterns = defaultdict(int)
    
    def record_miss(self, cache_key: str, prompt_type: str):
        pattern = self.extract_pattern(cache_key)
        self.miss_patterns[f"{prompt_type}:{pattern}"] += 1
    
    def get_warming_candidates(self, threshold: int = 5) -> List[str]:
        return [
            pattern for pattern, count in self.miss_patterns.items()
            if count >= threshold
        ]

Performance Monitoring and Optimization

Track cache performance to optimize your strategy over time.

Cache Metrics Collection

from dataclasses import dataclass
from typing import Dict
import time

@dataclass
class CacheMetrics:
    hits: int = 0
    misses: int = 0
    hit_time_total: float = 0.0
    miss_time_total: float = 0.0
    
    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0
    
    @property
    def average_hit_time(self) -> float:
        return self.hit_time_total / self.hits if self.hits > 0 else 0.0

class MonitoredLLMCache:
    def __init__(self, cache_backend):
        self.cache = cache_backend
        self.metrics = CacheMetrics()
    
    async def get(self, key: str) -> Optional[str]:
        start_time = time.time()
        result = await self.cache.get(key)
        elapsed = time.time() - start_time
        
        if result:
            self.metrics.hits += 1
            self.metrics.hit_time_total += elapsed
        else:
            self.metrics.misses += 1
            self.metrics.miss_time_total += elapsed
        
        return result

Cost Impact Analysis

In our experience across projects like Vidmation and ClawdHub, well-implemented

More from the blog

Need help with AI?

We build production AI systems — from strategy and architecture to deployment and evaluation.

Get our AI implementation playbook

A practical guide to evaluating, planning, and deploying AI in your business. Free, no spam.