GuideAvancé

Streaming RAG : Implémenter des réponses en temps réel

14 mars 2026
20 min de lecture
Équipe Ailog

Guide technique complet pour implémenter le streaming dans votre système RAG : architecture, code Python/TypeScript, gestion des erreurs et optimisation UX.

TL;DR

Le streaming RAG permet d'afficher les réponses token par token au lieu d'attendre la génération complète. Résultat : une latence perçue divisée par 5 et une meilleure expérience utilisateur. Ce guide couvre l'architecture technique, l'implémentation backend/frontend, et les patterns avancés pour un streaming robuste en production.

Pourquoi le streaming est essentiel

Le problème de latence en RAG

Un pipeline RAG typique prend 2-5 secondes avant d'afficher quoi que ce soit :

Requête utilisateur
    ↓ 200ms - Embedding de la requête
    ↓ 300ms - Recherche vectorielle
    ↓ 100ms - Reranking
    ↓ 2000ms - Génération LLM
    ↓ Réponse complète

Avec le streaming :

Requête utilisateur
    ↓ 200ms - Embedding de la requête
    ↓ 300ms - Recherche vectorielle
    ↓ 100ms - Reranking
    ↓ 100ms - Premier token
    ↓ ... tokens en continu ...
    ↓ Réponse complète

Impact sur l'expérience utilisateur

MétriqueSans streamingAvec streaming
Time to First Byte (TTFB)2500ms600ms
Latence perçueÉlevéeFaible
Abandon utilisateur15%3%
Satisfaction (NPS)+25+58

Architecture du streaming RAG

Vue d'ensemble

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Client    │←──→│   Backend   │←──→│     LLM     │
│  (Browser)  │SSE │  (FastAPI)  │    │  (OpenAI)   │
└─────────────┘    └─────────────┘    └─────────────┘
       ↑                  ↑                  ↑
       │                  │                  │
   Affichage         Orchestration      Génération
   progressif        RAG + Stream        streaming

Choix du protocole

ProtocoleAvantagesInconvénientsUtilisation
SSESimple, HTTP natifUnidirectionnelRecommandé
WebSocketBidirectionnelComplexitéChat interactif
HTTP/2 PushPerformantSupport limitéCas avancés

Implémentation Backend (Python/FastAPI)

Configuration de base

DEVELOPERpython
from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from openai import OpenAI import json import asyncio from typing import AsyncGenerator app = FastAPI() client = OpenAI() class RAGStreamingPipeline: def __init__(self, vector_db, embedding_model, llm_client): self.vector_db = vector_db self.embedding_model = embedding_model self.llm_client = llm_client async def stream_response( self, query: str, system_prompt: str ) -> AsyncGenerator[str, None]: """Pipeline RAG complet avec streaming.""" # 1. Retrieval (non-streamé) query_embedding = await self.embedding_model.embed(query) documents = await self.vector_db.search(query_embedding, top_k=5) # 2. Construire le contexte context = self._format_context(documents) # 3. Envoyer les métadonnées d'abord yield self._format_sse_event("metadata", { "sources": [doc.source for doc in documents], "retrieval_time_ms": 350 }) # 4. Streamer la génération messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"} ] stream = self.llm_client.chat.completions.create( model="gpt-4", messages=messages, stream=True ) full_response = "" for chunk in stream: if chunk.choices[0].delta.content: token = chunk.choices[0].delta.content full_response += token yield self._format_sse_event("token", {"content": token}) # 5. Envoyer le signal de fin yield self._format_sse_event("done", { "total_tokens": len(full_response.split()), "generation_time_ms": 1500 }) def _format_context(self, documents: list) -> str: return "\n\n".join([ f"[{doc.source}]\n{doc.content}" for doc in documents ]) def _format_sse_event(self, event_type: str, data: dict) -> str: return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

Endpoint FastAPI

DEVELOPERpython
from fastapi import FastAPI, Query from fastapi.responses import StreamingResponse app = FastAPI() pipeline = RAGStreamingPipeline(vector_db, embedding_model, llm_client) @app.get("/api/chat/stream") async def stream_chat( query: str = Query(..., description="User query"), channel_id: str = Query(..., description="Channel ID") ): """Endpoint SSE pour le streaming de réponses RAG.""" # Charger la configuration du channel channel_config = await get_channel_config(channel_id) async def event_generator(): try: async for event in pipeline.stream_response( query=query, system_prompt=channel_config.system_prompt ): yield event except Exception as e: yield pipeline._format_sse_event("error", { "message": str(e), "code": "GENERATION_ERROR" }) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Désactive le buffering nginx } )

Gestion des erreurs en streaming

DEVELOPERpython
class StreamingErrorHandler: """Gestion robuste des erreurs pendant le streaming.""" @staticmethod async def safe_stream(generator, max_retries=3): """Wrapper pour gérer les erreurs de streaming.""" retries = 0 while retries < max_retries: try: async for event in generator: yield event return # Succès except OpenAIRateLimitError: retries += 1 yield format_sse_event("warning", { "message": "Rate limit hit, retrying...", "retry": retries }) await asyncio.sleep(2 ** retries) except OpenAIConnectionError: yield format_sse_event("error", { "message": "Connection lost. Please retry.", "recoverable": True }) return except Exception as e: yield format_sse_event("error", { "message": "An error occurred.", "code": type(e).__name__ }) return yield format_sse_event("error", { "message": "Max retries exceeded.", "recoverable": False })

Implémentation Frontend (TypeScript/React)

Hook React pour le streaming

DEVELOPERtypescript
import { useState, useCallback, useRef } from 'react'; interface StreamingMessage { content: string; isComplete: boolean; sources: string[]; error?: string; } interface UseStreamingChatOptions { channelId: string; onToken?: (token: string) => void; onComplete?: (response: string) => void; onError?: (error: Error) => void; } export function useStreamingChat({ channelId, onToken, onComplete, onError }: UseStreamingChatOptions) { const [message, setMessage] = useState<StreamingMessage>({ content: '', isComplete: false, sources: [] }); const [isStreaming, setIsStreaming] = useState(false); const abortControllerRef = useRef<AbortController | null>(null); const sendMessage = useCallback(async (query: string) => { // Annuler le stream précédent si existant if (abortControllerRef.current) { abortControllerRef.current.abort(); } abortControllerRef.current = new AbortController(); setIsStreaming(true); setMessage({ content: '', isComplete: false, sources: [] }); try { const response = await fetch( `/api/chat/stream?query=${encodeURIComponent(query)}&channel_id=${channelId}`, { method: 'GET', headers: { 'Accept': 'text/event-stream' }, signal: abortControllerRef.current.signal } ); if (!response.ok) { throw new Error(`HTTP error: ${response.status}`); } const reader = response.body?.getReader(); const decoder = new TextDecoder(); if (!reader) { throw new Error('No response body'); } let buffer = ''; let fullContent = ''; let sources: string[] = []; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const events = buffer.split('\n\n'); buffer = events.pop() || ''; for (const eventStr of events) { if (!eventStr.trim()) continue; const event = parseSSEEvent(eventStr); switch (event.type) { case 'metadata': sources = event.data.sources; setMessage(prev => ({ ...prev, sources })); break; case 'token': fullContent += event.data.content; setMessage(prev => ({ ...prev, content: fullContent })); onToken?.(event.data.content); break; case 'done': setMessage(prev => ({ ...prev, isComplete: true })); onComplete?.(fullContent); break; case 'error': setMessage(prev => ({ ...prev, error: event.data.message })); onError?.(new Error(event.data.message)); break; } } } } catch (error) { if ((error as Error).name !== 'AbortError') { setMessage(prev => ({ ...prev, error: (error as Error).message })); onError?.(error as Error); } } finally { setIsStreaming(false); } }, [channelId, onToken, onComplete, onError]); const stopStreaming = useCallback(() => { abortControllerRef.current?.abort(); setIsStreaming(false); }, []); return { message, isStreaming, sendMessage, stopStreaming }; } function parseSSEEvent(eventStr: string): { type: string; data: any } { const lines = eventStr.split('\n'); let eventType = 'message'; let data = ''; for (const line of lines) { if (line.startsWith('event: ')) { eventType = line.slice(7); } else if (line.startsWith('data: ')) { data = line.slice(6); } } return { type: eventType, data: JSON.parse(data) }; }

Composant de chat avec streaming

DEVELOPERtypescript
import { useStreamingChat } from './useStreamingChat'; interface StreamingChatProps { channelId: string; } export function StreamingChat({ channelId }: StreamingChatProps) { const [input, setInput] = useState(''); const { message, isStreaming, sendMessage, stopStreaming } = useStreamingChat({ channelId, onComplete: (response) => { console.log('Response complete:', response.length, 'characters'); } }); const handleSubmit = (e: React.FormEvent) => { e.preventDefault(); if (input.trim() && !isStreaming) { sendMessage(input); setInput(''); } }; return ( <div className="streaming-chat"> <div className="messages"> {message.content && ( <div className="message assistant"> <StreamingText text={message.content} isStreaming={isStreaming} /> {message.isComplete && message.sources.length > 0 && ( <SourcesList sources={message.sources} /> )} {message.error && ( <ErrorMessage error={message.error} /> )} </div> )} </div> <form onSubmit={handleSubmit} className="input-form"> <input type="text" value={input} onChange={(e) => setInput(e.target.value)} placeholder="Posez votre question..." disabled={isStreaming} /> {isStreaming ? ( <button type="button" onClick={stopStreaming}> Arrêter </button> ) : ( <button type="submit" disabled={!input.trim()}> Envoyer </button> )} </form> </div> ); } function StreamingText({ text, isStreaming }: { text: string; isStreaming: boolean; }) { return ( <div className="streaming-text"> {text} {isStreaming && <span className="cursor"></span>} </div> ); }

Optimisations avancées

1. Prefetch du contexte

Commencer la récupération avant que l'utilisateur finisse de taper :

DEVELOPERtypescript
import { useDeferredValue, useEffect, useRef } from 'react'; function usePrefetchContext(query: string, channelId: string) { const deferredQuery = useDeferredValue(query); const cacheRef = useRef<Map<string, any>>(new Map()); useEffect(() => { if (deferredQuery.length < 3) return; const cacheKey = `${channelId}:${deferredQuery}`; if (cacheRef.current.has(cacheKey)) return; // Prefetch en arrière-plan fetch(`/api/context/prefetch?q=${encodeURIComponent(deferredQuery)}&channel=${channelId}`) .then(res => res.json()) .then(context => { cacheRef.current.set(cacheKey, context); }) .catch(() => {}); // Silently fail }, [deferredQuery, channelId]); return cacheRef.current; }

2. Buffering intelligent

Accumuler les tokens pour un affichage plus fluide :

DEVELOPERtypescript
class TokenBuffer { private buffer: string[] = []; private flushCallback: (text: string) => void; private flushInterval: number; private timer: NodeJS.Timeout | null = null; constructor( flushCallback: (text: string) => void, flushInterval: number = 50 // ms ) { this.flushCallback = flushCallback; this.flushInterval = flushInterval; } add(token: string) { this.buffer.push(token); // Flush immédiat pour les caractères de ponctuation if (/[.!?\n]$/.test(token)) { this.flush(); return; } // Sinon, flush avec délai if (!this.timer) { this.timer = setTimeout(() => this.flush(), this.flushInterval); } } flush() { if (this.timer) { clearTimeout(this.timer); this.timer = null; } if (this.buffer.length > 0) { this.flushCallback(this.buffer.join('')); this.buffer = []; } } }

3. Reconnexion automatique

DEVELOPERtypescript
class ResilientEventSource { private url: string; private onMessage: (event: any) => void; private onError: (error: Error) => void; private maxRetries: number; private retryCount: number = 0; private eventSource: EventSource | null = null; constructor(options: { url: string; onMessage: (event: any) => void; onError: (error: Error) => void; maxRetries?: number; }) { this.url = options.url; this.onMessage = options.onMessage; this.onError = options.onError; this.maxRetries = options.maxRetries || 3; this.connect(); } private connect() { this.eventSource = new EventSource(this.url); this.eventSource.onmessage = (event) => { this.retryCount = 0; // Reset on success this.onMessage(JSON.parse(event.data)); }; this.eventSource.onerror = (error) => { this.eventSource?.close(); if (this.retryCount < this.maxRetries) { this.retryCount++; const delay = Math.pow(2, this.retryCount) * 1000; setTimeout(() => this.connect(), delay); } else { this.onError(new Error('Max retries exceeded')); } }; } close() { this.eventSource?.close(); } }

Monitoring et métriques

Métriques clés à tracker

DEVELOPERpython
from prometheus_client import Histogram, Counter, Gauge # Latence du premier token TTFB_HISTOGRAM = Histogram( 'rag_streaming_ttfb_seconds', 'Time to first byte', buckets=[0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 5.0] ) # Durée totale de streaming STREAM_DURATION = Histogram( 'rag_streaming_duration_seconds', 'Total streaming duration', buckets=[1, 2, 5, 10, 20, 30, 60] ) # Erreurs de streaming STREAM_ERRORS = Counter( 'rag_streaming_errors_total', 'Total streaming errors', ['error_type'] ) # Connexions actives ACTIVE_STREAMS = Gauge( 'rag_streaming_active_connections', 'Number of active streaming connections' ) class MetricCollector: def __init__(self): self.start_time = None async def track_stream(self, stream_generator): self.start_time = time.time() ACTIVE_STREAMS.inc() first_token = True try: async for event in stream_generator: if first_token and event.type == 'token': TTFB_HISTOGRAM.observe(time.time() - self.start_time) first_token = False yield event except Exception as e: STREAM_ERRORS.labels(error_type=type(e).__name__).inc() raise finally: STREAM_DURATION.observe(time.time() - self.start_time) ACTIVE_STREAMS.dec()

Dashboard de monitoring

DEVELOPERpython
# Endpoints pour le monitoring @app.get("/metrics/streaming") async def streaming_metrics(): return { "active_connections": ACTIVE_STREAMS._value.get(), "avg_ttfb_ms": TTFB_HISTOGRAM._sum.get() / max(TTFB_HISTOGRAM._count.get(), 1) * 1000, "error_rate": STREAM_ERRORS._value.get() / max(total_streams, 1), "p95_duration_s": calculate_p95(STREAM_DURATION) }

Intégration avec Ailog

Ailog gère nativement le streaming avec :

  • SSE optimisé avec reconnexion automatique
  • Buffering intelligent pour une UX fluide
  • Métriques intégrées (TTFB, erreurs, durée)
  • SDK React prêt à l'emploi
DEVELOPERtypescript
import { useAilogChat } from '@ailog/react'; function ChatWidget() { const { messages, isStreaming, sendMessage } = useAilogChat({ channelId: 'your-channel', streaming: true, // Activé par défaut onStreamStart: () => console.log('Stream started'), onStreamEnd: () => console.log('Stream ended') }); // Le composant affiche automatiquement le streaming }

Conclusion

Le streaming transforme l'expérience utilisateur de votre RAG. Points clés :

  1. SSE est le protocole recommandé pour la simplicité
  2. Gestion des erreurs robuste avec reconnexion
  3. Buffering pour un affichage fluide
  4. Métriques pour monitorer la performance
  5. Prefetch pour réduire encore la latence

Ressources complémentaires


Envie d'un streaming clé en main ? Essayez Ailog - streaming optimisé, reconnexion automatique, SDK React inclus.

FAQ

SSE (Server-Sent Events) est recommandé pour la plupart des cas : plus simple à implémenter, fonctionne nativement avec HTTP, et suffit pour le flux unidirectionnel chatbot vers utilisateur. WebSocket est préférable uniquement si vous avez besoin de communication bidirectionnelle temps réel, comme pour un chat collaboratif.
Utilisez un buffer qui accumule les tokens pendant 50-100ms avant de les afficher. Flushez immédiatement sur les caractères de ponctuation pour un rendu naturel. Ajoutez un curseur animé pendant la génération pour indiquer que la réponse est en cours.
Implémentez une reconnexion automatique avec backoff exponentiel. Conservez le contexte de la conversation côté client pour reprendre sans perdre l'historique. Affichez un indicateur de reconnexion plutôt qu'une erreur pour ne pas interrompre l'expérience utilisateur.
Parallélisez les étapes du pipeline RAG : commencez l'embedding pendant que l'utilisateur finit de taper (prefetch). Utilisez un cache pour les recherches vectorielles similaires. Envoyez les métadonnées (sources, tokens estimés) dès qu'elles sont disponibles, avant même le premier token de réponse.
Le streaming maintient une connexion ouverte plus longtemps, ce qui augmente le nombre de connexions simultanées. Utilisez un reverse proxy (nginx) avec buffering désactivé et configurez des timeouts appropriés. En pratique, l'overhead est négligeable comparé au gain d'expérience utilisateur.

Tags

RAGstreamingtemps réelSSEWebSocketperformanceUX

Articles connexes

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !