AnleitungExperte

Streaming RAG: Echtzeit-Antworten implementieren

14. März 2026
20 Minuten Lesezeit
Équipe Ailog

Umfassender technischer Leitfaden zur Implementierung von Streaming in Ihrem RAG-System: Architektur, Python/TypeScript-Code, Fehlerbehandlung und UX-Optimierung.

TL;DR

Das Streaming RAG ermöglicht es, Antworten Token für Token anzuzeigen, statt auf die vollständige Generierung zu warten. Ergebnis: eine wahrgenommene Latenz, die sich um den Faktor 5 reduziert, und eine bessere Nutzererfahrung. Dieser Leitfaden behandelt die technische Architektur, die Backend/Frontend-Implementierung und fortgeschrittene Patterns für ein robustes Streaming in Produktion.

Warum Streaming essenziell ist

Das Latenzproblem bei RAG

Eine typische RAG-Pipeline benötigt 2–5 Sekunden, bevor irgendetwas angezeigt wird:

Requête utilisateur
    ↓ 200ms - Embedding de la requête
    ↓ 300ms - Recherche vectorielle
    ↓ 100ms - Reranking
    ↓ 2000ms - Génération LLM
    ↓ Réponse complète

Mit Streaming:

Requête utilisateur
    ↓ 200ms - Embedding de la requête
    ↓ 300ms - Recherche vectorielle
    ↓ 100ms - Reranking
    ↓ 100ms - Premier token
    ↓ ... tokens en continu ...
    ↓ Réponse complète

Einfluss auf die Nutzererfahrung

MetrikOhne StreamingMit Streaming
Time to First Byte (TTFB)2500ms600ms
Wahrgenommene LatenzHochNiedrig
User-Abbrüche15%3%
Zufriedenheit (NPS)+25+58

Architektur des Streaming RAG

Überblick

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Client    │←──→│   Backend   │←──→│     LLM     │
│  (Browser)  │SSE │  (FastAPI)  │    │  (OpenAI)   │
└─────────────┘    └─────────────┘    └─────────────┘
       ↑                  ↑                  ↑
       │                  │                  │
   Affichage         Orchestration      Génération
   progressif        RAG + Stream        streaming

Protokollwahl

ProtokollVorteileNachteileVerwendung
SSEEinfach, native HTTPUnidirektionalEmpfohlen
WebSocketBidirektionalKomplexitätInteraktiver Chat
HTTP/2 PushLeistungsfähigBegrenzte UnterstützungFortgeschrittene Anwendungsfälle

Backend-Implementierung (Python/FastAPI)

Grundkonfiguration

DEVELOPERpython
from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from openai import OpenAI import json import asyncio from typing import AsyncGenerator app = FastAPI() client = OpenAI() class RAGStreamingPipeline: def __init__(self, vector_db, embedding_model, llm_client): self.vector_db = vector_db self.embedding_model = embedding_model self.llm_client = llm_client async def stream_response( self, query: str, system_prompt: str ) -> AsyncGenerator[str, None]: """Pipeline RAG complet avec streaming.""" # 1. Retrieval (nicht gestreamt) query_embedding = await self.embedding_model.embed(query) documents = await self.vector_db.search(query_embedding, top_k=5) # 2. Kontext aufbauen context = self._format_context(documents) # 3. Sende zuerst die Metadaten yield self._format_sse_event("metadata", { "sources": [doc.source for doc in documents], "retrieval_time_ms": 350 }) # 4. Streamen der Generierung messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"} ] stream = self.llm_client.chat.completions.create( model="gpt-4", messages=messages, stream=True ) full_response = "" for chunk in stream: if chunk.choices[0].delta.content: token = chunk.choices[0].delta.content full_response += token yield self._format_sse_event("token", {"content": token}) # 5. Sende das Endsignal yield self._format_sse_event("done", { "total_tokens": len(full_response.split()), "generation_time_ms": 1500 }) def _format_context(self, documents: list) -> str: return "\n\n".join([ f"[{doc.source}]\n{doc.content}" for doc in documents ]) def _format_sse_event(self, event_type: str, data: dict) -> str: return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

FastAPI-Endpoint

DEVELOPERpython
from fastapi import FastAPI, Query from fastapi.responses import StreamingResponse app = FastAPI() pipeline = RAGStreamingPipeline(vector_db, embedding_model, llm_client) @app.get("/api/chat/stream") async def stream_chat( query: str = Query(..., description="User query"), channel_id: str = Query(..., description="Channel ID") ): """Endpoint SSE pour le streaming de réponses RAG.""" # Lade die Channel-Konfiguration channel_config = await get_channel_config(channel_id) async def event_generator(): try: async for event in pipeline.stream_response( query=query, system_prompt=channel_config.system_prompt ): yield event except Exception as e: yield pipeline._format_sse_event("error", { "message": str(e), "code": "GENERATION_ERROR" }) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Deaktiviert das nginx-Buffering } )

