Streaming RAG : Implémenter des réponses en temps réel
Guide technique complet pour implémenter le streaming dans votre système RAG : architecture, code Python/TypeScript, gestion des erreurs et optimisation UX.
TL;DR
Le streaming RAG permet d'afficher les réponses token par token au lieu d'attendre la génération complète. Résultat : une latence perçue divisée par 5 et une meilleure expérience utilisateur. Ce guide couvre l'architecture technique, l'implémentation backend/frontend, et les patterns avancés pour un streaming robuste en production.
Pourquoi le streaming est essentiel
Le problème de latence en RAG
Un pipeline RAG typique prend 2-5 secondes avant d'afficher quoi que ce soit :
Requête utilisateur
↓ 200ms - Embedding de la requête
↓ 300ms - Recherche vectorielle
↓ 100ms - Reranking
↓ 2000ms - Génération LLM
↓ Réponse complète
Avec le streaming :
Requête utilisateur
↓ 200ms - Embedding de la requête
↓ 300ms - Recherche vectorielle
↓ 100ms - Reranking
↓ 100ms - Premier token
↓ ... tokens en continu ...
↓ Réponse complète
Impact sur l'expérience utilisateur
| Métrique | Sans streaming | Avec streaming |
|---|---|---|
| Time to First Byte (TTFB) | 2500ms | 600ms |
| Latence perçue | Élevée | Faible |
| Abandon utilisateur | 15% | 3% |
| Satisfaction (NPS) | +25 | +58 |
Architecture du streaming RAG
Vue d'ensemble
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │←──→│ Backend │←──→│ LLM │
│ (Browser) │SSE │ (FastAPI) │ │ (OpenAI) │
└─────────────┘ └─────────────┘ └─────────────┘
↑ ↑ ↑
│ │ │
Affichage Orchestration Génération
progressif RAG + Stream streaming
Choix du protocole
| Protocole | Avantages | Inconvénients | Utilisation |
|---|---|---|---|
| SSE | Simple, HTTP natif | Unidirectionnel | Recommandé |
| WebSocket | Bidirectionnel | Complexité | Chat interactif |
| HTTP/2 Push | Performant | Support limité | Cas avancés |
Implémentation Backend (Python/FastAPI)
Configuration de base
DEVELOPERpythonfrom fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from openai import OpenAI import json import asyncio from typing import AsyncGenerator app = FastAPI() client = OpenAI() class RAGStreamingPipeline: def __init__(self, vector_db, embedding_model, llm_client): self.vector_db = vector_db self.embedding_model = embedding_model self.llm_client = llm_client async def stream_response( self, query: str, system_prompt: str ) -> AsyncGenerator[str, None]: """Pipeline RAG complet avec streaming.""" # 1. Retrieval (non-streamé) query_embedding = await self.embedding_model.embed(query) documents = await self.vector_db.search(query_embedding, top_k=5) # 2. Construire le contexte context = self._format_context(documents) # 3. Envoyer les métadonnées d'abord yield self._format_sse_event("metadata", { "sources": [doc.source for doc in documents], "retrieval_time_ms": 350 }) # 4. Streamer la génération messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"} ] stream = self.llm_client.chat.completions.create( model="gpt-4", messages=messages, stream=True ) full_response = "" for chunk in stream: if chunk.choices[0].delta.content: token = chunk.choices[0].delta.content full_response += token yield self._format_sse_event("token", {"content": token}) # 5. Envoyer le signal de fin yield self._format_sse_event("done", { "total_tokens": len(full_response.split()), "generation_time_ms": 1500 }) def _format_context(self, documents: list) -> str: return "\n\n".join([ f"[{doc.source}]\n{doc.content}" for doc in documents ]) def _format_sse_event(self, event_type: str, data: dict) -> str: return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
Endpoint FastAPI
DEVELOPERpythonfrom fastapi import FastAPI, Query from fastapi.responses import StreamingResponse app = FastAPI() pipeline = RAGStreamingPipeline(vector_db, embedding_model, llm_client) @app.get("/api/chat/stream") async def stream_chat( query: str = Query(..., description="User query"), channel_id: str = Query(..., description="Channel ID") ): """Endpoint SSE pour le streaming de réponses RAG.""" # Charger la configuration du channel channel_config = await get_channel_config(channel_id) async def event_generator(): try: async for event in pipeline.stream_response( query=query, system_prompt=channel_config.system_prompt ): yield event except Exception as e: yield pipeline._format_sse_event("error", { "message": str(e), "code": "GENERATION_ERROR" }) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Désactive le buffering nginx } )
Gestion des erreurs en streaming
DEVELOPERpythonclass StreamingErrorHandler: """Gestion robuste des erreurs pendant le streaming.""" @staticmethod async def safe_stream(generator, max_retries=3): """Wrapper pour gérer les erreurs de streaming.""" retries = 0 while retries < max_retries: try: async for event in generator: yield event return # Succès except OpenAIRateLimitError: retries += 1 yield format_sse_event("warning", { "message": "Rate limit hit, retrying...", "retry": retries }) await asyncio.sleep(2 ** retries) except OpenAIConnectionError: yield format_sse_event("error", { "message": "Connection lost. Please retry.", "recoverable": True }) return except Exception as e: yield format_sse_event("error", { "message": "An error occurred.", "code": type(e).__name__ }) return yield format_sse_event("error", { "message": "Max retries exceeded.", "recoverable": False })
Implémentation Frontend (TypeScript/React)
Hook React pour le streaming
DEVELOPERtypescriptimport { useState, useCallback, useRef } from 'react'; interface StreamingMessage { content: string; isComplete: boolean; sources: string[]; error?: string; } interface UseStreamingChatOptions { channelId: string; onToken?: (token: string) => void; onComplete?: (response: string) => void; onError?: (error: Error) => void; } export function useStreamingChat({ channelId, onToken, onComplete, onError }: UseStreamingChatOptions) { const [message, setMessage] = useState<StreamingMessage>({ content: '', isComplete: false, sources: [] }); const [isStreaming, setIsStreaming] = useState(false); const abortControllerRef = useRef<AbortController | null>(null); const sendMessage = useCallback(async (query: string) => { // Annuler le stream précédent si existant if (abortControllerRef.current) { abortControllerRef.current.abort(); } abortControllerRef.current = new AbortController(); setIsStreaming(true); setMessage({ content: '', isComplete: false, sources: [] }); try { const response = await fetch( `/api/chat/stream?query=${encodeURIComponent(query)}&channel_id=${channelId}`, { method: 'GET', headers: { 'Accept': 'text/event-stream' }, signal: abortControllerRef.current.signal } ); if (!response.ok) { throw new Error(`HTTP error: ${response.status}`); } const reader = response.body?.getReader(); const decoder = new TextDecoder(); if (!reader) { throw new Error('No response body'); } let buffer = ''; let fullContent = ''; let sources: string[] = []; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const events = buffer.split('\n\n'); buffer = events.pop() || ''; for (const eventStr of events) { if (!eventStr.trim()) continue; const event = parseSSEEvent(eventStr); switch (event.type) { case 'metadata': sources = event.data.sources; setMessage(prev => ({ ...prev, sources })); break; case 'token': fullContent += event.data.content; setMessage(prev => ({ ...prev, content: fullContent })); onToken?.(event.data.content); break; case 'done': setMessage(prev => ({ ...prev, isComplete: true })); onComplete?.(fullContent); break; case 'error': setMessage(prev => ({ ...prev, error: event.data.message })); onError?.(new Error(event.data.message)); break; } } } } catch (error) { if ((error as Error).name !== 'AbortError') { setMessage(prev => ({ ...prev, error: (error as Error).message })); onError?.(error as Error); } } finally { setIsStreaming(false); } }, [channelId, onToken, onComplete, onError]); const stopStreaming = useCallback(() => { abortControllerRef.current?.abort(); setIsStreaming(false); }, []); return { message, isStreaming, sendMessage, stopStreaming }; } function parseSSEEvent(eventStr: string): { type: string; data: any } { const lines = eventStr.split('\n'); let eventType = 'message'; let data = ''; for (const line of lines) { if (line.startsWith('event: ')) { eventType = line.slice(7); } else if (line.startsWith('data: ')) { data = line.slice(6); } } return { type: eventType, data: JSON.parse(data) }; }
Composant de chat avec streaming
DEVELOPERtypescriptimport { useStreamingChat } from './useStreamingChat'; interface StreamingChatProps { channelId: string; } export function StreamingChat({ channelId }: StreamingChatProps) { const [input, setInput] = useState(''); const { message, isStreaming, sendMessage, stopStreaming } = useStreamingChat({ channelId, onComplete: (response) => { console.log('Response complete:', response.length, 'characters'); } }); const handleSubmit = (e: React.FormEvent) => { e.preventDefault(); if (input.trim() && !isStreaming) { sendMessage(input); setInput(''); } }; return ( <div className="streaming-chat"> <div className="messages"> {message.content && ( <div className="message assistant"> <StreamingText text={message.content} isStreaming={isStreaming} /> {message.isComplete && message.sources.length > 0 && ( <SourcesList sources={message.sources} /> )} {message.error && ( <ErrorMessage error={message.error} /> )} </div> )} </div> <form onSubmit={handleSubmit} className="input-form"> <input type="text" value={input} onChange={(e) => setInput(e.target.value)} placeholder="Posez votre question..." disabled={isStreaming} /> {isStreaming ? ( <button type="button" onClick={stopStreaming}> Arrêter </button> ) : ( <button type="submit" disabled={!input.trim()}> Envoyer </button> )} </form> </div> ); } function StreamingText({ text, isStreaming }: { text: string; isStreaming: boolean; }) { return ( <div className="streaming-text"> {text} {isStreaming && <span className="cursor">▊</span>} </div> ); }
Optimisations avancées
1. Prefetch du contexte
Commencer la récupération avant que l'utilisateur finisse de taper :
DEVELOPERtypescriptimport { useDeferredValue, useEffect, useRef } from 'react'; function usePrefetchContext(query: string, channelId: string) { const deferredQuery = useDeferredValue(query); const cacheRef = useRef<Map<string, any>>(new Map()); useEffect(() => { if (deferredQuery.length < 3) return; const cacheKey = `${channelId}:${deferredQuery}`; if (cacheRef.current.has(cacheKey)) return; // Prefetch en arrière-plan fetch(`/api/context/prefetch?q=${encodeURIComponent(deferredQuery)}&channel=${channelId}`) .then(res => res.json()) .then(context => { cacheRef.current.set(cacheKey, context); }) .catch(() => {}); // Silently fail }, [deferredQuery, channelId]); return cacheRef.current; }
2. Buffering intelligent
Accumuler les tokens pour un affichage plus fluide :
DEVELOPERtypescriptclass TokenBuffer { private buffer: string[] = []; private flushCallback: (text: string) => void; private flushInterval: number; private timer: NodeJS.Timeout | null = null; constructor( flushCallback: (text: string) => void, flushInterval: number = 50 // ms ) { this.flushCallback = flushCallback; this.flushInterval = flushInterval; } add(token: string) { this.buffer.push(token); // Flush immédiat pour les caractères de ponctuation if (/[.!?\n]$/.test(token)) { this.flush(); return; } // Sinon, flush avec délai if (!this.timer) { this.timer = setTimeout(() => this.flush(), this.flushInterval); } } flush() { if (this.timer) { clearTimeout(this.timer); this.timer = null; } if (this.buffer.length > 0) { this.flushCallback(this.buffer.join('')); this.buffer = []; } } }
3. Reconnexion automatique
DEVELOPERtypescriptclass ResilientEventSource { private url: string; private onMessage: (event: any) => void; private onError: (error: Error) => void; private maxRetries: number; private retryCount: number = 0; private eventSource: EventSource | null = null; constructor(options: { url: string; onMessage: (event: any) => void; onError: (error: Error) => void; maxRetries?: number; }) { this.url = options.url; this.onMessage = options.onMessage; this.onError = options.onError; this.maxRetries = options.maxRetries || 3; this.connect(); } private connect() { this.eventSource = new EventSource(this.url); this.eventSource.onmessage = (event) => { this.retryCount = 0; // Reset on success this.onMessage(JSON.parse(event.data)); }; this.eventSource.onerror = (error) => { this.eventSource?.close(); if (this.retryCount < this.maxRetries) { this.retryCount++; const delay = Math.pow(2, this.retryCount) * 1000; setTimeout(() => this.connect(), delay); } else { this.onError(new Error('Max retries exceeded')); } }; } close() { this.eventSource?.close(); } }
Monitoring et métriques
Métriques clés à tracker
DEVELOPERpythonfrom prometheus_client import Histogram, Counter, Gauge # Latence du premier token TTFB_HISTOGRAM = Histogram( 'rag_streaming_ttfb_seconds', 'Time to first byte', buckets=[0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 5.0] ) # Durée totale de streaming STREAM_DURATION = Histogram( 'rag_streaming_duration_seconds', 'Total streaming duration', buckets=[1, 2, 5, 10, 20, 30, 60] ) # Erreurs de streaming STREAM_ERRORS = Counter( 'rag_streaming_errors_total', 'Total streaming errors', ['error_type'] ) # Connexions actives ACTIVE_STREAMS = Gauge( 'rag_streaming_active_connections', 'Number of active streaming connections' ) class MetricCollector: def __init__(self): self.start_time = None async def track_stream(self, stream_generator): self.start_time = time.time() ACTIVE_STREAMS.inc() first_token = True try: async for event in stream_generator: if first_token and event.type == 'token': TTFB_HISTOGRAM.observe(time.time() - self.start_time) first_token = False yield event except Exception as e: STREAM_ERRORS.labels(error_type=type(e).__name__).inc() raise finally: STREAM_DURATION.observe(time.time() - self.start_time) ACTIVE_STREAMS.dec()
Dashboard de monitoring
DEVELOPERpython# Endpoints pour le monitoring @app.get("/metrics/streaming") async def streaming_metrics(): return { "active_connections": ACTIVE_STREAMS._value.get(), "avg_ttfb_ms": TTFB_HISTOGRAM._sum.get() / max(TTFB_HISTOGRAM._count.get(), 1) * 1000, "error_rate": STREAM_ERRORS._value.get() / max(total_streams, 1), "p95_duration_s": calculate_p95(STREAM_DURATION) }
Intégration avec Ailog
Ailog gère nativement le streaming avec :
- SSE optimisé avec reconnexion automatique
- Buffering intelligent pour une UX fluide
- Métriques intégrées (TTFB, erreurs, durée)
- SDK React prêt à l'emploi
DEVELOPERtypescriptimport { useAilogChat } from '@ailog/react'; function ChatWidget() { const { messages, isStreaming, sendMessage } = useAilogChat({ channelId: 'your-channel', streaming: true, // Activé par défaut onStreamStart: () => console.log('Stream started'), onStreamEnd: () => console.log('Stream ended') }); // Le composant affiche automatiquement le streaming }
Conclusion
Le streaming transforme l'expérience utilisateur de votre RAG. Points clés :
- SSE est le protocole recommandé pour la simplicité
- Gestion des erreurs robuste avec reconnexion
- Buffering pour un affichage fluide
- Métriques pour monitorer la performance
- Prefetch pour réduire encore la latence
Ressources complémentaires
- Introduction au RAG - Fondamentaux
- Génération LLM pour RAG - Guide parent
- Réduire la latence RAG - Optimisations
- Prompt Engineering RAG - Optimiser les prompts
Envie d'un streaming clé en main ? Essayez Ailog - streaming optimisé, reconnexion automatique, SDK React inclus.
FAQ
Tags
Articles connexes
Agents RAG : Orchestrer des systemes multi-agents
Architecturez des systemes RAG multi-agents : orchestration, specialisation, collaboration et gestion des echecs pour des assistants complexes.
RAG Conversationnel : Memoire et contexte multi-sessions
Implementez un RAG avec memoire conversationnelle : gestion du contexte, historique multi-sessions et personnalisation des reponses.
Agentic RAG 2025 : Construire des Agents IA Autonomes (Guide Complet)
Guide complet Agentic RAG : architecture, design patterns, agents autonomes avec retrieval dynamique, orchestration multi-outils. Avec exemples LangGraph et CrewAI.