Video RAG: Indexing and Searching Your Videos
A complete guide to integrating video into your RAG system: frame extraction, audio transcription, scene detection, and multimodal indexing.
Video combines audio, visuals, and text. It is the richest format, but also the most complex to index for a RAG system. This guide shows you how to decompose, analyze, and make any video content searchable.
Why Video RAG?
The video data challenge
- Explosive volume: 500 hours of video are uploaded to YouTube every minute
- Information richness: a video tutorial contains more than its transcript
- Temporality: the information is spread out over time
- Multimodality: audio + visuals + on-screen text
Concrete use cases
| Sector | Video type | Value extracted |
|---|---|---|
| E-learning | Video courses | Concept-level search within courses |
| Support | Product tutorials | "How do I do X?" with a timestamp |
| Media | Video archives | Search across the archives |
| Corporate | Recorded meetings | Find who said what |
| Marketing | YouTube content | Competitive analysis |
Typical ROI
- 80% reduction in search time across video archives
- +60% engagement on educational content (thanks to auto-generated chapters)
- Compliance: searchable video evidence for audits
Video RAG architecture
┌─────────────────────────────────────────────────────────────────┐
│ VIDEO RAG PIPELINE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ Video │ │
│ │ Input │ │
│ └────┬─────┘ │
│ │ │
│ ├─────────────────┬─────────────────┬────────────────┐ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────┐ │
│ │ Audio │ │ Frames │ │ OCR │ │Metadata│ │
│ │Extraction│ │ Sampling │ │ (text ecran) │ │ │ │
│ └────┬─────┘ └──────┬───────┘ └──────┬───────┘ └───┬────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ │ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ Whisper │ │ Vision Model │ │ Text Index │ │ │
│ │Transcribe│ │ (GPT-4V) │ │ │ │ │
│ └────┬─────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │
│ └────────┬────────┴────────┬────────┘ │ │
│ ▼ │ │ │
│ ┌────────────────┐ │ │ │
│ │ Scene Detection│ │ │ │
│ │ & Chaptering │ │ │ │
│ └───────┬────────┘ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Multimodal Fusion │ │
│ │ (text + visual + audio embeddings) │ │
│ └────────────────────────┬─────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Vector Store (Qdrant) │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Extraction and decomposition
Extracting the audio
```python
import subprocess
from pathlib import Path

def extract_audio(video_path: str, output_path: str = None) -> str:
    """Extract the audio track from a video."""
    if output_path is None:
        output_path = str(Path(video_path).with_suffix('.wav'))
    cmd = [
        'ffmpeg', '-i', video_path,
        '-vn',                    # No video
        '-acodec', 'pcm_s16le',
        '-ar', '16000',           # 16 kHz for Whisper
        '-ac', '1',               # Mono
        '-y',                     # Overwrite
        output_path
    ]
    subprocess.run(cmd, capture_output=True, check=True)
    return output_path
```
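The full pipeline further down instantiates an `AudioTranscriber` class that this guide never defines. Here is a minimal sketch of what it could look like, using OpenAI's Whisper API with `verbose_json` output so per-segment timestamps are available (the class name and the returned dict shape are assumptions chosen to match what the pipeline code expects, not an official API):

```python
class AudioTranscriber:
    """Minimal Whisper-API wrapper (a sketch; the name matches what the
    pipeline below expects)."""

    def __init__(self, model: str = "whisper-1"):
        self.model = model

    def transcribe(self, audio_path: str) -> dict:
        # Imported here so the module loads even without the openai package.
        from openai import OpenAI
        client = OpenAI()
        with open(audio_path, "rb") as f:
            result = client.audio.transcriptions.create(
                model=self.model,
                file=f,
                response_format="verbose_json",  # includes per-segment timestamps
            )
        return {
            "text": result.text,
            "segments": [
                {"start": s.start, "end": s.end, "text": s.text}
                for s in result.segments
            ],
        }


def segments_in_window(segments: list, start: float, end: float) -> list:
    """Pure helper: keep the transcript segments fully inside [start, end]."""
    return [s for s in segments if s["start"] >= start and s["end"] <= end]
```

The pure `segments_in_window` helper mirrors the timestamp filtering done in `_create_segments` below and can be unit-tested without network access.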
Extracting frames
```python
import cv2
import numpy as np
from dataclasses import dataclass
from typing import List

@dataclass
class VideoFrame:
    timestamp: float
    frame_number: int
    image: np.ndarray
    is_keyframe: bool

class FrameExtractor:
    def __init__(self, video_path: str):
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.duration = self.total_frames / self.fps

    def extract_at_interval(self, interval_seconds: float = 1.0) -> List[VideoFrame]:
        """Extract one frame every N seconds."""
        frames = []
        frame_interval = int(self.fps * interval_seconds)
        for frame_num in range(0, self.total_frames, frame_interval):
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = self.cap.read()
            if ret:
                frames.append(VideoFrame(
                    timestamp=frame_num / self.fps,
                    frame_number=frame_num,
                    image=frame,
                    is_keyframe=False
                ))
        return frames

    def extract_keyframes(self, threshold: float = 30.0) -> List[VideoFrame]:
        """
        Extract keyframes (significant scene changes).
        Uses the histogram difference between consecutive frames.
        """
        # Rewind in case another method moved the read position
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        keyframes = []
        prev_hist = None
        frame_num = 0
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break
            # Compute the grayscale histogram
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
            hist = cv2.normalize(hist, hist).flatten()
            if prev_hist is not None:
                # Compare with the previous frame
                diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
                if diff > threshold / 100:
                    keyframes.append(VideoFrame(
                        timestamp=frame_num / self.fps,
                        frame_number=frame_num,
                        image=frame,
                        is_keyframe=True
                    ))
            else:
                # The first frame is always a keyframe
                keyframes.append(VideoFrame(
                    timestamp=0,
                    frame_number=0,
                    image=frame,
                    is_keyframe=True
                ))
            prev_hist = hist
            frame_num += 1
        return keyframes

    def __del__(self):
        self.cap.release()
```
Scene detection with PySceneDetect
```python
from typing import List
from scenedetect import detect, ContentDetector

def detect_scenes(video_path: str, threshold: float = 27.0) -> List[dict]:
    """
    Detect scene changes in a video.
    Returns the start/end timestamps of each scene.
    """
    scene_list = detect(video_path, ContentDetector(threshold=threshold))
    scenes = []
    for i, scene in enumerate(scene_list):
        scenes.append({
            "scene_number": i + 1,
            "start_time": scene[0].get_seconds(),
            "end_time": scene[1].get_seconds(),
            "start_frame": scene[0].get_frames(),
            "end_frame": scene[1].get_frames(),
            "duration": scene[1].get_seconds() - scene[0].get_seconds()
        })
    return scenes
```
Multimodal frame analysis
Describing frames with GPT-4V
```python
import base64
import cv2
import numpy as np
from openai import OpenAI

def analyze_frame(
    frame: np.ndarray,
    context: str = "",
    client: OpenAI = None
) -> dict:
    """Analyze a video frame with GPT-4V."""
    if client is None:
        client = OpenAI()

    # Encode as base64
    _, buffer = cv2.imencode('.jpg', frame)
    img_base64 = base64.b64encode(buffer).decode('utf-8')

    prompt = """Analyze this video frame for a RAG system.

Describe:
1. **Main content**: What does this frame show?
2. **Visible text**: Any on-screen text (titles, captions, UI)
3. **Visual elements**: Charts, diagrams, demonstrations
4. **Context**: Is this an intro, a demo, a conclusion?

Be precise and factual. The goal is to make the frame searchable."""

    if context:
        prompt += f"\n\nContext: {context}"

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{img_base64}",
                        "detail": "high"
                    }
                }
            ]
        }],
        max_tokens=500
    )

    return {
        "description": response.choices[0].message.content,
        "tokens_used": response.usage.total_tokens
    }
```
Batch processing
```python
import asyncio
import base64
import cv2
from typing import List
from openai import AsyncOpenAI

async def analyze_frames_batch(
    frames: List[VideoFrame],
    max_concurrent: int = 5
) -> List[dict]:
    """Analyze several frames in parallel."""
    async_client = AsyncOpenAI()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def analyze_one(frame: VideoFrame) -> dict:
        async with semaphore:
            # Encode as base64
            _, buffer = cv2.imencode('.jpg', frame.image)
            img_base64 = base64.b64encode(buffer).decode('utf-8')
            response = await async_client.chat.completions.create(
                model="gpt-4o-mini",  # Cheaper for batch work
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Briefly describe this video frame."},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{img_base64}",
                                "detail": "low"
                            }
                        }
                    ]
                }],
                max_tokens=200
            )
            return {
                "timestamp": frame.timestamp,
                "description": response.choices[0].message.content
            }

    tasks = [analyze_one(f) for f in frames]
    return await asyncio.gather(*tasks)
```
Complete indexing pipeline
Data structure
```python
from dataclasses import dataclass, field
from typing import Optional, List

@dataclass
class VideoSegment:
    """An indexable video segment."""
    segment_id: str
    video_id: str
    video_title: str
    # Temporal
    start_time: float
    end_time: float
    duration: float
    # Content
    transcript: str
    frame_descriptions: List[str]
    on_screen_text: Optional[str]
    # Semantics
    topic: Optional[str]
    summary: Optional[str]
    keywords: List[str] = field(default_factory=list)
    # Metadata
    speaker: Optional[str] = None
    scene_type: Optional[str] = None  # intro, demo, explanation, outro

    def to_embedding_text(self) -> str:
        """Combined text used for the embedding."""
        parts = []
        if self.topic:
            parts.append(f"Topic: {self.topic}")
        if self.summary:
            parts.append(f"Summary: {self.summary}")
        parts.append(f"Transcript: {self.transcript}")
        if self.frame_descriptions:
            parts.append(f"Visual: {' '.join(self.frame_descriptions[:3])}")
        if self.on_screen_text:
            parts.append(f"On-screen text: {self.on_screen_text}")
        return "\n".join(parts)
```
Putting the pipeline together
```python
import hashlib
import json
from typing import List
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct
from sentence_transformers import SentenceTransformer

class VideoRAGPipeline:
    def __init__(self):
        self.qdrant = QdrantClient(url="http://localhost:6333")
        self.openai = OpenAI()
        self.collection_name = "video_rag"

    def create_collection(self):
        """Create the collection with multimodal embeddings."""
        self.qdrant.recreate_collection(
            collection_name=self.collection_name,
            vectors_config={
                "text": VectorParams(size=1536, distance=Distance.COSINE),
                # clip-ViT-B-32 outputs 512-dim vectors
                "visual": VectorParams(size=512, distance=Distance.COSINE)
            }
        )

    def process_video(self, video_path: str, title: str) -> List[VideoSegment]:
        """Full video processing pipeline."""
        video_id = hashlib.md5(video_path.encode()).hexdigest()

        print("1. Extracting audio...")
        audio_path = extract_audio(video_path)

        print("2. Transcribing...")
        transcriber = AudioTranscriber()
        transcription = transcriber.transcribe(audio_path)

        print("3. Detecting scenes...")
        scenes = detect_scenes(video_path)

        print("4. Extracting keyframes...")
        extractor = FrameExtractor(video_path)
        keyframes = extractor.extract_keyframes()

        print("5. Analyzing frames...")
        frame_analyses = []
        for kf in keyframes[:20]:  # Cap the number of frames to control costs
            analysis = analyze_frame(kf.image, client=self.openai)
            frame_analyses.append({"timestamp": kf.timestamp, **analysis})

        print("6. Building segments...")
        segments = self._create_segments(
            video_id=video_id,
            video_title=title,
            transcription=transcription,
            scenes=scenes,
            frame_analyses=frame_analyses
        )

        print("7. Generating topics...")
        segments = self._add_topics(segments)
        return segments

    def _create_segments(
        self,
        video_id: str,
        video_title: str,
        transcription: dict,
        scenes: List[dict],
        frame_analyses: List[dict]
    ) -> List[VideoSegment]:
        """Build segments from the extracted data."""
        segments = []
        for scene in scenes:
            # Find the matching transcript
            scene_transcript = []
            for seg in transcription["segments"]:
                if seg["start"] >= scene["start_time"] and seg["end"] <= scene["end_time"]:
                    scene_transcript.append(seg["text"])

            # Find the frame descriptions
            frame_descs = []
            for fa in frame_analyses:
                if scene["start_time"] <= fa["timestamp"] <= scene["end_time"]:
                    frame_descs.append(fa["description"])

            segments.append(VideoSegment(
                segment_id=f"{video_id}_{scene['scene_number']}",
                video_id=video_id,
                video_title=video_title,
                start_time=scene["start_time"],
                end_time=scene["end_time"],
                duration=scene["duration"],
                transcript=" ".join(scene_transcript),
                frame_descriptions=frame_descs,
                on_screen_text=None,  # Add OCR if needed
                topic=None,
                summary=None
            ))
        return segments

    def _add_topics(self, segments: List[VideoSegment]) -> List[VideoSegment]:
        """Add topics and summaries via an LLM."""
        for segment in segments:
            if not segment.transcript:
                continue
            prompt = f"""Analyze this video segment:

Transcript: {segment.transcript[:1000]}
Visuals: {' '.join(segment.frame_descriptions[:2]) if segment.frame_descriptions else 'N/A'}

Generate:
1. A topic title (5-10 words)
2. A summary (1-2 sentences)
3. 3-5 keywords

JSON format: {{"topic": "", "summary": "", "keywords": []}}"""

            response = self.openai.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"}
            )
            data = json.loads(response.choices[0].message.content)
            segment.topic = data.get("topic")
            segment.summary = data.get("summary")
            segment.keywords = data.get("keywords", [])
        return segments

    def index_segments(self, segments: List[VideoSegment]):
        """Index the segments in Qdrant."""
        # CLIP model for the visual embeddings
        clip_model = SentenceTransformer('clip-ViT-B-32')

        points = []
        for segment in segments:
            # Text embedding
            text = segment.to_embedding_text()
            text_response = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=text
            )
            text_embedding = text_response.data[0].embedding

            # Visual embedding (from the frame descriptions)
            if segment.frame_descriptions:
                visual_text = " ".join(segment.frame_descriptions)
                visual_embedding = clip_model.encode(visual_text).tolist()
            else:
                visual_embedding = [0.0] * 512

            points.append(PointStruct(
                # Stable ID across runs (Python's hash() is salted per process)
                id=int(hashlib.sha1(segment.segment_id.encode()).hexdigest()[:15], 16),
                vector={
                    "text": text_embedding,
                    "visual": visual_embedding
                },
                payload={
                    "segment_id": segment.segment_id,
                    "video_id": segment.video_id,
                    "video_title": segment.video_title,
                    "start_time": segment.start_time,
                    "end_time": segment.end_time,
                    "duration": segment.duration,
                    "transcript": segment.transcript,
                    "topic": segment.topic,
                    "summary": segment.summary,
                    "keywords": segment.keywords
                }
            ))

        self.qdrant.upsert(
            collection_name=self.collection_name,
            points=points
        )
        print(f"Indexed {len(points)} segments")
```
Search and generation
Hybrid video search
```python
from typing import List
from sentence_transformers import SentenceTransformer

def search_video_rag(
    query: str,
    pipeline: VideoRAGPipeline,
    search_type: str = "hybrid",  # text, visual, hybrid
    limit: int = 5
) -> List[dict]:
    """Search the indexed videos."""
    # Embed the query
    text_response = pipeline.openai.embeddings.create(
        model="text-embedding-3-small",
        input=query
    )
    text_embedding = text_response.data[0].embedding

    if search_type == "text":
        results = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("text", text_embedding),
            limit=limit
        )
    elif search_type == "visual":
        clip = SentenceTransformer('clip-ViT-B-32')
        visual_embedding = clip.encode(query).tolist()
        results = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("visual", visual_embedding),
            limit=limit
        )
    else:  # hybrid
        # Weighted RRF over the two searches
        text_results = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("text", text_embedding),
            limit=limit * 2
        )
        clip = SentenceTransformer('clip-ViT-B-32')
        visual_embedding = clip.encode(query).tolist()
        visual_results = pipeline.qdrant.search(
            collection_name=pipeline.collection_name,
            query_vector=("visual", visual_embedding),
            limit=limit * 2
        )

        # RRF fusion (0.6 text / 0.4 visual)
        scores = {}
        for rank, r in enumerate(text_results):
            scores[r.payload["segment_id"]] = {
                "score": 0.6 / (rank + 60),
                "payload": r.payload
            }
        for rank, r in enumerate(visual_results):
            sid = r.payload["segment_id"]
            if sid in scores:
                scores[sid]["score"] += 0.4 / (rank + 60)
            else:
                scores[sid] = {
                    "score": 0.4 / (rank + 60),
                    "payload": r.payload
                }
        sorted_results = sorted(scores.items(), key=lambda x: x[1]["score"], reverse=True)
        results = [{"payload": v["payload"], "score": v["score"]}
                   for _, v in sorted_results[:limit]]

    # Normalize both result shapes (Qdrant hits vs fused dicts)
    def get(r, key):
        return r.payload[key] if hasattr(r, 'payload') else r["payload"][key]

    return [
        {
            "video_title": get(r, "video_title"),
            "topic": get(r, "topic"),
            "transcript": get(r, "transcript")[:200] + "...",
            "timestamp": f"{get(r, 'start_time'):.0f}s - {get(r, 'end_time'):.0f}s",
            "score": r.score if hasattr(r, 'score') else r["score"]
        }
        for r in results
    ]
```
Generating answers with timestamps
```python
def generate_video_answer(
    query: str,
    retrieved_segments: List[dict],
    client: OpenAI
) -> str:
    """Generate an answer with video references."""
    context = "\n\n".join([
        f"**{s['video_title']}** [{s['timestamp']}]\n"
        f"Topic: {s['topic']}\n"
        f"Content: {s['transcript']}"
        for s in retrieved_segments
    ])

    prompt = f"""You are an assistant that answers questions using videos as sources.

Available video segments:
{context}

Question: {query}

Instructions:
1. Base your answer only on the segments provided
2. Cite your sources as [Video: title, timestamp]
3. If the question concerns a visual element, mention it
4. Point to the exact timestamp when relevant"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1000
    )
    return response.choices[0].message.content
```
Advanced optimizations
Automatic YouTube chaptering
```python
def generate_youtube_chapters(segments: List[VideoSegment]) -> str:
    """Generate chapters in YouTube format."""
    chapters = []
    for segment in segments:
        if segment.topic:
            # Format: MM:SS Title
            minutes = int(segment.start_time // 60)
            seconds = int(segment.start_time % 60)
            chapters.append(f"{minutes:02d}:{seconds:02d} {segment.topic}")
    return "\n".join(chapters)

# Example output:
# 00:00 Introduction
# 02:15 Installation and setup
# 05:30 First hands-on example
# 10:45 Advanced use cases
# 15:20 Conclusion and resources
```
Extracting key moments
```python
import json

def extract_highlight_moments(
    segments: List[VideoSegment],
    client: OpenAI
) -> List[dict]:
    """Identify the video's highlight moments."""
    all_content = "\n\n".join([
        f"[{s.start_time:.0f}s-{s.end_time:.0f}s] {s.topic}: {s.transcript[:300]}"
        for s in segments
    ])

    prompt = f"""Analyze this video and identify the 5 most important moments:

{all_content}

For each moment, give:
- timestamp (seconds)
- type: tutorial_step, key_insight, demo, announcement, qa
- a short description
- why it matters

JSON: [{{"timestamp": X, "type": "", "description": "", "importance": ""}}]"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    return json.loads(response.choices[0].message.content)
```
Costs and performance
Costs for one hour of video
| Operation | Estimated cost | Notes |
|---|---|---|
| Audio extraction | $0 | ffmpeg, local |
| Whisper transcription | $0.36 | OpenAI API |
| Scene detection | $0 | PySceneDetect, local |
| Analyzing 20 keyframes | $0.30-0.60 | GPT-4V |
| Topics/summaries | $0.10 | GPT-4o-mini |
| Embeddings | $0.02 | text-embedding-3-small |
| Total | ~$0.80-1.10 | per hour of video |
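For budgeting, the table's line items can be folded into a quick back-of-the-envelope estimator. The defaults below are the table's own estimates (the per-frame GPT-4V price is a rough assumption), not live pricing:

```python
def estimate_video_cost(
    hours: float,
    keyframes_analyzed: int = 20,
    whisper_per_hour: float = 0.36,     # $0.006/min, OpenAI Whisper
    vision_per_frame: float = 0.02,     # rough GPT-4V cost per keyframe (assumption)
    topics_per_hour: float = 0.10,      # GPT-4o-mini summaries
    embeddings_per_hour: float = 0.02,  # text-embedding-3-small
) -> float:
    """Rough indexing cost in USD, mirroring the table above."""
    per_video = keyframes_analyzed * vision_per_frame  # keyframes are capped per video
    per_hour = whisper_per_hour + topics_per_hour + embeddings_per_hour
    return round(hours * per_hour + per_video, 2)

# One 1-hour video with 20 analyzed keyframes:
# 0.36 + 0.10 + 0.02 + 20 * 0.02 = $0.88 — within the ~$0.80-1.10 range above
```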
Processing time
| Step | Duration (1 h video) |
|---|---|
| Audio extraction | 30 s |
| Transcription | 5-10 min (API) |
| Scene detection | 2-3 min |
| Frame analysis | 3-5 min |
| Indexing | 1 min |
| Total | ~15-20 min |
Storage
- 1 hour of video = ~50-100 segments
- Embeddings: ~1 MB
- Metadata: ~100 KB
- Thumbnails (optional): ~5 MB
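The embeddings figure follows directly from the vector dimensions. A back-of-the-envelope sketch, assuming float32 storage, 1536-dim text vectors (text-embedding-3-small) and 512-dim visual vectors (the actual output size of clip-ViT-B-32):

```python
def estimate_storage_bytes(
    n_segments: int,
    text_dim: int = 1536,    # text-embedding-3-small
    visual_dim: int = 512,   # clip-ViT-B-32 output size
    bytes_per_float: int = 4,
) -> dict:
    """Rough raw vector storage per video, assuming float32 vectors."""
    per_segment = (text_dim + visual_dim) * bytes_per_float
    return {
        "per_segment_bytes": per_segment,
        "total_bytes": n_segments * per_segment,
    }
```

100 segments come to roughly 0.8 MB of raw vectors, consistent with the ~1 MB figure above (indexes and payloads add some overhead).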
Integration with Ailog
Ailog supports native video indexing:
- Video upload: MP4, MOV, WEBM, AVI
- Automatic processing: transcription + scene detection + keyframes
- Smart chaptering: automatically generated topics
- Unified search: "Which video talks about X?"
Try Video RAG on Ailog