Fehlerbehandlung beim Streaming

DEVELOPERpython
class StreamingErrorHandler: """Gestion robuste des erreurs pendant le streaming.""" @staticmethod async def safe_stream(generator, max_retries=3): """Wrapper pour gérer les erreurs de streaming.""" retries = 0 while retries < max_retries: try: async for event in generator: yield event return # Succès except OpenAIRateLimitError: retries += 1 yield format_sse_event("warning", { "message": "Rate limit hit, retrying...", "retry": retries }) await asyncio.sleep(2 ** retries) except OpenAIConnectionError: yield format_sse_event("error", { "message": "Connection lost. Please retry.", "recoverable": True }) return except Exception as e: yield format_sse_event("error", { "message": "An error occurred.", "code": type(e).__name__ }) return yield format_sse_event("error", { "message": "Max retries exceeded.", "recoverable": False })

Frontend-Implementierung (TypeScript/React)

React-Hook für Streaming

DEVELOPERtypescript
import { useState, useCallback, useRef } from 'react'; interface StreamingMessage { content: string; isComplete: boolean; sources: string[]; error?: string; } interface UseStreamingChatOptions { channelId: string; onToken?: (token: string) => void; onComplete?: (response: string) => void; onError?: (error: Error) => void; } export function useStreamingChat({ channelId, onToken, onComplete, onError }: UseStreamingChatOptions) { const [message, setMessage] = useState<StreamingMessage>({ content: '', isComplete: false, sources: [] }); const [isStreaming, setIsStreaming] = useState(false); const abortControllerRef = useRef<AbortController | null>(null); const sendMessage = useCallback(async (query: string) => { // Vorherigen Stream abbrechen, falls vorhanden if (abortControllerRef.current) { abortControllerRef.current.abort(); } abortControllerRef.current = new AbortController(); setIsStreaming(true); setMessage({ content: '', isComplete: false, sources: [] }); try { const response = await fetch( `/api/chat/stream?query=${encodeURIComponent(query)}&channel_id=${channelId}`, { method: 'GET', headers: { 'Accept': 'text/event-stream' }, signal: abortControllerRef.current.signal } ); if (!response.ok) { throw new Error(`HTTP error: ${response.status}`); } const reader = response.body?.getReader(); const decoder = new TextDecoder(); if (!reader) { throw new Error('No response body'); } let buffer = ''; let fullContent = ''; let sources: string[] = []; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const events = buffer.split('\n\n'); buffer = events.pop() || ''; for (const eventStr of events) { if (!eventStr.trim()) continue; const event = parseSSEEvent(eventStr); switch (event.type) { case 'metadata': sources = event.data.sources; setMessage(prev => ({ ...prev, sources })); break; case 'token': fullContent += event.data.content; setMessage(prev => ({ ...prev, content: fullContent })); onToken?.(event.data.content); break; case 'done': setMessage(prev => ({ ...prev, isComplete: true })); onComplete?.(fullContent); break; case 'error': setMessage(prev => ({ ...prev, error: event.data.message })); onError?.(new Error(event.data.message)); break; } } } } catch (error) { if ((error as Error).name !== 'AbortError') { setMessage(prev => ({ ...prev, error: (error as Error).message })); onError?.(error as Error); } } finally { setIsStreaming(false); } }, [channelId, onToken, onComplete, onError]); const stopStreaming = useCallback(() => { abortControllerRef.current?.abort(); setIsStreaming(false); }, []); return { message, isStreaming, sendMessage, stopStreaming }; } function parseSSEEvent(eventStr: string): { type: string; data: any } { const lines = eventStr.split('\n'); let eventType = 'message'; let data = ''; for (const line of lines) { if (line.startsWith('event: ')) { eventType = line.slice(7); } else if (line.startsWith('data: ')) { data = line.slice(6); } } return { type: eventType, data: JSON.parse(data) }; }

Chat-Komponente mit Streaming

DEVELOPERtypescript
import { useStreamingChat } from './useStreamingChat'; interface StreamingChatProps { channelId: string; } export function StreamingChat({ channelId }: StreamingChatProps) { const [input, setInput] = useState(''); const { message, isStreaming, sendMessage, stopStreaming } = useStreamingChat({ channelId, onComplete: (response) => { console.log('Response complete:', response.length, 'characters'); } }); const handleSubmit = (e: React.FormEvent) => { e.preventDefault(); if (input.trim() && !isStreaming) { sendMessage(input); setInput(''); } }; return ( <div className="streaming-chat"> <div className="messages"> {message.content && ( <div className="message assistant"> <StreamingText text={message.content} isStreaming={isStreaming} /> {message.isComplete && message.sources.length > 0 && ( <SourcesList sources={message.sources} /> )} {message.error && ( <ErrorMessage error={message.error} /> )} </div> )} </div> <form onSubmit={handleSubmit} className="input-form"> <input type="text" value={input} onChange={(e) => setInput(e.target.value)} placeholder="Posez votre question..." disabled={isStreaming} /> {isStreaming ? ( <button type="button" onClick={stopStreaming}> Arrêter </button> ) : ( <button type="submit" disabled={!input.trim()}> Envoyer </button> )} </form> </div> ); } function StreamingText({ text, isStreaming }: { text: string; isStreaming: boolean; }) { return ( <div className="streaming-text"> {text} {isStreaming && <span className="cursor"></span>} </div> ); }

Fortgeschrittene Optimierungen

1. Prefetch des Kontexts

Beginnen Sie mit der Retrieval, bevor der Nutzer das Tippen beendet:

DEVELOPERtypescript
import { useDeferredValue, useEffect, useRef } from 'react'; function usePrefetchContext(query: string, channelId: string) { const deferredQuery = useDeferredValue(query); const cacheRef = useRef<Map<string, any>>(new Map()); useEffect(() => { if (deferredQuery.length < 3) return; const cacheKey = `${channelId}:${deferredQuery}`; if (cacheRef.current.has(cacheKey)) return; // Prefetch im Hintergrund fetch(`/api/context/prefetch?q=${encodeURIComponent(deferredQuery)}&channel=${channelId}`) .then(res => res.json()) .then(context => { cacheRef.current.set(cacheKey, context); }) .catch(() => {}); // Silently fail }, [deferredQuery, channelId]); return cacheRef.current; }

2. Intelligentes Buffering

Tokens sammeln für eine flüssigere Darstellung:

DEVELOPERtypescript
class TokenBuffer { private buffer: string[] = []; private flushCallback: (text: string) => void; private flushInterval: number; private timer: NodeJS.Timeout | null = null; constructor( flushCallback: (text: string) => void, flushInterval: number = 50 // ms ) { this.flushCallback = flushCallback; this.flushInterval = flushInterval; } add(token: string) { this.buffer.push(token); // Sofortiges Flushen bei Satzzeichen if (/[.!?\n]$/.test(token)) { this.flush(); return; } // Andernfalls verzögert flushen if (!this.timer) { this.timer = setTimeout(() => this.flush(), this.flushInterval); } } flush() { if (this.timer) { clearTimeout(this.timer); this.timer = null; } if (this.buffer.length > 0) { this.flushCallback(this.buffer.join('')); this.buffer = []; } } }

3. Automatische Wiederverbindung

DEVELOPERtypescript
class ResilientEventSource { private url: string; private onMessage: (event: any) => void; private onError: (error: Error) => void; private maxRetries: number; private retryCount: number = 0; private eventSource: EventSource | null = null; constructor(options: { url: string; onMessage: (event: any) => void; onError: (error: Error) => void; maxRetries?: number; }) { this.url = options.url; this.onMessage = options.onMessage; this.onError = options.onError; this.maxRetries = options.maxRetries || 3; this.connect(); } private connect() { this.eventSource = new EventSource(this.url); this.eventSource.onmessage = (event) => { this.retryCount = 0; // Reset on success this.onMessage(JSON.parse(event.data)); }; this.eventSource.onerror = (error) => { this.eventSource?.close(); if (this.retryCount < this.maxRetries) { this.retryCount++; const delay = Math.pow(2, this.retryCount) * 1000; setTimeout(() => this.connect(), delay); } else { this.onError(new Error('Max retries exceeded')); } }; } close() { this.eventSource?.close(); } }

Monitoring und Metriken

Wichtige Metriken zum Tracken

DEVELOPERpython
from prometheus_client import Histogram, Counter, Gauge # Latenz bis zum ersten Token TTFB_HISTOGRAM = Histogram( 'rag_streaming_ttfb_seconds', 'Time to first byte', buckets=[0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 5.0] ) # Gesamtdauer des Streamings STREAM_DURATION = Histogram( 'rag_streaming_duration_seconds', 'Total streaming duration', buckets=[1, 2, 5, 10, 20, 30, 60] ) # Streaming-Fehler STREAM_ERRORS = Counter( 'rag_streaming_errors_total', 'Total streaming errors', ['error_type'] ) # Aktive Verbindungen ACTIVE_STREAMS = Gauge( 'rag_streaming_active_connections', 'Number of active streaming connections' ) class MetricCollector: def __init__(self): self.start_time = None async def track_stream(self, stream_generator): self.start_time = time.time() ACTIVE_STREAMS.inc() first_token = True try: async for event in stream_generator: if first_token and event.type == 'token': TTFB_HISTOGRAM.observe(time.time() - self.start_time) first_token = False yield event except Exception as e: STREAM_ERRORS.labels(error_type=type(e).__name__).inc() raise finally: STREAM_DURATION.observe(time.time() - self.start_time) ACTIVE_STREAMS.dec()

Monitoring-Dashboard

DEVELOPERpython
# Endpunkte für das Monitoring @app.get("/metrics/streaming") async def streaming_metrics(): return { "active_connections": ACTIVE_STREAMS._value.get(), "avg_ttfb_ms": TTFB_HISTOGRAM._sum.get() / max(TTFB_HISTOGRAM._count.get(), 1) * 1000, "error_rate": STREAM_ERRORS._value.get() / max(total_streams, 1), "p95_duration_s": calculate_p95(STREAM_DURATION) }

Integration mit Ailog

Ailog unterstützt Streaming nativ mit:

  • SSE optimiert mit automatischer Wiederverbindung
  • Intelligentes Buffering für eine flüssige UX
  • Integrierte Metriken (TTFB, Fehler, Dauer)
  • React-SDK einsatzbereit
DEVELOPERtypescript
import { useAilogChat } from '@ailog/react'; function ChatWidget() { const { messages, isStreaming, sendMessage } = useAilogChat({ channelId: 'your-channel', streaming: true, // Activé par défaut onStreamStart: () => console.log('Stream started'), onStreamEnd: () => console.log('Stream ended') }); // Le composant affiche automatiquement le streaming }

Fazit

Streaming verwandelt die Nutzererfahrung Ihres RAG. Wichtige Punkte:

  1. SSE ist das empfohlene Protokoll wegen der Einfachheit
  2. Robuste Fehlerbehandlung mit automatischer Wiederverbindung
  3. Buffering für eine flüssige Darstellung
  4. Metriken zur Performance-Überwachung
  5. Prefetch zur weiteren Verringerung der Latenz

Weiterführende Ressourcen


Lust auf ein schlüsselfertiges Streaming? Essayez Ailog - optimiertes Streaming, automatische Wiederverbindung, React-SDK inklusive.

FAQ

SSE (Server-Sent Events) wird für die meisten Fälle empfohlen: einfacher umzusetzen, funktioniert nativ mit HTTP und ist ausreichend für den unidirektionalen Datenfluss vom Chatbot zum Nutzer. WebSocket ist nur dann vorzuziehen, wenn Sie bidirektionale, latenzkritische Kommunikation benötigen, z. B. bei kollaborativen Chat-Szenarien.
Verwenden Sie einen Buffer, der Tokens für 50–100 ms sammelt, bevor sie gerendert werden. Flushen Sie sofort bei Satzzeichen für ein natürliches Rendering. Fügen Sie während der Generierung einen animierten Cursor hinzu, um anzuzeigen, dass die Antwort noch läuft.
Implementieren Sie automatische Wiederverbindung mit exponentiellem Backoff. Bewahren Sie den Gesprächskontext clientseitig, um ohne Verlust der Historie fortzusetzen. Zeigen Sie einen Reconnect-Indikator statt einer harten Fehlermeldung, um die Nutzererfahrung nicht zu unterbrechen.
Parallelisieren Sie die Pipeline-Schritte: starten Sie das Embedding, während der Nutzer noch tippt (prefetch). Verwenden Sie Caches für ähnliche vector-Suchen. Senden Sie Metadaten (Quellen, geschätzte Tokens) sobald verfügbar — noch vor dem ersten Token der Antwort.
Streaming hält Verbindungen länger offen, was die Anzahl gleichzeitiger Verbindungen erhöht. Nutzen Sie einen Reverse-Proxy (nginx) mit deaktiviertem Buffering und setzen Sie angemessene Timeouts. In der Praxis ist der Overhead im Vergleich zum Gewinn in der Nutzererfahrung vernachlässigbar.

Tags

RAGstreamingtemps réelSSEWebSocketperformanceUX

Verwandte Artikel

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !