AI Engineering

Claude API Integration Tutorial: Complete Python Implementation Guide

Connor O'Dea · 6 min read
Claude API · Python Integration · AI Development · API Tutorial · Anthropic

Building production AI applications requires robust API integrations that handle authentication, error management, and scaling challenges. In this comprehensive Claude API integration tutorial, we’ll walk through everything from basic setup to advanced patterns we’ve implemented across real projects like ClawdHub and Vidmation.

Claude’s API offers powerful capabilities for text generation, code analysis, and complex reasoning tasks. Whether you’re building AI agents, content automation systems, or intelligent data processing pipelines, understanding how to properly integrate Claude’s API is crucial for reliable production systems.

Prerequisites and Initial Setup

Before diving into the integration, ensure you have the necessary components in place. You'll need Python 3.8+ and an Anthropic API key from the Anthropic Console.

First, install the official Anthropic SDK:

pip install anthropic python-dotenv

Create a .env file for secure credential management:

ANTHROPIC_API_KEY=your_api_key_here
CLAUDE_MODEL=claude-3-5-sonnet-20241022

Set up your basic project structure:

import os
from dotenv import load_dotenv
from anthropic import Anthropic

load_dotenv()

client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY")
)

This foundation provides secure authentication and environment configuration that scales across development and production environments.
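The SDK will complain if no key can be resolved, but failing fast at startup with a clear message is often friendlier than a confusing error on the first request. A minimal sketch — the helper name is ours, not part of the SDK:

```python
import os

def require_api_key(env_var: str = "ANTHROPIC_API_KEY") -> str:
    """Fail fast at startup if the key is missing, instead of at first request."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"{env_var} is not set; check your .env file")
    return key
```

Call this once before constructing the client so misconfigured environments surface immediately.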

Basic Claude API Integration

Let’s start with a fundamental implementation that demonstrates core concepts:

import os
from typing import Optional, Dict, Any
from dotenv import load_dotenv
from anthropic import Anthropic

class ClaudeClient:
    def __init__(self, api_key: Optional[str] = None):
        load_dotenv()
        self.client = Anthropic(
            api_key=api_key or os.environ.get("ANTHROPIC_API_KEY")
        )
        self.model = os.environ.get("CLAUDE_MODEL", "claude-3-5-sonnet-20241022")
    
    def generate_response(
        self, 
        prompt: str, 
        max_tokens: int = 1000,
        temperature: float = 0.7,
        system_prompt: Optional[str] = None
    ) -> str:
        """Generate a response using Claude API."""
        try:
            messages = [{"role": "user", "content": prompt}]
            
            response = self.client.messages.create(
                model=self.model,
                max_tokens=max_tokens,
                temperature=temperature,
                system=system_prompt,
                messages=messages
            )
            
            return response.content[0].text
            
        except Exception as e:
            print(f"Error generating response: {e}")
            # re-raise so callers (including retry wrappers) can handle the failure
            raise

# Usage example
claude = ClaudeClient()
result = claude.generate_response(
    prompt="Explain quantum computing in simple terms",
    system_prompt="You are a technical educator who explains complex topics clearly."
)
print(result)

This basic implementation handles authentication, model configuration, and error management while providing a clean interface for single-turn conversations.

Advanced Integration Patterns

Streaming Responses

For real-time applications, streaming responses provide better user experience by showing incremental results:

def stream_response(
    self, 
    prompt: str, 
    max_tokens: int = 1000,
    system_prompt: Optional[str] = None
) -> None:
    """Stream response tokens in real-time."""
    try:
        messages = [{"role": "user", "content": prompt}]
        
        with self.client.messages.stream(
            model=self.model,
            max_tokens=max_tokens,
            system=system_prompt,
            messages=messages
        ) as stream:
            for text in stream.text_stream:
                print(text, end="", flush=True)
                
    except Exception as e:
        print(f"Error streaming response: {e}")
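The stream_response method above prints tokens but discards them. When you also need the final string (for logging or caching), accumulate as you print. A small sketch, shown against a plain iterable standing in for stream.text_stream:

```python
def collect_stream(chunks) -> str:
    """Print chunks as they arrive and return the accumulated text."""
    parts = []
    for text in chunks:
        print(text, end="", flush=True)
        parts.append(text)
    return "".join(parts)
```

Passing stream.text_stream to a helper like this gives you real-time output and the complete response in one pass.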

Multi-Turn Conversations

Managing conversation history enables context-aware interactions:

class ConversationManager:
    def __init__(self, claude_client: ClaudeClient):
        self.client = claude_client
        self.messages = []
        
    def add_message(self, role: str, content: str):
        """Add a message to conversation history."""
        self.messages.append({"role": role, "content": content})
        
    def generate_response(
        self, 
        user_message: str,
        system_prompt: Optional[str] = None
    ) -> str:
        """Generate response with conversation context."""
        self.add_message("user", user_message)
        
        try:
            response = self.client.client.messages.create(  # .client is the inner Anthropic client
                model=self.client.model,
                max_tokens=1000,
                system=system_prompt,
                messages=self.messages
            )
            
            assistant_message = response.content[0].text
            self.add_message("assistant", assistant_message)
            
            return assistant_message
            
        except Exception as e:
            print(f"Error in conversation: {e}")
            return ""
    
    def clear_history(self):
        """Clear conversation history."""
        self.messages = []

# Usage
conversation = ConversationManager(claude)
response1 = conversation.generate_response("What is machine learning?")
response2 = conversation.generate_response("How does it relate to AI?")

Production-Ready Error Handling

Robust error handling is essential for production systems. Here’s a comprehensive approach:

import time
import logging
from typing import Optional
from anthropic import APIError, RateLimitError, APIConnectionError

class ProductionClaudeClient(ClaudeClient):
    def __init__(self, api_key: Optional[str] = None, max_retries: int = 3):
        super().__init__(api_key)
        self.max_retries = max_retries
        self.logger = logging.getLogger(__name__)
        
    def _exponential_backoff(self, attempt: int) -> float:
        """Calculate exponential backoff delay: 2**attempt seconds plus
        sub-second jitter to avoid thundering herds, capped at 60s."""
        return min(60, (2 ** attempt) + (time.time() % 1))
    
    def generate_with_retry(
        self, 
        prompt: str, 
        **kwargs
    ) -> Optional[str]:
        """Generate response with retry logic and error handling."""
        for attempt in range(self.max_retries):
            try:
                return self.generate_response(prompt, **kwargs)
                
            except RateLimitError as e:
                if attempt < self.max_retries - 1:
                    delay = self._exponential_backoff(attempt)
                    self.logger.warning(f"Rate limit hit, retrying in {delay}s")
                    time.sleep(delay)
                    continue
                else:
                    self.logger.error("Max retries exceeded for rate limiting")
                    raise
                    
            except APIConnectionError as e:
                if attempt < self.max_retries - 1:
                    delay = self._exponential_backoff(attempt)
                    self.logger.warning(f"Connection error, retrying in {delay}s")
                    time.sleep(delay)
                    continue
                else:
                    self.logger.error("Max retries exceeded for connection errors")
                    raise
                    
            except APIError as e:
                self.logger.error(f"API error: {e}")
                raise
                
        return None
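The backoff schedule is easy to sanity-check in isolation. Here is the same formula with a fixed jitter instead of the time-based one above, so the delays are deterministic:

```python
def backoff_delay(attempt: int, cap: float = 60.0, jitter: float = 0.5) -> float:
    """Exponential backoff: 2**attempt seconds plus jitter, capped at `cap`."""
    return min(cap, (2 ** attempt) + jitter)

# delays grow 1.5, 2.5, 4.5, 8.5, ... until the 60s cap kicks in
delays = [backoff_delay(a) for a in range(8)]
```

Doubling with a cap keeps early retries fast while preventing pathological waits when an outage lasts several attempts.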

Real-World Implementation Examples

Content Generation Pipeline

Based on our Vidmation project, here’s how we implement batch content generation:

class ContentGenerator:
    def __init__(self, claude_client: ProductionClaudeClient):
        self.client = claude_client
        
    def generate_video_script(
        self, 
        topic: str, 
        duration: int = 300,
        style: str = "educational"
    ) -> Dict[str, Any]:
        """Generate video script with timestamps and scenes."""
        system_prompt = (
            f"You are a video script writer. Create a {duration}-second {style} "
            "script about the given topic. Format your response as JSON with "
            "'title', 'scenes' (array with 'timestamp', 'dialogue', 'visuals'), "
            "and 'summary'."
        )
        
        prompt = f"Create a video script about: {topic}"
        
        response = self.client.generate_with_retry(
            prompt=prompt,
            system_prompt=system_prompt,
            temperature=0.8
        )
        
        try:
            import json
            return json.loads(response)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers a None response from the retry wrapper
            return {"error": "Failed to parse script response"}
    
    def batch_generate_content(self, topics: list) -> list:
        """Generate content for multiple topics."""
        results = []
        
        for topic in topics:
            try:
                script = self.generate_video_script(topic)
                results.append({
                    "topic": topic,
                    "script": script,
                    "status": "success"
                })
            except Exception as e:
                results.append({
                    "topic": topic,
                    "error": str(e),
                    "status": "failed"
                })
                
        return results
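json.loads on raw model output is brittle: even with explicit instructions, models sometimes wrap JSON in a markdown code fence. A defensive parser (our own helper, not part of the project code) that strips an optional fence before parsing:

```python
import json
import re

def parse_json_response(text):
    """Parse model output that may wrap JSON in a markdown code fence."""
    if text is None:
        return None
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    candidate = match.group(1) if match else text
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None
```

Returning None on failure lets the batch loop record a clean "failed" status instead of raising mid-pipeline.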

AI Agent Orchestration

Drawing from our ClawdHub project, here’s how we manage multiple Claude-powered agents:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor  # used by batch_process below

class AgentOrchestrator:
    def __init__(self, claude_client: ProductionClaudeClient):
        self.client = claude_client
        self.agents = {}
        
    def create_agent(
        self, 
        agent_id: str, 
        role: str, 
        system_prompt: str
    ):
        """Create a specialized agent with specific role and prompt."""
        self.agents[agent_id] = {
            "role": role,
            "system_prompt": system_prompt,
            "conversation": ConversationManager(self.client)
        }
        
    async def run_agent_task(
        self, 
        agent_id: str, 
        task: str
    ) -> Dict[str, Any]:
        """Run a task with specific agent."""
        if agent_id not in self.agents:
            return {"error": f"Agent {agent_id} not found"}
            
        agent = self.agents[agent_id]
        
        try:
            # generate_response blocks on a network call; run it in a worker
            # thread so asyncio.gather can actually overlap agent tasks
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: agent["conversation"].generate_response(
                    user_message=task,
                    system_prompt=agent["system_prompt"]
                )
            )
            
            return {
                "agent_id": agent_id,
                "role": agent["role"],
                "task": task,
                "response": response,
                "status": "completed"
            }
            
        except Exception as e:
            return {
                "agent_id": agent_id,
                "error": str(e),
                "status": "failed"
            }
    
    async def orchestrate_multi_agent_task(self, tasks: Dict[str, str]) -> Dict[str, Any]:
        """Run multiple tasks across different agents concurrently."""
        results = await asyncio.gather(*[
            self.run_agent_task(agent_id, task) 
            for agent_id, task in tasks.items()
        ])
        
        return {
            "orchestration_id": f"task_{int(time.time())}",
            "results": results,
            "summary": self._summarize_results(results)
        }
    
    def _summarize_results(self, results: list) -> Dict[str, Any]:
        """Summarize orchestration results."""
        completed = len([r for r in results if r.get("status") == "completed"])
        failed = len([r for r in results if r.get("status") == "failed"])
        
        return {
            "total_tasks": len(results),
            "completed": completed,
            "failed": failed,
            "success_rate": completed / len(results) if results else 0
        }
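The fan-out/gather pattern itself can be exercised without any API calls by substituting stub coroutines, which is handy for testing the orchestration shape before wiring in real agents:

```python
import asyncio

async def stub_agent_task(agent_id: str, task: str) -> dict:
    """Stand-in for run_agent_task: no API call, just the result shape."""
    await asyncio.sleep(0)
    return {"agent_id": agent_id, "task": task, "status": "completed"}

async def orchestrate(tasks: dict) -> list:
    # gather preserves input order, so results line up with tasks.items()
    return await asyncio.gather(
        *[stub_agent_task(agent_id, task) for agent_id, task in tasks.items()]
    )

results = asyncio.run(orchestrate({"researcher": "find sources", "writer": "draft intro"}))
```

Swapping the stub for the real run_agent_task is a one-line change, so the same test harness covers both.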

Performance Optimization and Monitoring

Request Batching and Caching

Implement intelligent caching and request optimization:

import hashlib
from concurrent.futures import ThreadPoolExecutor

class OptimizedClaudeClient(ProductionClaudeClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.response_cache = {}
        
    def _generate_cache_key(self, prompt: str, **kwargs) -> str:
        """Generate cache key for request."""
        cache_data = f"{prompt}_{str(sorted(kwargs.items()))}"
        return hashlib.md5(cache_data.encode()).hexdigest()
    
    def generate_cached_response(
        self, 
        prompt: str, 
        use_cache: bool = True,
        **kwargs
    ) -> str:
        """Generate response with optional caching."""
        cache_key = self._generate_cache_key(prompt, **kwargs)
        
        if use_cache and cache_key in self.response_cache:
            self.logger.info(f"Cache hit for key: {cache_key}")
            return self.response_cache[cache_key]
        
        response = self.generate_with_retry(prompt, **kwargs)
        
        if response and use_cache:
            self.response_cache[cache_key] = response
            
        return response
    
    def batch_process(
        self, 
        prompts: list, 
        max_workers: int = 5
    ) -> list:
        """Process multiple prompts concurrently."""
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.generate_with_retry, prompt)
                for prompt in prompts
            ]
            
            results = []
            for i, future in enumerate(futures):
                try:
                    result = future.result(timeout=30)
                    results.append({
                        "index": i,
                        "prompt": prompts[i],
                        "response": result,
                        "status": "success"
                    })
                except Exception as e:
                    results.append({
                        "index": i,
                        "prompt": prompts[i],
                        "error": str(e),
                        "status": "failed"
                    })
                    
            return results
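The sorted(kwargs.items()) in _generate_cache_key is what makes caching reliable: the same request with keyword arguments in a different order hashes to the same key. A standalone version of that logic for checking the behavior:

```python
import hashlib

def cache_key(prompt: str, **kwargs) -> str:
    """Deterministic key: sorting kwargs makes it insensitive to argument order."""
    data = f"{prompt}_{str(sorted(kwargs.items()))}"
    return hashlib.md5(data.encode()).hexdigest()
```

md5 is fine here because the key is a cache lookup, not a security boundary; any change to the prompt or parameters produces a different key.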

FastAPI Integration

For web applications and APIs:

from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
claude_client = OptimizedClaudeClient()

class ChatRequest(BaseModel):
    message: str
    system_prompt: Optional[str] = None
    temperature: Optional[float] = 0.7

class ChatResponse(BaseModel):
    response: str
    model: str
    tokens_used: Optional[int] = None

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    try:
        response = claude_client.generate_cached_response(
            prompt=request.message,
            system_prompt=request.system_prompt,
            temperature=request.temperature
        )
        
        return ChatResponse(
            response=response,
            model=claude_client.model
        )
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "healthy", "service": "claude-api-integration"}

This FastAPI integration provides a production-ready endpoint that we’ve successfully deployed in multiple client projects, including AI consulting implementations where reliable API access is crucial.

Key Takeaways

  • Authentication and Security: Always use environment variables for API keys and implement proper credential management
  • Error Handling: Implement exponential backoff, retry logic, and comprehensive error catching for production resilience
  • Performance Optimization: Use caching, request batching, and connection pooling to optimize API usage and reduce costs
  • Conversation Management: Maintain conversation history for context-aware interactions in multi-turn scenarios
