AI Engineering

How to Build a RAG Pipeline in Python: Production-Ready Implementation Guide

Connor O'Dea · · 6 min read
RAG Python Vector Databases LLMs Production AI

Retrieval-Augmented Generation (RAG) has become the gold standard for building AI systems that need access to specific knowledge bases. When we built ClawdHub, our AI agent orchestration platform with over 13K lines of Python, we implemented multiple RAG pipelines to give agents contextual access to project documentation and code repositories. The architecture patterns we developed have since powered everything from our Vidmation content automation system to custom knowledge bases for enterprise clients.

Learning how to build RAG pipeline Python implementations isn’t just about connecting an LLM to a vector database. It’s about creating a robust, scalable system that can handle real-world data complexity, user queries at scale, and production reliability requirements. In this guide, we’ll walk through building a complete RAG pipeline from scratch, covering everything from document processing to deployment strategies.

Understanding RAG Pipeline Architecture

A production RAG pipeline consists of five core components that work together to transform raw documents into contextually-aware AI responses:

Document Ingestion & Processing: Your pipeline needs to handle various document formats (PDFs, Word docs, web pages, code files) and extract meaningful text while preserving structure and metadata.

Chunking Strategy: Raw documents must be split into optimal chunks that balance context preservation with retrieval accuracy. This is where many implementations fail — poor chunking destroys the semantic coherence needed for accurate retrieval.

Embedding Generation: Converting text chunks into vector representations using models like OpenAI’s text-embedding-ada-002 or open-source alternatives like Sentence Transformers.

Vector Storage & Retrieval: Storing embeddings in a vector database (Pinecone, Weaviate, Chroma) with efficient similarity search capabilities.

Generation Pipeline: Combining retrieved context with user queries to generate accurate, grounded responses using LLMs like GPT-4 or Claude.

Setting Up Your Development Environment

Before diving into implementation, let’s establish the core dependencies for our RAG pipeline:

# requirements.txt
openai==1.12.0
langchain==0.1.0
chromadb==0.4.22
sentence-transformers==2.3.1
pypdf2==3.0.1
python-docx==0.8.11
beautifulsoup4==4.12.2
fastapi==0.108.0
uvicorn==0.25.0
pydantic==2.5.2
tiktoken==0.5.2

Our base pipeline will use ChromaDB for vector storage (easy local development) and OpenAI for embeddings and generation, but we’ll show how to swap components for production deployments.

Building the Document Processing Layer

The foundation of any RAG system is robust document processing. Here’s our production-tested approach:

import os
from typing import List, Dict, Any
from pathlib import Path
import PyPDF2
from docx import Document
from bs4 import BeautifulSoup
import tiktoken

class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
    
    def extract_text(self, file_path: str) -> Dict[str, Any]:
        """Extract text from various file formats."""
        path = Path(file_path)
        extension = path.suffix.lower()
        
        extractors = {
            '.pdf': self._extract_pdf,
            '.docx': self._extract_docx,
            '.txt': self._extract_txt,
            '.py': self._extract_code,
            '.js': self._extract_code,
            '.md': self._extract_txt
        }
        
        if extension not in extractors:
            raise ValueError(f"Unsupported file type: {extension}")
        
        text = extractors[extension](file_path)
        
        return {
            'text': text,
            'source': file_path,
            'type': extension[1:],  # Remove the dot
            'tokens': len(self.tokenizer.encode(text))
        }
    
    def _extract_pdf(self, file_path: str) -> str:
        """Extract text from PDF files."""
        text = ""
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
        return text.strip()
    
    def _extract_docx(self, file_path: str) -> str:
        """Extract text from Word documents."""
        doc = Document(file_path)
        return "\n".join([paragraph.text for paragraph in doc.paragraphs])
    
    def _extract_txt(self, file_path: str) -> str:
        """Extract text from plain text files."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    
    def _extract_code(self, file_path: str) -> str:
        """Extract code with syntax preservation."""
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        return f"```{Path(file_path).suffix[1:]}\n{content}\n```"

This processor handles the most common document types and preserves important metadata. The key insight here is treating different file types differently — code files get wrapped in markdown code blocks to preserve syntax highlighting context.

Implementing Smart Chunking Strategies

Poor chunking is the #1 reason RAG systems fail in production. Here’s our approach that balances semantic coherence with retrieval accuracy:

import re
from typing import List, Tuple

class SmartChunker:
    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
    
    def chunk_document(self, text: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Create semantically meaningful chunks from text."""
        # First, try semantic splitting
        if metadata['type'] in ['py', 'js', 'ts']:
            return self._chunk_code(text, metadata)
        elif metadata['type'] == 'md':
            return self._chunk_markdown(text, metadata)
        else:
            return self._chunk_text(text, metadata)
    
    def _chunk_text(self, text: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Chunk regular text by paragraphs, then sentences."""
        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = ""
        
        for paragraph in paragraphs:
            if not paragraph.strip():
                continue
                
            # Check if adding this paragraph exceeds chunk size
            potential_chunk = current_chunk + "\n\n" + paragraph if current_chunk else paragraph
            
            if len(self.tokenizer.encode(potential_chunk)) <= self.chunk_size:
                current_chunk = potential_chunk
            else:
                # Current chunk is ready, start a new one
                if current_chunk:
                    chunks.append(self._create_chunk(current_chunk, metadata, len(chunks)))
                
                # If single paragraph is too long, split by sentences
                if len(self.tokenizer.encode(paragraph)) > self.chunk_size:
                    sentence_chunks = self._split_by_sentences(paragraph, metadata, len(chunks))
                    chunks.extend(sentence_chunks)
                    current_chunk = ""
                else:
                    current_chunk = paragraph
        
        # Don't forget the last chunk
        if current_chunk:
            chunks.append(self._create_chunk(current_chunk, metadata, len(chunks)))
        
        return self._add_overlap(chunks)
    
    def _chunk_code(self, text: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Chunk code by functions/classes while preserving context."""
        # Remove code block wrapper if present
        if text.startswith('```'):
            lines = text.split('\n')
            text = '\n'.join(lines[1:-1])
        
        chunks = []
        lines = text.split('\n')
        current_chunk_lines = []
        current_indent = 0
        
        for line in lines:
            # Detect function/class definitions
            if re.match(r'^(class|def|function|const|let|var)\s+', line.strip()):
                # Save current chunk if it exists
                if current_chunk_lines:
                    chunk_text = '\n'.join(current_chunk_lines)
                    if chunk_text.strip():
                        chunks.append(self._create_chunk(
                            f"```{metadata['type']}\n{chunk_text}\n```",
                            metadata, 
                            len(chunks)
                        ))
                
                # Start new chunk
                current_chunk_lines = [line]
                current_indent = len(line) - len(line.lstrip())
            else:
                current_chunk_lines.append(line)
                
                # Check if chunk is getting too large
                chunk_text = '\n'.join(current_chunk_lines)
                if len(self.tokenizer.encode(chunk_text)) > self.chunk_size:
                    # Split at natural boundaries
                    if len(current_chunk_lines) > 1:
                        # Keep all but the last line
                        chunk_text = '\n'.join(current_chunk_lines[:-1])
                        chunks.append(self._create_chunk(
                            f"```{metadata['type']}\n{chunk_text}\n```",
                            metadata,
                            len(chunks)
                        ))
                        current_chunk_lines = [current_chunk_lines[-1]]
        
        # Add final chunk
        if current_chunk_lines:
            chunk_text = '\n'.join(current_chunk_lines)
            if chunk_text.strip():
                chunks.append(self._create_chunk(
                    f"```{metadata['type']}\n{chunk_text}\n```",
                    metadata,
                    len(chunks)
                ))
        
        return chunks
    
    def _create_chunk(self, text: str, metadata: Dict[str, Any], chunk_id: int) -> Dict[str, Any]:
        """Create a standardized chunk object."""
        return {
            'text': text,
            'metadata': {
                **metadata,
                'chunk_id': chunk_id,
                'tokens': len(self.tokenizer.encode(text))
            }
        }
    
    def _add_overlap(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Add overlap between consecutive chunks."""
        if len(chunks) <= 1:
            return chunks
        
        overlapped_chunks = [chunks[0]]  # First chunk stays the same
        
        for i in range(1, len(chunks)):
            current_chunk = chunks[i]['text']
            previous_chunk = chunks[i-1]['text']
            
            # Get last N tokens from previous chunk
            prev_tokens = self.tokenizer.encode(previous_chunk)
            overlap_tokens = prev_tokens[-self.overlap:] if len(prev_tokens) > self.overlap else prev_tokens
            overlap_text = self.tokenizer.decode(overlap_tokens)
            
            # Prepend overlap to current chunk
            overlapped_text = overlap_text + "\n" + current_chunk
            
            overlapped_chunk = chunks[i].copy()
            overlapped_chunk['text'] = overlapped_text
            overlapped_chunk['metadata']['tokens'] = len(self.tokenizer.encode(overlapped_text))
            
            overlapped_chunks.append(overlapped_chunk)
        
        return overlapped_chunks

This chunking strategy adapts to content type. For code, it preserves function boundaries. For regular text, it maintains paragraph structure. The overlap mechanism ensures no context is lost between chunks.

Vector Database Integration

Now let’s implement the vector storage and retrieval layer. We’re using ChromaDB for local development, but the interface makes it easy to swap for production databases:

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
import openai
from typing import List, Dict, Any, Optional
import numpy as np

class VectorStore:
    def __init__(self, 
                 collection_name: str = "rag_documents",
                 embedding_model: str = "openai",  # or "sentence-transformers"
                 model_name: str = "text-embedding-ada-002"):
        
        self.collection_name = collection_name
        self.embedding_model = embedding_model
        self.model_name = model_name
        
        # Initialize ChromaDB
        self.client = chromadb.Client(Settings(
            chroma_db_impl="duckdb+parquet",
            persist_directory="./chroma_db"
        ))
        
        # Get or create collection
        try:
            self.collection = self.client.get_collection(collection_name)
        except:
            self.collection = self.client.create_collection(collection_name)
        
        # Initialize embedding model
        if embedding_model == "sentence-transformers":
            self.embedder = SentenceTransformer(model_name)
        elif embedding_model == "openai":
            openai.api_key = os.getenv("OPENAI_API_KEY")
    
    def embed_text(self, text: str) -> List[float]:
        """Generate embeddings for text."""
        if self.embedding_model == "openai":
            response = openai.embeddings.create(
                model=self.model_name,
                input=text
            )
            return response.data[0].embedding
        else:
            return self.embedder.encode(text).tolist()
    
    def add_documents(self, chunks: List[Dict[str, Any]]) -> None:
        """Add document chunks to vector store."""
        texts = [chunk['text'] for chunk in chunks]
        embeddings = []
        
        # Batch embedding generation for efficiency
        if self.embedding_model == "openai":
            # OpenAI supports batch embedding
            response = openai.embeddings.create(
                model=self.model_name,
                input=texts
            )
            embeddings = [data.embedding for data in response.data]
        else:
            embeddings = self.embedder.encode(texts).tolist()
        
        # Prepare data for ChromaDB
        ids = [f"{chunk['metadata']['source']}_{chunk['metadata']['chunk_id']}" for chunk in chunks]
        metadatas = [chunk['metadata'] for chunk in chunks]
        
        # Add to collection
        self.collection.add(
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas,
            ids=ids
        )
        
        print(f"Added {len(chunks)} chunks to vector store")
    
    def similarity_search(self, 
                         query: str, 
                         k: int = 5,
                         filter_dict: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Search for similar documents."""
        query_embedding = self.embed_text(query)
        
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k,
            where=filter_dict
        )
        
        # Format results
        formatted_results = []
        for i in range(len(results['documents'][0])):
            formatted_results.append({
                'text': results['documents'][0][i],
                'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i]
            })
        
        return formatted_results
    
    def delete_collection(self):
        """Delete the entire collection."""
        self.client.delete_collection(self.collection_name)
        
    def get_collection_stats(self) -> Dict

More from the blog

Need help with AI?

We build production AI systems — from strategy and architecture to deployment and evaluation.

Get our AI implementation playbook

A practical guide to evaluating, planning, and deploying AI in your business. Free, no spam.