Claude API Integration Tutorial: Complete Python Implementation Guide
Claude API Integration Tutorial: Complete Python Implementation Guide
Building production AI applications requires robust API integrations that handle authentication, error management, and scaling challenges. In this comprehensive Claude API integration tutorial, we’ll walk through everything from basic setup to advanced patterns we’ve implemented across real projects like ClawdHub and Vidmation.
Claude’s API offers powerful capabilities for text generation, code analysis, and complex reasoning tasks. Whether you’re building AI agents, content automation systems, or intelligent data processing pipelines, understanding how to properly integrate Claude’s API is crucial for reliable production systems.
Prerequisites and Initial Setup
Before diving into the integration, ensure you have the necessary components in place. You’ll need Python 3.8+ and an Anthropic API key from the Claude console.
First, install the official Anthropic SDK:
pip install anthropic python-dotenv
Create a .env file for secure credential management:
ANTHROPIC_API_KEY=your_api_key_here
CLAUDE_MODEL=claude-3-5-sonnet-20241022
Set up your basic project structure:
import os
from dotenv import load_dotenv
from anthropic import Anthropic
load_dotenv()
client = Anthropic(
api_key=os.environ.get("ANTHROPIC_API_KEY")
)
This foundation provides secure authentication and environment configuration that scales across development and production environments.
Basic Claude API Integration
Let’s start with a fundamental implementation that demonstrates core concepts:
import os
from typing import Optional, Dict, Any
from dotenv import load_dotenv
from anthropic import Anthropic
class ClaudeClient:
def __init__(self, api_key: Optional[str] = None):
load_dotenv()
self.client = Anthropic(
api_key=api_key or os.environ.get("ANTHROPIC_API_KEY")
)
self.model = os.environ.get("CLAUDE_MODEL", "claude-3-5-sonnet-20241022")
def generate_response(
self,
prompt: str,
max_tokens: int = 1000,
temperature: float = 0.7,
system_prompt: Optional[str] = None
) -> str:
"""Generate a response using Claude API."""
try:
messages = [{"role": "user", "content": prompt}]
response = self.client.messages.create(
model=self.model,
max_tokens=max_tokens,
temperature=temperature,
system=system_prompt,
messages=messages
)
return response.content[0].text
except Exception as e:
print(f"Error generating response: {e}")
return ""
# Usage example
claude = ClaudeClient()
result = claude.generate_response(
prompt="Explain quantum computing in simple terms",
system_prompt="You are a technical educator who explains complex topics clearly."
)
print(result)
This basic implementation handles authentication, model configuration, and error management while providing a clean interface for single-turn conversations.
Advanced Integration Patterns
Streaming Responses
For real-time applications, streaming responses provide better user experience by showing incremental results:
def stream_response(
self,
prompt: str,
max_tokens: int = 1000,
system_prompt: Optional[str] = None
) -> None:
"""Stream response tokens in real-time."""
try:
messages = [{"role": "user", "content": prompt}]
with self.client.messages.stream(
model=self.model,
max_tokens=max_tokens,
system=system_prompt,
messages=messages
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
except Exception as e:
print(f"Error streaming response: {e}")
Multi-Turn Conversations
Managing conversation history enables context-aware interactions:
class ConversationManager:
def __init__(self, claude_client: ClaudeClient):
self.client = claude_client
self.messages = []
def add_message(self, role: str, content: str):
"""Add a message to conversation history."""
self.messages.append({"role": role, "content": content})
def generate_response(
self,
user_message: str,
system_prompt: Optional[str] = None
) -> str:
"""Generate response with conversation context."""
self.add_message("user", user_message)
try:
response = self.client.client.messages.create(
model=self.client.model,
max_tokens=1000,
system=system_prompt,
messages=self.messages
)
assistant_message = response.content[0].text
self.add_message("assistant", assistant_message)
return assistant_message
except Exception as e:
print(f"Error in conversation: {e}")
return ""
def clear_history(self):
"""Clear conversation history."""
self.messages = []
# Usage
conversation = ConversationManager(claude)
response1 = conversation.generate_response("What is machine learning?")
response2 = conversation.generate_response("How does it relate to AI?")
Production-Ready Error Handling
Robust error handling is essential for production systems. Here’s a comprehensive approach:
import time
import logging
from typing import Optional
from anthropic import APIError, RateLimitError, APIConnectionError
class ProductionClaudeClient(ClaudeClient):
def __init__(self, api_key: Optional[str] = None, max_retries: int = 3):
super().__init__(api_key)
self.max_retries = max_retries
self.logger = logging.getLogger(__name__)
def _exponential_backoff(self, attempt: int) -> float:
"""Calculate exponential backoff delay."""
return min(60, (2 ** attempt) + (time.time() % 1))
def generate_with_retry(
self,
prompt: str,
**kwargs
) -> Optional[str]:
"""Generate response with retry logic and error handling."""
for attempt in range(self.max_retries):
try:
return self.generate_response(prompt, **kwargs)
except RateLimitError as e:
if attempt < self.max_retries - 1:
delay = self._exponential_backoff(attempt)
self.logger.warning(f"Rate limit hit, retrying in {delay}s")
time.sleep(delay)
continue
else:
self.logger.error("Max retries exceeded for rate limiting")
raise
except APIConnectionError as e:
if attempt < self.max_retries - 1:
delay = self._exponential_backoff(attempt)
self.logger.warning(f"Connection error, retrying in {delay}s")
time.sleep(delay)
continue
else:
self.logger.error("Max retries exceeded for connection errors")
raise
except APIError as e:
self.logger.error(f"API error: {e}")
raise
return None
Real-World Implementation Examples
Content Generation Pipeline
Based on our Vidmation project, here’s how we implement batch content generation:
class ContentGenerator:
def __init__(self, claude_client: ProductionClaudeClient):
self.client = claude_client
def generate_video_script(
self,
topic: str,
duration: int = 300,
style: str = "educational"
) -> Dict[str, Any]:
"""Generate video script with timestamps and scenes."""
system_prompt = f"""You are a video script writer. Create a {duration}-second {style} script about the given topic.
Format your response as JSON with 'title', 'scenes' (array with 'timestamp', 'dialogue', 'visuals'), and 'summary'."""
prompt = f"Create a video script about: {topic}"
response = self.client.generate_with_retry(
prompt=prompt,
system_prompt=system_prompt,
temperature=0.8
)
try:
import json
return json.loads(response)
except json.JSONDecodeError:
return {"error": "Failed to parse script response"}
def batch_generate_content(self, topics: list) -> list:
"""Generate content for multiple topics."""
results = []
for topic in topics:
try:
script = self.generate_video_script(topic)
results.append({
"topic": topic,
"script": script,
"status": "success"
})
except Exception as e:
results.append({
"topic": topic,
"error": str(e),
"status": "failed"
})
return results
AI Agent Orchestration
Drawing from our ClawdHub project, here’s how we manage multiple Claude-powered agents:
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AgentOrchestrator:
def __init__(self, claude_client: ProductionClaudeClient):
self.client = claude_client
self.agents = {}
def create_agent(
self,
agent_id: str,
role: str,
system_prompt: str
):
"""Create a specialized agent with specific role and prompt."""
self.agents[agent_id] = {
"role": role,
"system_prompt": system_prompt,
"conversation": ConversationManager(self.client)
}
async def run_agent_task(
self,
agent_id: str,
task: str
) -> Dict[str, Any]:
"""Run a task with specific agent."""
if agent_id not in self.agents:
return {"error": f"Agent {agent_id} not found"}
agent = self.agents[agent_id]
try:
response = agent["conversation"].generate_response(
user_message=task,
system_prompt=agent["system_prompt"]
)
return {
"agent_id": agent_id,
"role": agent["role"],
"task": task,
"response": response,
"status": "completed"
}
except Exception as e:
return {
"agent_id": agent_id,
"error": str(e),
"status": "failed"
}
async def orchestrate_multi_agent_task(self, tasks: Dict[str, str]) -> Dict[str, Any]:
"""Run multiple tasks across different agents concurrently."""
results = await asyncio.gather(*[
self.run_agent_task(agent_id, task)
for agent_id, task in tasks.items()
])
return {
"orchestration_id": f"task_{int(time.time())}",
"results": results,
"summary": self._summarize_results(results)
}
def _summarize_results(self, results: list) -> Dict[str, Any]:
"""Summarize orchestration results."""
completed = len([r for r in results if r.get("status") == "completed"])
failed = len([r for r in results if r.get("status") == "failed"])
return {
"total_tasks": len(results),
"completed": completed,
"failed": failed,
"success_rate": completed / len(results) if results else 0
}
Performance Optimization and Monitoring
Request Batching and Caching
Implement intelligent caching and request optimization:
import hashlib
from typing import Optional
from functools import lru_cache
class OptimizedClaudeClient(ProductionClaudeClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.response_cache = {}
def _generate_cache_key(self, prompt: str, **kwargs) -> str:
"""Generate cache key for request."""
cache_data = f"{prompt}_{str(sorted(kwargs.items()))}"
return hashlib.md5(cache_data.encode()).hexdigest()
def generate_cached_response(
self,
prompt: str,
use_cache: bool = True,
**kwargs
) -> str:
"""Generate response with optional caching."""
cache_key = self._generate_cache_key(prompt, **kwargs)
if use_cache and cache_key in self.response_cache:
self.logger.info(f"Cache hit for key: {cache_key}")
return self.response_cache[cache_key]
response = self.generate_with_retry(prompt, **kwargs)
if response and use_cache:
self.response_cache[cache_key] = response
return response
def batch_process(
self,
prompts: list,
max_workers: int = 5
) -> list:
"""Process multiple prompts concurrently."""
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(self.generate_with_retry, prompt)
for prompt in prompts
]
results = []
for i, future in enumerate(futures):
try:
result = future.result(timeout=30)
results.append({
"index": i,
"prompt": prompts[i],
"response": result,
"status": "success"
})
except Exception as e:
results.append({
"index": i,
"prompt": prompts[i],
"error": str(e),
"status": "failed"
})
return results
Integration with Popular Frameworks
FastAPI Integration
For web applications and APIs:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
claude_client = OptimizedClaudeClient()
class ChatRequest(BaseModel):
message: str
system_prompt: Optional[str] = None
temperature: Optional[float] = 0.7
class ChatResponse(BaseModel):
response: str
model: str
tokens_used: Optional[int] = None
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
try:
response = claude_client.generate_cached_response(
prompt=request.message,
system_prompt=request.system_prompt,
temperature=request.temperature
)
return ChatResponse(
response=response,
model=claude_client.model
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "claude-api-integration"}
This FastAPI integration provides a production-ready endpoint that we’ve successfully deployed in multiple client projects, including AI consulting implementations where reliable API access is crucial.
Key Takeaways
- Authentication and Security: Always use environment variables for API keys and implement proper credential management
- Error Handling: Implement exponential backoff, retry logic, and comprehensive error catching for production resilience
- Performance Optimization: Use caching, request batching, and connection pooling to optimize API usage and reduce costs
- Conversation Management: Maintain conversation history for context-aware interactions in multi-turn scenarios
- **
More from the blog
Need help with AI?
We build production AI systems — from strategy and architecture to deployment and evaluation.
Get our AI implementation playbook
A practical guide to evaluating, planning, and deploying AI in your business. Free, no spam.
Check your inbox.
Something went wrong. Please try again.