AI Engineering

Building an AI Video Automation Pipeline with Python: Vidmation Case Study

Connor O'Dea · · 6 min read
Python AI Automation Video Processing Case Study

Building an AI video automation pipeline with Python that actually works in production requires solving dozens of technical challenges most tutorials skip over. We learned this firsthand developing Vidmation, our AI-powered YouTube content automation system that generates scripts, voiceovers, visuals, and complete videos from simple prompts.

This case study breaks down the real architecture, code patterns, and engineering decisions behind a production AI video automation pipeline Python implementation that processes hundreds of videos monthly.

The Challenge: End-to-End Video Automation

Traditional video creation involves multiple manual steps: ideation, scripting, voiceover recording, visual selection, editing, and publishing. Each step requires different tools, skills, and significant time investment. Our goal was to compress this entire workflow into a single automated pipeline.

The technical requirements were ambitious:

  • Generate coherent, engaging video scripts from topic inputs
  • Create natural-sounding voiceovers with emotional variation
  • Select and sequence appropriate visuals automatically
  • Handle video editing, transitions, and timing
  • Scale to process multiple videos concurrently
  • Maintain consistent quality across outputs

Architecture Overview

Our AI video automation pipeline Python implementation follows a modular orchestration pattern we’ve refined across projects like ClawdHub and AgentAgent. The core architecture separates concerns into specialized components:

from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional
import asyncio
from pathlib import Path

class VideoStage(Enum):
    SCRIPT_GENERATION = "script_generation"
    VOICEOVER_SYNTHESIS = "voiceover_synthesis"
    VISUAL_SELECTION = "visual_selection"
    VIDEO_ASSEMBLY = "video_assembly"
    POST_PROCESSING = "post_processing"

@dataclass
class VideoJob:
    id: str
    topic: str
    duration_target: int  # seconds
    style_preferences: Dict
    status: VideoStage
    artifacts: Dict[str, Path]
    metadata: Dict

The pipeline orchestrator manages job flow through each stage, with comprehensive error handling and retry logic we detailed in our AI agent error handling guide.

Script Generation Engine

The first stage transforms topic inputs into structured video scripts. We use Claude API with custom prompts optimized for video content:

import anthropic
from typing import Dict, List
import json

class ScriptGenerator:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)
        
    async def generate_script(self, topic: str, duration: int, style: Dict) -> Dict:
        prompt = self._build_script_prompt(topic, duration, style)
        
        response = await self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4000,
            temperature=0.7,
            system="You are an expert video scriptwriter who creates engaging, well-structured content.",
            messages=[{"role": "user", "content": prompt}]
        )
        
        return self._parse_script_response(response.content[0].text)
    
    def _build_script_prompt(self, topic: str, duration: int, style: Dict) -> str:
        return f"""
        Create a {duration}-second video script about: {topic}
        
        Style requirements:
        - Tone: {style.get('tone', 'informative')}
        - Target audience: {style.get('audience', 'general')}
        - Format: {style.get('format', 'educational')}
        
        Structure the output as JSON with these fields:
        - title: Video title
        - hook: Opening 10-15 seconds to grab attention
        - main_content: Core content broken into segments
        - conclusion: Strong closing with call-to-action
        - visual_cues: Descriptions for each segment
        - timing: Approximate duration for each part
        """
    
    def _parse_script_response(self, response: str) -> Dict:
        # Extract JSON from response, handle formatting issues
        try:
            # Remove markdown formatting if present
            if "```json" in response:
                response = response.split("```json")[1].split("```")[0]
            return json.loads(response.strip())
        except json.JSONDecodeError:
            # Fallback parsing logic for malformed responses
            return self._fallback_parse(response)

The script generation incorporates lessons from our prompt engineering for production applications guide, including structured output validation and error recovery.

Voiceover Synthesis Pipeline

Converting scripts to natural speech requires careful handling of pacing, emphasis, and emotional variation:

from elevenlabs import generate, Voice, VoiceSettings
import asyncio
from pathlib import Path
import wave
import numpy as np

class VoiceoverSynthesizer:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.voice_settings = VoiceSettings(
            stability=0.75,
            similarity_boost=0.85,
            style=0.6,
            use_speaker_boost=True
        )
    
    async def synthesize_script(self, script_data: Dict, voice_id: str) -> Path:
        """Convert script segments to audio with appropriate pacing"""
        audio_segments = []
        
        for segment in script_data['segments']:
            # Add natural pauses between segments
            processed_text = self._add_speech_markers(segment['text'])
            
            audio = await self._generate_segment_audio(
                processed_text, 
                voice_id,
                segment.get('emotion', 'neutral')
            )
            
            audio_segments.append(audio)
            
            # Add pause between segments
            if segment != script_data['segments'][-1]:
                pause = self._generate_pause(segment.get('pause_duration', 1.0))
                audio_segments.append(pause)
        
        # Combine all segments
        final_audio = self._combine_audio_segments(audio_segments)
        output_path = Path(f"temp/voiceover_{script_data['id']}.wav")
        self._save_audio(final_audio, output_path)
        
        return output_path
    
    def _add_speech_markers(self, text: str) -> str:
        """Add SSML-like markers for natural speech patterns"""
        # Add pauses after punctuation
        text = text.replace('. ', '.<break time="0.5s"/> ')
        text = text.replace('! ', '!<break time="0.7s"/> ')
        text = text.replace('? ', '?<break time="0.6s"/> ')
        
        # Emphasize important words
        text = self._add_emphasis_markers(text)
        
        return text
    
    async def _generate_segment_audio(self, text: str, voice_id: str, emotion: str) -> np.ndarray:
        """Generate audio for a single segment with emotion adjustment"""
        # Adjust voice settings based on emotion
        settings = self._adjust_voice_for_emotion(emotion)
        
        audio = generate(
            text=text,
            voice=Voice(voice_id=voice_id, settings=settings),
            model="eleven_multilingual_v2"
        )
        
        return np.frombuffer(audio, dtype=np.int16)

Visual Asset Management

Selecting appropriate visuals requires understanding both script content and timing requirements:

import cv2
import numpy as np
from typing import List, Tuple
import requests
from pathlib import Path
import asyncio
import aiohttp

class VisualAssetManager:
    def __init__(self, stock_api_key: str):
        self.stock_api_key = stock_api_key
        self.asset_cache = {}
    
    async def select_visuals_for_script(self, script_data: Dict) -> List[Dict]:
        """Select and download appropriate visuals for each script segment"""
        visual_plan = []
        
        for i, segment in enumerate(script_data['segments']):
            visual_cues = segment.get('visual_cues', [])
            duration = segment.get('duration', 5.0)
            
            # Determine visual type and search terms
            search_terms = self._extract_visual_keywords(segment['text'])
            visual_type = self._determine_visual_type(segment.get('type', 'general'))
            
            # Select assets based on segment requirements
            assets = await self._search_and_select_assets(
                search_terms, 
                visual_type, 
                duration
            )
            
            visual_plan.append({
                'segment_id': i,
                'duration': duration,
                'assets': assets,
                'transitions': self._plan_transitions(assets, duration)
            })
        
        return visual_plan
    
    async def _search_and_select_assets(self, terms: List[str], visual_type: str, duration: float) -> List[Dict]:
        """Search stock APIs and select best matching assets"""
        # Calculate how many assets we need based on duration
        assets_needed = max(1, int(duration / 3))  # ~3 seconds per asset
        
        selected_assets = []
        
        for term in terms[:2]:  # Limit search terms to prevent overload
            async with aiohttp.ClientSession() as session:
                # Search multiple stock providers
                results = await asyncio.gather(
                    self._search_unsplash(session, term, visual_type),
                    self._search_pexels(session, term, visual_type),
                    return_exceptions=True
                )
                
                # Filter and rank results
                for result_set in results:
                    if isinstance(result_set, list):
                        selected_assets.extend(result_set[:assets_needed])
            
            if len(selected_assets) >= assets_needed:
                break
        
        # Download and prepare assets
        prepared_assets = await self._prepare_assets(selected_assets[:assets_needed])
        return prepared_assets
    
    async def _prepare_assets(self, assets: List[Dict]) -> List[Dict]:
        """Download and standardize asset formats"""
        prepared = []
        
        for asset in assets:
            try:
                # Download the asset
                local_path = await self._download_asset(asset['url'])
                
                # Standardize format and resolution
                standardized_path = await self._standardize_asset(local_path)
                
                prepared.append({
                    'path': standardized_path,
                    'type': asset['type'],
                    'duration': asset.get('suggested_duration', 3.0),
                    'metadata': asset.get('metadata', {})
                })
                
            except Exception as e:
                print(f"Failed to prepare asset {asset['url']}: {e}")
                continue
        
        return prepared

