Video RAG: Index and Search Your Videos
Complete guide to integrating video into your RAG system: frame extraction, audio transcription, scene detection and multimodal indexing.
Video combines audio, visuals, and on-screen text. It is the richest format, but also the most complex to index for a RAG system. This guide shows you how to decompose, analyze, and make any video content searchable.
Why Video RAG?
The Video Data Challenge
- Explosive volume: 500 hours of video uploaded to YouTube every minute
- Information richness: A video tutorial carries far more information than its transcript alone
- Temporality: Information is distributed over time
- Multimodality: Audio + visual + on-screen text
Concrete Use Cases
| Sector | Video Type | Extracted Value |
|---|---|---|
| E-learning | Video courses | Search by concept in courses |
| Support | Product tutorials | "How to do X?" with timestamp |
| Media | Video archives | Search through archives |
| Corporate | Recorded meetings | Find who said what |
| Marketing | YouTube content | Competitor analysis |
Typical ROI
- 80% reduction in video archive search time
- +60% engagement on educational content (thanks to auto chapters)
- Compliance: Searchable video evidence for audits
Video RAG Architecture
┌─────────────────────────────────────────────────────────────────┐
│ VIDEO RAG PIPELINE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ Video │ │
│ │ Input │ │
│ └────┬─────┘ │
│ │ │
│ ├─────────────────┬─────────────────┬────────────────┐ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────┐ │
│ │ Audio │ │ Frames │ │ OCR │ │Metadata│ │
│ │Extraction│ │ Sampling │ │(screen text) │ │ │ │
│ └────┬─────┘ └──────┬───────┘ └──────┬───────┘ └───┬────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ │ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ Whisper │ │ Vision Model │ │ Text Index │ │ │
│ │Transcribe│ │ (GPT-4V) │ │ │ │ │
│ └────┬─────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │
│ └────────┬────────┴────────┬────────┘ │ │
│ ▼ │ │ │
│ ┌────────────────┐ │ │ │
│ │ Scene Detection│ │ │ │
│ │ & Chaptering │ │ │ │
│ └───────┬────────┘ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Multimodal Fusion │ │
│ │ (text + visual + audio embeddings) │ │
│ └────────────────────────┬─────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Vector Store (Qdrant) │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Extraction and Decomposition
Audio Extraction
```python
import subprocess
from pathlib import Path

def extract_audio(video_path: str, output_path: str = None) -> str:
    """Extract audio track from video."""
    if output_path is None:
        output_path = str(Path(video_path).with_suffix('.wav'))

    cmd = [
        'ffmpeg', '-i', video_path,
        '-vn',                   # No video
        '-acodec', 'pcm_s16le',
        '-ar', '16000',          # 16 kHz for Whisper
        '-ac', '1',              # Mono
        '-y',                    # Overwrite
        output_path,
    ]
    subprocess.run(cmd, capture_output=True, check=True)
    return output_path
```
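The indexing pipeline later in this guide calls an `AudioTranscriber` helper that is never defined. A minimal sketch, assuming the OpenAI Whisper API (`whisper-1` with `verbose_json` output) and returning the `{"text", "segments"}` shape the segment builder expects; the pure `segments_in_range` helper mirrors how transcript segments are later matched to scenes:

```python
from typing import List

class AudioTranscriber:
    """Thin wrapper over the OpenAI Whisper API (hypothetical helper
    assumed by the pipeline below)."""

    def __init__(self, model: str = "whisper-1"):
        self.model = model

    def transcribe(self, audio_path: str) -> dict:
        from openai import OpenAI  # deferred so the module imports without the SDK
        client = OpenAI()
        with open(audio_path, "rb") as f:
            result = client.audio.transcriptions.create(
                model=self.model,
                file=f,
                response_format="verbose_json",
                timestamp_granularities=["segment"],
            )
        return {
            "text": result.text,
            "segments": [
                {"start": s.start, "end": s.end, "text": s.text}
                for s in result.segments
            ],
        }

def segments_in_range(segments: List[dict], start: float, end: float) -> List[dict]:
    """Keep transcript segments that fall fully inside [start, end]."""
    return [s for s in segments if s["start"] >= start and s["end"] <= end]
```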
Frame Extraction
```python
import cv2
import numpy as np
from dataclasses import dataclass
from typing import List

@dataclass
class VideoFrame:
    timestamp: float
    frame_number: int
    image: np.ndarray
    is_keyframe: bool

class FrameExtractor:
    def __init__(self, video_path: str):
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.duration = self.total_frames / self.fps

    def extract_at_interval(self, interval_seconds: float = 1.0) -> List[VideoFrame]:
        """Extract one frame every N seconds."""
        frames = []
        frame_interval = int(self.fps * interval_seconds)
        for frame_num in range(0, self.total_frames, frame_interval):
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = self.cap.read()
            if ret:
                frames.append(VideoFrame(
                    timestamp=frame_num / self.fps,
                    frame_number=frame_num,
                    image=frame,
                    is_keyframe=False
                ))
        return frames

    def extract_keyframes(self, threshold: float = 30.0) -> List[VideoFrame]:
        """
        Extract keyframes (significant scene changes).
        Uses histogram difference between consecutive frames.
        """
        # Rewind so results don't depend on prior reads
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        keyframes = []
        prev_hist = None
        frame_num = 0

        while True:
            ret, frame = self.cap.read()
            if not ret:
                break

            # Grayscale histogram of the current frame
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
            hist = cv2.normalize(hist, hist).flatten()

            if prev_hist is not None:
                # Compare with previous frame
                diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
                if diff > threshold / 100:
                    keyframes.append(VideoFrame(
                        timestamp=frame_num / self.fps,
                        frame_number=frame_num,
                        image=frame,
                        is_keyframe=True
                    ))
            else:
                # First frame is always a keyframe
                keyframes.append(VideoFrame(
                    timestamp=0,
                    frame_number=0,
                    image=frame,
                    is_keyframe=True
                ))

            prev_hist = hist
            frame_num += 1

        return keyframes

    def __del__(self):
        self.cap.release()
```
Scene Detection with PySceneDetect
```python
from typing import List
from scenedetect import detect, ContentDetector

def detect_scenes(video_path: str, threshold: float = 27.0) -> List[dict]:
    """
    Detect scene changes in a video.
    Returns start/end timestamps for each scene.
    """
    scene_list = detect(video_path, ContentDetector(threshold=threshold))

    scenes = []
    for i, scene in enumerate(scene_list):
        scenes.append({
            "scene_number": i + 1,
            "start_time": scene[0].get_seconds(),
            "end_time": scene[1].get_seconds(),
            "start_frame": scene[0].get_frames(),
            "end_frame": scene[1].get_frames(),
            "duration": scene[1].get_seconds() - scene[0].get_seconds()
        })
    return scenes
```
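The architecture diagram routes frames through an OCR step for on-screen text, but no extractor is shown. One possible sketch, using pytesseract (an assumption here; any OCR engine works) to fill a segment's `on_screen_text`; binarizing first tends to help OCR on UI text and slides:

```python
import numpy as np

def binarize(gray: np.ndarray, thresh: int = 127) -> np.ndarray:
    """Binarize a grayscale frame; high-contrast input helps OCR."""
    return (gray > thresh).astype(np.uint8) * 255

def extract_screen_text(frame: np.ndarray) -> str:
    """OCR on-screen text from a BGR frame."""
    # pytesseract is an assumption: it requires the Tesseract binary installed
    import cv2
    import pytesseract
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    return pytesseract.image_to_string(binarize(gray)).strip()
```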
Multimodal Frame Analysis
Frame Description with GPT-4V
```python
import base64
import cv2
import numpy as np
from openai import OpenAI

def analyze_frame(
    frame: np.ndarray,
    context: str = "",
    client: OpenAI = None
) -> dict:
    """Analyze a video frame with GPT-4V."""
    if client is None:
        client = OpenAI()

    # Encode the frame as a base64 JPEG
    _, buffer = cv2.imencode('.jpg', frame)
    img_base64 = base64.b64encode(buffer).decode('utf-8')

    prompt = """Analyze this video frame for a RAG system.

Describe:
1. **Main content**: What does this frame show?
2. **Visible text**: Any on-screen text (titles, subtitles, UI)
3. **Visual elements**: Charts, diagrams, demonstrations
4. **Context**: Is this an intro, demo, conclusion?

Be precise and factual. The goal is to enable search."""

    if context:
        prompt += f"\n\nContext: {context}"

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{img_base64}",
                        "detail": "high"
                    }
                }
            ]
        }],
        max_tokens=500
    )

    return {
        "description": response.choices[0].message.content,
        "tokens_used": response.usage.total_tokens
    }
```
Batch Processing
```python
import asyncio
import base64
import cv2
from typing import List
from openai import AsyncOpenAI

async def analyze_frames_batch(
    frames: List[VideoFrame],
    max_concurrent: int = 5
) -> List[dict]:
    """Analyze multiple frames in parallel."""
    async_client = AsyncOpenAI()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def analyze_one(frame: VideoFrame) -> dict:
        async with semaphore:
            # Encode the frame as a base64 JPEG
            _, buffer = cv2.imencode('.jpg', frame.image)
            img_base64 = base64.b64encode(buffer).decode('utf-8')

            response = await async_client.chat.completions.create(
                model="gpt-4o-mini",  # More economical for batch work
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Briefly describe this video frame."},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{img_base64}",
                                "detail": "low"
                            }
                        }
                    ]
                }],
                max_tokens=200
            )
            return {
                "timestamp": frame.timestamp,
                "description": response.choices[0].message.content
            }

    return await asyncio.gather(*(analyze_one(f) for f in frames))
```
Complete Indexing Pipeline
Data Structure
```python
from dataclasses import dataclass, field
from typing import Optional, List

@dataclass
class VideoSegment:
    """Represents an indexable video segment."""
    segment_id: str
    video_id: str
    video_title: str

    # Temporal
    start_time: float
    end_time: float
    duration: float

    # Content
    transcript: str
    frame_descriptions: List[str]
    on_screen_text: Optional[str]

    # Semantic
    topic: Optional[str]
    summary: Optional[str]
    keywords: List[str] = field(default_factory=list)

    # Metadata
    speaker: Optional[str] = None
    scene_type: Optional[str] = None  # intro, demo, explanation, outro

    def to_embedding_text(self) -> str:
        """Combined text for embedding."""
        parts = []
        if self.topic:
            parts.append(f"Topic: {self.topic}")
        if self.summary:
            parts.append(f"Summary: {self.summary}")
        parts.append(f"Transcript: {self.transcript}")
        if self.frame_descriptions:
            parts.append(f"Visual: {' '.join(self.frame_descriptions[:3])}")
        if self.on_screen_text:
            parts.append(f"On-screen text: {self.on_screen_text}")
        return "\n".join(parts)
```
Pipeline Implementation
```python
import hashlib
import json
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct

class VideoRAGPipeline:
    def __init__(self):
        self.qdrant = QdrantClient(url="http://localhost:6333")
        self.openai = OpenAI()
        self.collection_name = "video_rag"

    def create_collection(self):
        """Create collection with multimodal embeddings."""
        self.qdrant.recreate_collection(
            collection_name=self.collection_name,
            vectors_config={
                "text": VectorParams(size=1536, distance=Distance.COSINE),
                # CLIP ViT-B/32 produces 512-dimensional embeddings
                "visual": VectorParams(size=512, distance=Distance.COSINE)
            }
        )

    def process_video(self, video_path: str, title: str) -> List[VideoSegment]:
        """Complete video processing pipeline."""
        video_id = hashlib.md5(video_path.encode()).hexdigest()

        print("1. Extracting audio...")
        audio_path = extract_audio(video_path)

        print("2. Transcribing...")
        transcriber = AudioTranscriber()
        transcription = transcriber.transcribe(audio_path)

        print("3. Detecting scenes...")
        scenes = detect_scenes(video_path)

        print("4. Extracting keyframes...")
        extractor = FrameExtractor(video_path)
        keyframes = extractor.extract_keyframes()

        print("5. Analyzing frames...")
        frame_analyses = []
        for kf in keyframes[:20]:  # Limit for costs
            analysis = analyze_frame(kf.image, client=self.openai)
            frame_analyses.append({
                "timestamp": kf.timestamp,
                **analysis
            })

        print("6. Creating segments...")
        segments = self._create_segments(
            video_id=video_id,
            video_title=title,
            transcription=transcription,
            scenes=scenes,
            frame_analyses=frame_analyses
        )

        print("7. Generating topics...")
        segments = self._add_topics(segments)

        return segments

    def _create_segments(
        self,
        video_id: str,
        video_title: str,
        transcription: dict,
        scenes: List[dict],
        frame_analyses: List[dict]
    ) -> List[VideoSegment]:
        """Create segments from extracted data."""
        segments = []
        for scene in scenes:
            # Find the corresponding transcript
            scene_transcript = []
            for seg in transcription["segments"]:
                if seg["start"] >= scene["start_time"] and seg["end"] <= scene["end_time"]:
                    scene_transcript.append(seg["text"])

            # Find frame descriptions within the scene
            frame_descs = []
            for fa in frame_analyses:
                if scene["start_time"] <= fa["timestamp"] <= scene["end_time"]:
                    frame_descs.append(fa["description"])

            segment = VideoSegment(
                segment_id=f"{video_id}_{scene['scene_number']}",
                video_id=video_id,
                video_title=video_title,
                start_time=scene["start_time"],
                end_time=scene["end_time"],
                duration=scene["duration"],
                transcript=" ".join(scene_transcript),
                frame_descriptions=frame_descs,
                on_screen_text=None,  # Add OCR if needed
                topic=None,
                summary=None
            )
            segments.append(segment)

        return segments

    def _add_topics(self, segments: List[VideoSegment]) -> List[VideoSegment]:
        """Add topics and summaries via LLM."""
        for segment in segments:
            if not segment.transcript:
                continue

            prompt = f"""Analyze this video segment:

Transcription: {segment.transcript[:1000]}

Visuals: {' '.join(segment.frame_descriptions[:2]) if segment.frame_descriptions else 'N/A'}

Generate:
1. A topic title (5-10 words)
2. A summary (1-2 sentences)
3. 3-5 keywords

JSON format: {{"topic": "", "summary": "", "keywords": []}}"""

            response = self.openai.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"}
            )
            data = json.loads(response.choices[0].message.content)
            segment.topic = data.get("topic")
            segment.summary = data.get("summary")
            segment.keywords = data.get("keywords", [])

        return segments

    def index_segments(self, segments: List[VideoSegment]):
        """Index segments in Qdrant."""
        from sentence_transformers import SentenceTransformer

        # CLIP model for visual embeddings
        clip_model = SentenceTransformer('clip-ViT-B-32')

        points = []
        for segment in segments:
            # Text embedding
            text = segment.to_embedding_text()
            text_response = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=text
            )
            text_embedding = text_response.data[0].embedding

            # Visual embedding (from frame descriptions)
            if segment.frame_descriptions:
                visual_text = " ".join(segment.frame_descriptions)
                visual_embedding = clip_model.encode(visual_text).tolist()
            else:
                visual_embedding = [0.0] * 512

            point = PointStruct(
                # Stable ID: Python's hash() varies across processes (PYTHONHASHSEED)
                id=int(hashlib.md5(segment.segment_id.encode()).hexdigest()[:16], 16),
                vector={
                    "text": text_embedding,
                    "visual": visual_embedding
                },
                payload={
                    "segment_id": segment.segment_id,
                    "video_id": segment.video_id,
                    "video_title": segment.video_title,
                    "start_time": segment.start_time,
                    "end_time": segment.end_time,
                    "duration": segment.duration,
                    "transcript": segment.transcript,
                    "topic": segment.topic,
                    "summary": segment.summary,
                    "keywords": segment.keywords
                }
            )
            points.append(point)

        self.qdrant.upsert(
            collection_name=self.collection_name,
            points=points
        )
        print(f"Indexed {len(points)} segments")
```
Search and Generation
Hybrid Video Search
```python
from sentence_transformers import SentenceTransformer

def search_video_rag(
    query: str,
    pipeline: VideoRAGPipeline,
    search_type: str = "hybrid",  # text, visual, or hybrid
    limit: int = 5
) -> List[dict]:
    """Search indexed videos."""
    # Query embedding (text)
    text_response = pipeline.openai.embeddings.create(
        model="text-embedding-3-small",
        input=query
    )
    text_embedding = text_response.data[0].embedding

    if search_type == "text":
        raw = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("text", text_embedding),
            limit=limit
        )
        results = [{"payload": r.payload, "score": r.score} for r in raw]

    elif search_type == "visual":
        clip = SentenceTransformer('clip-ViT-B-32')
        visual_embedding = clip.encode(query).tolist()
        raw = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("visual", visual_embedding),
            limit=limit
        )
        results = [{"payload": r.payload, "score": r.score} for r in raw]

    else:  # hybrid: weighted RRF over both searches
        text_results = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("text", text_embedding),
            limit=limit * 2
        )
        clip = SentenceTransformer('clip-ViT-B-32')
        visual_embedding = clip.encode(query).tolist()
        visual_results = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("visual", visual_embedding),
            limit=limit * 2
        )

        # RRF fusion: 0.6 weight on text, 0.4 on visual, k=60
        scores = {}
        for rank, r in enumerate(text_results):
            scores[r.payload["segment_id"]] = {
                "score": 0.6 / (rank + 60),
                "payload": r.payload
            }
        for rank, r in enumerate(visual_results):
            sid = r.payload["segment_id"]
            if sid in scores:
                scores[sid]["score"] += 0.4 / (rank + 60)
            else:
                scores[sid] = {
                    "score": 0.4 / (rank + 60),
                    "payload": r.payload
                }

        sorted_results = sorted(scores.values(), key=lambda x: x["score"], reverse=True)
        results = sorted_results[:limit]

    return [
        {
            "video_title": r["payload"]["video_title"],
            "topic": r["payload"]["topic"],
            "transcript": r["payload"]["transcript"][:200] + "...",
            "timestamp": (
                f"{r['payload']['start_time']:.0f}s - "
                f"{r['payload']['end_time']:.0f}s"
            ),
            "score": r["score"],
        }
        for r in results
    ]
```
Response Generation with Timestamp
```python
def generate_video_answer(
    query: str,
    retrieved_segments: List[dict],
    client: OpenAI
) -> str:
    """Generate a response with video references."""
    context = "\n\n".join([
        f"**{s['video_title']}** [{s['timestamp']}]\n"
        f"Topic: {s['topic']}\n"
        f"Content: {s['transcript']}"
        for s in retrieved_segments
    ])

    prompt = f"""You are an assistant that answers questions using videos as source.

Available video segments:
{context}

Question: {query}

Instructions:
1. Base your answer only on the provided segments
2. Cite your sources with [Video: title, timestamp]
3. If the question is about a visual element, mention it
4. Suggest jumping to the exact timestamp when relevant"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1000
    )
    return response.choices[0].message.content
```
Advanced Optimizations
Automatic YouTube Chaptering
```python
def generate_youtube_chapters(segments: List[VideoSegment]) -> str:
    """Generate chapters in YouTube description format."""
    chapters = []
    for segment in segments:
        if segment.topic:
            # Format: MM:SS Title
            minutes = int(segment.start_time // 60)
            seconds = int(segment.start_time % 60)
            chapters.append(f"{minutes:02d}:{seconds:02d} {segment.topic}")
    return "\n".join(chapters)

# Example output:
# 00:00 Introduction
# 02:15 Installation and setup
# 05:30 First practical example
# 10:45 Advanced use cases
# 15:20 Conclusion and resources
```
Key Moment Extraction
```python
import json

def extract_highlight_moments(
    segments: List[VideoSegment],
    client: OpenAI
) -> List[dict]:
    """Identify video highlights."""
    all_content = "\n\n".join([
        f"[{s.start_time:.0f}s-{s.end_time:.0f}s] {s.topic}: {s.transcript[:300]}"
        for s in segments
    ])

    prompt = f"""Analyze this video and identify the 5 most important moments:

{all_content}

For each moment, provide:
- timestamp (seconds)
- type: tutorial_step, key_insight, demo, announcement, qa
- short description
- why it's important

Return a JSON object:
{{"moments": [{{"timestamp": 0, "type": "", "description": "", "importance": ""}}]}}"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    # json_object mode guarantees a top-level object, so the list is wrapped
    return json.loads(response.choices[0].message.content).get("moments", [])
```
Costs and Performance
Costs per 1-hour Video
| Operation | Estimated Cost | Notes |
|---|---|---|
| Audio extraction | $0 | Local ffmpeg |
| Whisper transcription | $0.36 | OpenAI API |
| Scene detection | $0 | Local PySceneDetect |
| Keyframe analysis (20 frames) | $0.30-0.60 | GPT-4V |
| Topics/summaries | $0.10 | GPT-4o-mini |
| Embeddings | $0.02 | text-embedding-3-small |
| Total | ~$0.80-1.10 | Per hour of video |
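As a sanity check on the table, the line items can be recomputed with simple arithmetic. The rates below are assumptions (Whisper at $0.006 per audio minute, roughly $0.025 per GPT-4V frame) and should be verified against current API pricing:

```python
def video_processing_cost(hours: float, keyframes: int = 20) -> dict:
    """Back-of-envelope cost per video; all rates are assumptions."""
    whisper = hours * 60 * 0.006    # Whisper API, $0.006 per audio minute
    frames = keyframes * 0.025      # GPT-4V, ~$0.02-0.03 per analyzed frame
    topics = hours * 0.10           # GPT-4o-mini topics/summaries
    embeddings = hours * 0.02       # text-embedding-3-small
    return {
        "whisper": round(whisper, 2),
        "frames": round(frames, 2),
        "topics": round(topics, 2),
        "embeddings": round(embeddings, 2),
        "total": round(whisper + frames + topics + embeddings, 2),
    }

# One hour of video with 20 analyzed keyframes lands inside the
# table's ~$0.80-1.10 range.
```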
Processing Time
| Step | Duration (1h video) |
|---|---|
| Audio extraction | 30s |
| Transcription | 5-10min (API) |
| Scene detection | 2-3min |
| Frame analysis | 3-5min |
| Indexing | 1min |
| Total | ~15-20min |
Storage
- 1 hour video = ~50-100 segments
- Embeddings: ~1MB
- Metadata: ~100KB
- Thumbnails (optional): ~5MB
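The embeddings figure follows from the vector sizes: each segment stores a 1536-d text vector plus a CLIP vector (512-d for ViT-B/32), at 4 bytes per float32. A quick estimate, ignoring index overhead:

```python
def embedding_storage_bytes(n_segments: int,
                            text_dim: int = 1536,
                            visual_dim: int = 512,
                            bytes_per_float: int = 4) -> int:
    """Raw vector storage for one video's segments (excludes index overhead)."""
    return n_segments * (text_dim + visual_dim) * bytes_per_float

# 100 segments -> (1536 + 512) * 4 * 100 = 819,200 bytes, i.e. ~0.8 MB
```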
Integration with Ailog
Ailog supports native video indexing:
- Video upload: MP4, MOV, WEBM, AVI
- Automatic processing: Transcription + scenes + keyframes
- Smart chaptering: Auto-generated topics
- Unified search: "In which video do we talk about X?"
Related Posts
- Audio RAG: Podcasts, Calls and Transcriptions. Transcription with Whisper, speaker diarization, indexing podcasts and call recordings.
- Image RAG: Vision Models and Visual Search. Vision models, multimodal embeddings, indexing and visual search with GPT-4V, Claude Vision and CLIP.
- Multimodal RAG: Images, PDFs, and Beyond Text. Image indexing, PDF extraction, tables, and charts for a truly complete assistant.