Streaming RAG: Echtzeit-Antworten implementieren
Umfassender technischer Leitfaden zur Implementierung von Streaming in Ihrem RAG-System: Architektur, Python/TypeScript-Code, Fehlerbehandlung und UX-Optimierung.
TL;DR
Das Streaming RAG ermöglicht es, Antworten Token für Token anzuzeigen, statt auf die vollständige Generierung zu warten. Ergebnis: eine wahrgenommene Latenz, die sich um den Faktor 5 reduziert, und eine bessere Nutzererfahrung. Dieser Leitfaden behandelt die technische Architektur, die Backend/Frontend-Implementierung und fortgeschrittene Patterns für ein robustes Streaming in Produktion.
Warum Streaming essenziell ist
Das Latenzproblem bei RAG
Eine typische RAG-Pipeline benötigt 2–5 Sekunden, bevor irgendetwas angezeigt wird:
Requête utilisateur
↓ 200ms - Embedding de la requête
↓ 300ms - Recherche vectorielle
↓ 100ms - Reranking
↓ 2000ms - Génération LLM
↓ Réponse complète
Mit Streaming:
Requête utilisateur
↓ 200ms - Embedding de la requête
↓ 300ms - Recherche vectorielle
↓ 100ms - Reranking
↓ 100ms - Premier token
↓ ... tokens en continu ...
↓ Réponse complète
Einfluss auf die Nutzererfahrung
| Metrik | Ohne Streaming | Mit Streaming |
|---|---|---|
| Time to First Byte (TTFB) | 2500ms | 600ms |
| Wahrgenommene Latenz | Hoch | Niedrig |
| User-Abbrüche | 15% | 3% |
| Zufriedenheit (NPS) | +25 | +58 |
Architektur des Streaming RAG
Überblick
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │←──→│ Backend │←──→│ LLM │
│ (Browser) │SSE │ (FastAPI) │ │ (OpenAI) │
└─────────────┘ └─────────────┘ └─────────────┘
↑ ↑ ↑
│ │ │
Affichage Orchestration Génération
progressif RAG + Stream streaming
Protokollwahl
| Protokoll | Vorteile | Nachteile | Verwendung |
|---|---|---|---|
| SSE | Einfach, native HTTP | Unidirektional | Empfohlen |
| WebSocket | Bidirektional | Komplexität | Interaktiver Chat |
| HTTP/2 Push | Leistungsfähig | Begrenzte Unterstützung | Fortgeschrittene Anwendungsfälle |
Backend-Implementierung (Python/FastAPI)
Grundkonfiguration
DEVELOPERpythonfrom fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from openai import OpenAI import json import asyncio from typing import AsyncGenerator app = FastAPI() client = OpenAI() class RAGStreamingPipeline: def __init__(self, vector_db, embedding_model, llm_client): self.vector_db = vector_db self.embedding_model = embedding_model self.llm_client = llm_client async def stream_response( self, query: str, system_prompt: str ) -> AsyncGenerator[str, None]: """Pipeline RAG complet avec streaming.""" # 1. Retrieval (nicht gestreamt) query_embedding = await self.embedding_model.embed(query) documents = await self.vector_db.search(query_embedding, top_k=5) # 2. Kontext aufbauen context = self._format_context(documents) # 3. Sende zuerst die Metadaten yield self._format_sse_event("metadata", { "sources": [doc.source for doc in documents], "retrieval_time_ms": 350 }) # 4. Streamen der Generierung messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"} ] stream = self.llm_client.chat.completions.create( model="gpt-4", messages=messages, stream=True ) full_response = "" for chunk in stream: if chunk.choices[0].delta.content: token = chunk.choices[0].delta.content full_response += token yield self._format_sse_event("token", {"content": token}) # 5. Sende das Endsignal yield self._format_sse_event("done", { "total_tokens": len(full_response.split()), "generation_time_ms": 1500 }) def _format_context(self, documents: list) -> str: return "\n\n".join([ f"[{doc.source}]\n{doc.content}" for doc in documents ]) def _format_sse_event(self, event_type: str, data: dict) -> str: return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
FastAPI-Endpoint
DEVELOPERpythonfrom fastapi import FastAPI, Query from fastapi.responses import StreamingResponse app = FastAPI() pipeline = RAGStreamingPipeline(vector_db, embedding_model, llm_client) @app.get("/api/chat/stream") async def stream_chat( query: str = Query(..., description="User query"), channel_id: str = Query(..., description="Channel ID") ): """Endpoint SSE pour le streaming de réponses RAG.""" # Lade die Channel-Konfiguration channel_config = await get_channel_config(channel_id) async def event_generator(): try: async for event in pipeline.stream_response( query=query, system_prompt=channel_config.system_prompt ): yield event except Exception as e: yield pipeline._format_sse_event("error", { "message": str(e), "code": "GENERATION_ERROR" }) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Deaktiviert das nginx-Buffering } )
Fehlerbehandlung beim Streaming
DEVELOPERpythonclass StreamingErrorHandler: """Gestion robuste des erreurs pendant le streaming.""" @staticmethod async def safe_stream(generator, max_retries=3): """Wrapper pour gérer les erreurs de streaming.""" retries = 0 while retries < max_retries: try: async for event in generator: yield event return # Succès except OpenAIRateLimitError: retries += 1 yield format_sse_event("warning", { "message": "Rate limit hit, retrying...", "retry": retries }) await asyncio.sleep(2 ** retries) except OpenAIConnectionError: yield format_sse_event("error", { "message": "Connection lost. Please retry.", "recoverable": True }) return except Exception as e: yield format_sse_event("error", { "message": "An error occurred.", "code": type(e).__name__ }) return yield format_sse_event("error", { "message": "Max retries exceeded.", "recoverable": False })
Frontend-Implementierung (TypeScript/React)
React-Hook für Streaming
DEVELOPERtypescriptimport { useState, useCallback, useRef } from 'react'; interface StreamingMessage { content: string; isComplete: boolean; sources: string[]; error?: string; } interface UseStreamingChatOptions { channelId: string; onToken?: (token: string) => void; onComplete?: (response: string) => void; onError?: (error: Error) => void; } export function useStreamingChat({ channelId, onToken, onComplete, onError }: UseStreamingChatOptions) { const [message, setMessage] = useState<StreamingMessage>({ content: '', isComplete: false, sources: [] }); const [isStreaming, setIsStreaming] = useState(false); const abortControllerRef = useRef<AbortController | null>(null); const sendMessage = useCallback(async (query: string) => { // Vorherigen Stream abbrechen, falls vorhanden if (abortControllerRef.current) { abortControllerRef.current.abort(); } abortControllerRef.current = new AbortController(); setIsStreaming(true); setMessage({ content: '', isComplete: false, sources: [] }); try { const response = await fetch( `/api/chat/stream?query=${encodeURIComponent(query)}&channel_id=${channelId}`, { method: 'GET', headers: { 'Accept': 'text/event-stream' }, signal: abortControllerRef.current.signal } ); if (!response.ok) { throw new Error(`HTTP error: ${response.status}`); } const reader = response.body?.getReader(); const decoder = new TextDecoder(); if (!reader) { throw new Error('No response body'); } let buffer = ''; let fullContent = ''; let sources: string[] = []; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const events = buffer.split('\n\n'); buffer = events.pop() || ''; for (const eventStr of events) { if (!eventStr.trim()) continue; const event = parseSSEEvent(eventStr); switch (event.type) { case 'metadata': sources = event.data.sources; setMessage(prev => ({ ...prev, sources })); break; case 'token': fullContent += event.data.content; setMessage(prev => ({ ...prev, content: fullContent })); onToken?.(event.data.content); break; case 'done': setMessage(prev => ({ ...prev, isComplete: true })); onComplete?.(fullContent); break; case 'error': setMessage(prev => ({ ...prev, error: event.data.message })); onError?.(new Error(event.data.message)); break; } } } } catch (error) { if ((error as Error).name !== 'AbortError') { setMessage(prev => ({ ...prev, error: (error as Error).message })); onError?.(error as Error); } } finally { setIsStreaming(false); } }, [channelId, onToken, onComplete, onError]); const stopStreaming = useCallback(() => { abortControllerRef.current?.abort(); setIsStreaming(false); }, []); return { message, isStreaming, sendMessage, stopStreaming }; } function parseSSEEvent(eventStr: string): { type: string; data: any } { const lines = eventStr.split('\n'); let eventType = 'message'; let data = ''; for (const line of lines) { if (line.startsWith('event: ')) { eventType = line.slice(7); } else if (line.startsWith('data: ')) { data = line.slice(6); } } return { type: eventType, data: JSON.parse(data) }; }
Chat-Komponente mit Streaming
DEVELOPERtypescriptimport { useStreamingChat } from './useStreamingChat'; interface StreamingChatProps { channelId: string; } export function StreamingChat({ channelId }: StreamingChatProps) { const [input, setInput] = useState(''); const { message, isStreaming, sendMessage, stopStreaming } = useStreamingChat({ channelId, onComplete: (response) => { console.log('Response complete:', response.length, 'characters'); } }); const handleSubmit = (e: React.FormEvent) => { e.preventDefault(); if (input.trim() && !isStreaming) { sendMessage(input); setInput(''); } }; return ( <div className="streaming-chat"> <div className="messages"> {message.content && ( <div className="message assistant"> <StreamingText text={message.content} isStreaming={isStreaming} /> {message.isComplete && message.sources.length > 0 && ( <SourcesList sources={message.sources} /> )} {message.error && ( <ErrorMessage error={message.error} /> )} </div> )} </div> <form onSubmit={handleSubmit} className="input-form"> <input type="text" value={input} onChange={(e) => setInput(e.target.value)} placeholder="Posez votre question..." disabled={isStreaming} /> {isStreaming ? ( <button type="button" onClick={stopStreaming}> Arrêter </button> ) : ( <button type="submit" disabled={!input.trim()}> Envoyer </button> )} </form> </div> ); } function StreamingText({ text, isStreaming }: { text: string; isStreaming: boolean; }) { return ( <div className="streaming-text"> {text} {isStreaming && <span className="cursor">▊</span>} </div> ); }
Fortgeschrittene Optimierungen
1. Prefetch des Kontexts
Beginnen Sie mit der Retrieval, bevor der Nutzer das Tippen beendet:
DEVELOPERtypescriptimport { useDeferredValue, useEffect, useRef } from 'react'; function usePrefetchContext(query: string, channelId: string) { const deferredQuery = useDeferredValue(query); const cacheRef = useRef<Map<string, any>>(new Map()); useEffect(() => { if (deferredQuery.length < 3) return; const cacheKey = `${channelId}:${deferredQuery}`; if (cacheRef.current.has(cacheKey)) return; // Prefetch im Hintergrund fetch(`/api/context/prefetch?q=${encodeURIComponent(deferredQuery)}&channel=${channelId}`) .then(res => res.json()) .then(context => { cacheRef.current.set(cacheKey, context); }) .catch(() => {}); // Silently fail }, [deferredQuery, channelId]); return cacheRef.current; }
2. Intelligentes Buffering
Tokens sammeln für eine flüssigere Darstellung:
DEVELOPERtypescriptclass TokenBuffer { private buffer: string[] = []; private flushCallback: (text: string) => void; private flushInterval: number; private timer: NodeJS.Timeout | null = null; constructor( flushCallback: (text: string) => void, flushInterval: number = 50 // ms ) { this.flushCallback = flushCallback; this.flushInterval = flushInterval; } add(token: string) { this.buffer.push(token); // Sofortiges Flushen bei Satzzeichen if (/[.!?\n]$/.test(token)) { this.flush(); return; } // Andernfalls verzögert flushen if (!this.timer) { this.timer = setTimeout(() => this.flush(), this.flushInterval); } } flush() { if (this.timer) { clearTimeout(this.timer); this.timer = null; } if (this.buffer.length > 0) { this.flushCallback(this.buffer.join('')); this.buffer = []; } } }
3. Automatische Wiederverbindung
DEVELOPERtypescriptclass ResilientEventSource { private url: string; private onMessage: (event: any) => void; private onError: (error: Error) => void; private maxRetries: number; private retryCount: number = 0; private eventSource: EventSource | null = null; constructor(options: { url: string; onMessage: (event: any) => void; onError: (error: Error) => void; maxRetries?: number; }) { this.url = options.url; this.onMessage = options.onMessage; this.onError = options.onError; this.maxRetries = options.maxRetries || 3; this.connect(); } private connect() { this.eventSource = new EventSource(this.url); this.eventSource.onmessage = (event) => { this.retryCount = 0; // Reset on success this.onMessage(JSON.parse(event.data)); }; this.eventSource.onerror = (error) => { this.eventSource?.close(); if (this.retryCount < this.maxRetries) { this.retryCount++; const delay = Math.pow(2, this.retryCount) * 1000; setTimeout(() => this.connect(), delay); } else { this.onError(new Error('Max retries exceeded')); } }; } close() { this.eventSource?.close(); } }
Monitoring und Metriken
Wichtige Metriken zum Tracken
DEVELOPERpythonfrom prometheus_client import Histogram, Counter, Gauge # Latenz bis zum ersten Token TTFB_HISTOGRAM = Histogram( 'rag_streaming_ttfb_seconds', 'Time to first byte', buckets=[0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 5.0] ) # Gesamtdauer des Streamings STREAM_DURATION = Histogram( 'rag_streaming_duration_seconds', 'Total streaming duration', buckets=[1, 2, 5, 10, 20, 30, 60] ) # Streaming-Fehler STREAM_ERRORS = Counter( 'rag_streaming_errors_total', 'Total streaming errors', ['error_type'] ) # Aktive Verbindungen ACTIVE_STREAMS = Gauge( 'rag_streaming_active_connections', 'Number of active streaming connections' ) class MetricCollector: def __init__(self): self.start_time = None async def track_stream(self, stream_generator): self.start_time = time.time() ACTIVE_STREAMS.inc() first_token = True try: async for event in stream_generator: if first_token and event.type == 'token': TTFB_HISTOGRAM.observe(time.time() - self.start_time) first_token = False yield event except Exception as e: STREAM_ERRORS.labels(error_type=type(e).__name__).inc() raise finally: STREAM_DURATION.observe(time.time() - self.start_time) ACTIVE_STREAMS.dec()
Monitoring-Dashboard
DEVELOPERpython# Endpunkte für das Monitoring @app.get("/metrics/streaming") async def streaming_metrics(): return { "active_connections": ACTIVE_STREAMS._value.get(), "avg_ttfb_ms": TTFB_HISTOGRAM._sum.get() / max(TTFB_HISTOGRAM._count.get(), 1) * 1000, "error_rate": STREAM_ERRORS._value.get() / max(total_streams, 1), "p95_duration_s": calculate_p95(STREAM_DURATION) }
Integration mit Ailog
Ailog unterstützt Streaming nativ mit:
- SSE optimiert mit automatischer Wiederverbindung
- Intelligentes Buffering für eine flüssige UX
- Integrierte Metriken (TTFB, Fehler, Dauer)
- React-SDK einsatzbereit
DEVELOPERtypescriptimport { useAilogChat } from '@ailog/react'; function ChatWidget() { const { messages, isStreaming, sendMessage } = useAilogChat({ channelId: 'your-channel', streaming: true, // Activé par défaut onStreamStart: () => console.log('Stream started'), onStreamEnd: () => console.log('Stream ended') }); // Le composant affiche automatiquement le streaming }
Fazit
Streaming verwandelt die Nutzererfahrung Ihres RAG. Wichtige Punkte:
- SSE ist das empfohlene Protokoll wegen der Einfachheit
- Robuste Fehlerbehandlung mit automatischer Wiederverbindung
- Buffering für eine flüssige Darstellung
- Metriken zur Performance-Überwachung
- Prefetch zur weiteren Verringerung der Latenz
Weiterführende Ressourcen
- Introduction au RAG - Grundlagen
- Génération LLM pour RAG - Übergeordneter Guide
- Réduire la latence RAG - Optimierungen
- Prompt Engineering RAG - Prompts optimieren
Lust auf ein schlüsselfertiges Streaming? Essayez Ailog - optimiertes Streaming, automatische Wiederverbindung, React-SDK inklusive.
FAQ
Tags
Verwandte Artikel
RAG-Agenten: Orchestrierung von Multi-Agenten-Systemen
Konzipieren Sie RAG-basierte Multi-Agenten-Systeme: Orchestrierung, Spezialisierung, Zusammenarbeit und Fehlerbehandlung für komplexe Assistenten.
Konversationelles RAG: Gedächtnis und Kontext über mehrere Sitzungen
Implementieren Sie ein RAG mit konversationellem Gedächtnis: Verwaltung des Kontexts, Verlauf über mehrere Sitzungen und Personalisierung der Antworten.
Agentic RAG 2025: Aufbau autonomer KI-Agenten (Kompletter Leitfaden)
Kompletter Agentic RAG-Leitfaden: Architektur, Design Patterns, autonome Agenten mit dynamischem Retrieval, Multi-Tool-Orchestrierung. Mit Beispielen LangGraph und CrewAI.