This visual selection system connects to the computer vision techniques we use in projects like QuickVisionz, our warehouse sorting pipeline.

Video Assembly Engine

The final assembly stage combines all assets into polished video output:

import moviepy.editor as mp
from moviepy.video.fx import fadeout, fadein, resize
import json
from pathlib import Path
from typing import List, Dict

class VideoAssembler:
    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        self.temp_dir = output_dir / "temp"
        self.temp_dir.mkdir(exist_ok=True)
    
    async def assemble_video(self, job: VideoJob) -> Path:
        """Combine audio, visuals, and timing into final video"""
        script_data = self._load_script_data(job.artifacts['script'])
        voiceover_path = job.artifacts['voiceover']
        visual_plan = self._load_visual_plan(job.artifacts['visual_plan'])
        
        # Create video clips for each segment
        video_clips = []
        audio_offset = 0
        
        for segment, visuals in zip(script_data['segments'], visual_plan):
            segment_clip = await self._create_segment_clip(
                segment, 
                visuals, 
                audio_offset
            )
            
            video_clips.append(segment_clip)
            audio_offset += segment['duration']
        
        # Combine all segments
        final_video = mp.concatenate_videoclips(video_clips, method="compose")
        
        # Load and sync audio
        audio = mp.AudioFileClip(str(voiceover_path))
        final_video = final_video.set_audio(audio)
        
        # Apply final effects and optimization
        final_video = self._apply_final_effects(final_video, job.style_preferences)
        
        # Export with optimal settings
        output_path = self.output_dir / f"{job.id}_final.mp4"
        final_video.write_videofile(
            str(output_path),
            fps=30,
            codec='libx264',
            audio_codec='aac',
            temp_audiofile_path=str(self.temp_dir / "temp_audio.m4a"),
            remove_temp=True,
            verbose=False,
            logger=None
        )
        
        return output_path
    
    async def _create_segment_clip(self, segment: Dict, visuals: Dict, start_time: float) -> mp.VideoClip:
        """Create video clip for a single script segment"""
        segment_duration = segment['duration']
        assets = visuals['assets']
        transitions = visuals['transitions']
        
        if not assets:
            # Fallback to generated background if no assets
            return self._create_fallback_clip(segment_duration)
        
        # Create clips from assets
        asset_clips = []
        time_per_asset = segment_duration / len(assets)
        
        for i, asset in enumerate(assets):
            clip = mp.VideoFileClip(str(asset['path']))
            
            # Resize and fit to target resolution (1920x1080)
            clip = clip.resize(height=1080).crop(x_center=clip.w/2, width=1920)
            
            # Set duration for this asset
            asset_duration = min(time_per_asset, clip.duration)
            clip = clip.subclip(0, asset_duration)
            
            # Apply transitions
            if i > 0:
                clip = clip.fadein(0.5)
            if i < len(assets) - 1:
                clip = clip.fadeout(0.5)
            
            asset_clips.append(clip)
        
        # Combine assets for this segment
        segment_clip = mp.concatenate_videoclips(asset_clips, method="compose")
        
        # Ensure exact duration matching
        if segment_clip.duration != segment_duration:
            segment_clip = segment_clip.subclip(0, segment_duration)
        
        return segment_clip
    
    def _apply_final_effects(self, video: mp.VideoClip, style: Dict) -> mp.VideoClip:
        """Apply final styling and effects"""
        # Color correction
        if style.get('color_grade'):
            video = video.fx(mp.vfx.colorx, factor=style['color_grade'])
        
        # Add intro/outro if specified
        if style.get('intro_duration'):
            intro = self._create_intro_clip(style['intro_duration'])
            video = mp.concatenate_videoclips([intro, video])
        
        if style.get('outro_duration'):
            outro = self._create_outro_clip(style['outro_duration'])
            video = mp.concatenate_videoclips([video, outro])
        
        return video

Production Deployment and Orchestration

Running this AI video automation pipeline Python system in production requires robust orchestration and monitoring. We use a FastAPI service to handle job queuing and status tracking:

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
import asyncio
import uuid
from datetime import datetime
import redis

app = FastAPI(title="Vidm

More from the blog

Need help with AI?

We build production AI systems — from strategy and architecture to deployment and evaluation.

Get our AI implementation playbook

A practical guide to evaluating, planning, and deploying AI in your business. Free, no spam.