Streaming RAG: Implementing Real-Time Responses
Complete technical guide for implementing streaming in your RAG system: architecture, Python/TypeScript code, error handling, and UX optimization.
TL;DR
Streaming RAG displays responses token by token instead of waiting for complete generation. Result: perceived latency cut by a factor of four to five and a noticeably better user experience. This guide covers technical architecture, backend/frontend implementation, and advanced patterns for robust production streaming.
Why Streaming is Essential
The Latency Problem in RAG
A typical RAG pipeline takes 2-5 seconds before displaying anything:
```
User query
  ↓  200ms - Query embedding
  ↓  300ms - Vector search
  ↓  100ms - Reranking
  ↓ 2000ms - LLM generation
  ↓ Complete response
```
With streaming:
```
User query
  ↓ 200ms - Query embedding
  ↓ 300ms - Vector search
  ↓ 100ms - Reranking
  ↓ 100ms - First token
  ↓ ... continuous tokens ...
  ↓ Complete response
```
Impact on User Experience
| Metric | Without streaming | With streaming |
|---|---|---|
| Time to First Byte (TTFB) | 2500ms | 600ms |
| Perceived latency | High | Low |
| User abandonment | 15% | 3% |
| Satisfaction (NPS) | +25 | +58 |
Streaming RAG Architecture
Overview
```
┌─────────────┐       ┌─────────────┐       ┌─────────────┐
│   Client    │ ←───→ │   Backend   │ ←───→ │     LLM     │
│  (Browser)  │  SSE  │  (FastAPI)  │       │  (OpenAI)   │
└─────────────┘       └─────────────┘       └─────────────┘
       ↑                     ↑                     ↑
  Progressive         RAG + streaming         Streaming
    display            orchestration          generation
```
Protocol Choice
| Protocol | Advantages | Disadvantages | Use case |
|---|---|---|---|
| SSE | Simple, native HTTP | Unidirectional | Recommended |
| WebSocket | Bidirectional | Complexity | Interactive chat |
| HTTP/2 Push | Performant | Limited support | Advanced cases |
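Since the table recommends SSE, it helps to see what actually travels over the wire: each event is a short block of `event:` and `data:` lines terminated by a blank line. A minimal sketch of the serialization (the helper mirrors the `_format_sse_event` method used in the backend code below):

```python
import json

def format_sse_event(event_type: str, data: dict) -> str:
    """Serialize one SSE frame: 'event:' and 'data:' lines plus a blank line."""
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

# A token event as the browser receives it:
#
#   event: token
#   data: {"content": "Hello"}
#
print(format_sse_event("token", {"content": "Hello"}), end="")
```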
Backend Implementation (Python/FastAPI)
Basic Setup
```python
from openai import AsyncOpenAI
import json
import time
from typing import AsyncGenerator

# Async client: a sync client's token loop would block the event loop
client = AsyncOpenAI()

class RAGStreamingPipeline:
    def __init__(self, vector_db, embedding_model, llm_client):
        self.vector_db = vector_db
        self.embedding_model = embedding_model
        self.llm_client = llm_client

    async def stream_response(
        self,
        query: str,
        system_prompt: str
    ) -> AsyncGenerator[str, None]:
        """Complete RAG pipeline with streaming."""
        # 1. Retrieval (non-streamed)
        retrieval_start = time.perf_counter()
        query_embedding = await self.embedding_model.embed(query)
        documents = await self.vector_db.search(query_embedding, top_k=5)

        # 2. Build context
        context = self._format_context(documents)

        # 3. Send metadata first so the UI can show sources immediately
        yield self._format_sse_event("metadata", {
            "sources": [doc.source for doc in documents],
            "retrieval_time_ms": int((time.perf_counter() - retrieval_start) * 1000)
        })

        # 4. Stream generation
        generation_start = time.perf_counter()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
        ]

        stream = await self.llm_client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True
        )

        full_response = ""
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                full_response += token
                yield self._format_sse_event("token", {"content": token})

        # 5. Send completion signal
        yield self._format_sse_event("done", {
            "total_tokens": len(full_response.split()),
            "generation_time_ms": int((time.perf_counter() - generation_start) * 1000)
        })

    def _format_context(self, documents: list) -> str:
        return "\n\n".join([
            f"[{doc.source}]\n{doc.content}"
            for doc in documents
        ])

    def _format_sse_event(self, event_type: str, data: dict) -> str:
        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
```
FastAPI Endpoint
```python
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse

app = FastAPI()
pipeline = RAGStreamingPipeline(vector_db, embedding_model, llm_client)

@app.get("/api/chat/stream")
async def stream_chat(
    query: str = Query(..., description="User query"),
    channel_id: str = Query(..., description="Channel ID")
):
    """SSE endpoint for streaming RAG responses."""
    # Load channel configuration
    channel_config = await get_channel_config(channel_id)

    async def event_generator():
        try:
            async for event in pipeline.stream_response(
                query=query,
                system_prompt=channel_config.system_prompt
            ):
                yield event
        except Exception as e:
            yield pipeline._format_sse_event("error", {
                "message": str(e),
                "code": "GENERATION_ERROR"
            })

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
```
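To sanity-check the endpoint outside the browser, you can consume the stream with any HTTP client that supports streaming reads. A minimal sketch using httpx; the URL and parameter values are placeholders for your own deployment:

```python
import httpx

# Read the SSE stream line by line and print frames as they arrive.
with httpx.stream(
    "GET",
    "http://localhost:8000/api/chat/stream",
    params={"query": "What is streaming RAG?", "channel_id": "demo"},
    timeout=None,  # a long-lived stream should not hit a read timeout
) as response:
    for line in response.iter_lines():
        if line:  # blank lines just terminate frames
            print(line)
```

You should see the `metadata` event first, followed by a steady run of `token` events and a final `done`.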
Error Handling in Streaming
```python
import asyncio
import json

from openai import RateLimitError, APIConnectionError

def format_sse_event(event_type: str, data: dict) -> str:
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

class StreamingErrorHandler:
    """Robust error handling during streaming."""

    @staticmethod
    async def safe_stream(generator_factory, max_retries=3):
        """Wrapper to handle streaming errors.

        Takes a factory rather than a generator: an async generator that
        has already raised cannot be restarted, so each retry attempt
        needs a fresh one.
        """
        retries = 0
        while retries < max_retries:
            try:
                async for event in generator_factory():
                    yield event
                return  # Success
            except RateLimitError:
                retries += 1
                yield format_sse_event("warning", {
                    "message": "Rate limit hit, retrying...",
                    "retry": retries
                })
                await asyncio.sleep(2 ** retries)
            except APIConnectionError:
                yield format_sse_event("error", {
                    "message": "Connection lost. Please retry.",
                    "recoverable": True
                })
                return
            except Exception as e:
                yield format_sse_event("error", {
                    "message": "An error occurred.",
                    "code": type(e).__name__
                })
                return

        yield format_sse_event("error", {
            "message": "Max retries exceeded.",
            "recoverable": False
        })
```
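To plug this into the endpoint above, pass a factory rather than a live generator so each retry attempt gets a fresh stream; a sketch reusing the names from the previous snippets:

```python
async def event_generator():
    # The lambda defers creation: safe_stream calls it once per attempt.
    make_stream = lambda: pipeline.stream_response(
        query=query,
        system_prompt=channel_config.system_prompt
    )
    async for event in StreamingErrorHandler.safe_stream(make_stream):
        yield event
```

Note that a retry restarts generation from scratch, so tokens already sent are streamed again; the frontend should clear its buffer when it receives a `warning` event.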
Frontend Implementation (TypeScript/React)
React Hook for Streaming
```typescript
import { useState, useCallback, useRef } from 'react';

interface StreamingMessage {
  content: string;
  isComplete: boolean;
  sources: string[];
  error?: string;
}

interface UseStreamingChatOptions {
  channelId: string;
  onToken?: (token: string) => void;
  onComplete?: (response: string) => void;
  onError?: (error: Error) => void;
}

export function useStreamingChat({
  channelId,
  onToken,
  onComplete,
  onError
}: UseStreamingChatOptions) {
  const [message, setMessage] = useState<StreamingMessage>({
    content: '',
    isComplete: false,
    sources: []
  });
  const [isStreaming, setIsStreaming] = useState(false);
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (query: string) => {
    // Cancel previous stream if exists
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
    abortControllerRef.current = new AbortController();

    setIsStreaming(true);
    setMessage({ content: '', isComplete: false, sources: [] });

    try {
      const response = await fetch(
        `/api/chat/stream?query=${encodeURIComponent(query)}&channel_id=${channelId}`,
        {
          method: 'GET',
          headers: { 'Accept': 'text/event-stream' },
          signal: abortControllerRef.current.signal
        }
      );

      if (!response.ok) {
        throw new Error(`HTTP error: ${response.status}`);
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) {
        throw new Error('No response body');
      }

      let buffer = '';
      let fullContent = '';
      let sources: string[] = [];

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        // SSE frames are separated by blank lines; keep the trailing
        // partial frame in the buffer for the next read
        const events = buffer.split('\n\n');
        buffer = events.pop() || '';

        for (const eventStr of events) {
          if (!eventStr.trim()) continue;

          const event = parseSSEEvent(eventStr);

          switch (event.type) {
            case 'metadata':
              sources = event.data.sources;
              setMessage(prev => ({ ...prev, sources }));
              break;
            case 'token':
              fullContent += event.data.content;
              setMessage(prev => ({ ...prev, content: fullContent }));
              onToken?.(event.data.content);
              break;
            case 'done':
              setMessage(prev => ({ ...prev, isComplete: true }));
              onComplete?.(fullContent);
              break;
            case 'error':
              setMessage(prev => ({ ...prev, error: event.data.message }));
              onError?.(new Error(event.data.message));
              break;
          }
        }
      }
    } catch (error) {
      if ((error as Error).name !== 'AbortError') {
        setMessage(prev => ({ ...prev, error: (error as Error).message }));
        onError?.(error as Error);
      }
    } finally {
      setIsStreaming(false);
    }
  }, [channelId, onToken, onComplete, onError]);

  const stopStreaming = useCallback(() => {
    abortControllerRef.current?.abort();
    setIsStreaming(false);
  }, []);

  return { message, isStreaming, sendMessage, stopStreaming };
}

function parseSSEEvent(eventStr: string): { type: string; data: any } {
  const lines = eventStr.split('\n');
  let eventType = 'message';
  let data = '';

  for (const line of lines) {
    if (line.startsWith('event: ')) {
      eventType = line.slice(7);
    } else if (line.startsWith('data: ')) {
      data = line.slice(6);
    }
  }

  return { type: eventType, data: JSON.parse(data) };
}
```
Chat Component with Streaming
```typescript
import React, { useState } from 'react';
import { useStreamingChat } from './useStreamingChat';

// SourcesList and ErrorMessage are assumed to be simple presentational
// components defined elsewhere in the app.

interface StreamingChatProps {
  channelId: string;
}

export function StreamingChat({ channelId }: StreamingChatProps) {
  const [input, setInput] = useState('');
  const { message, isStreaming, sendMessage, stopStreaming } = useStreamingChat({
    channelId,
    onComplete: (response) => {
      console.log('Response complete:', response.length, 'characters');
    }
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (input.trim() && !isStreaming) {
      sendMessage(input);
      setInput('');
    }
  };

  return (
    <div className="streaming-chat">
      <div className="messages">
        {message.content && (
          <div className="message assistant">
            <StreamingText text={message.content} isStreaming={isStreaming} />
            {message.isComplete && message.sources.length > 0 && (
              <SourcesList sources={message.sources} />
            )}
            {message.error && (
              <ErrorMessage error={message.error} />
            )}
          </div>
        )}
      </div>

      <form onSubmit={handleSubmit} className="input-form">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Ask your question..."
          disabled={isStreaming}
        />
        {isStreaming ? (
          <button type="button" onClick={stopStreaming}>
            Stop
          </button>
        ) : (
          <button type="submit" disabled={!input.trim()}>
            Send
          </button>
        )}
      </form>
    </div>
  );
}

function StreamingText({ text, isStreaming }: {
  text: string;
  isStreaming: boolean;
}) {
  return (
    <div className="streaming-text">
      {text}
      {isStreaming && <span className="cursor">▊</span>}
    </div>
  );
}
```
Advanced Optimizations
1. Context Prefetch
Start retrieval before user finishes typing:
```typescript
import { useDeferredValue, useEffect, useRef } from 'react';

function usePrefetchContext(query: string, channelId: string) {
  const deferredQuery = useDeferredValue(query);
  const cacheRef = useRef<Map<string, any>>(new Map());

  useEffect(() => {
    if (deferredQuery.length < 3) return;

    const cacheKey = `${channelId}:${deferredQuery}`;
    if (cacheRef.current.has(cacheKey)) return;

    // Background prefetch
    fetch(`/api/context/prefetch?q=${encodeURIComponent(deferredQuery)}&channel=${channelId}`)
      .then(res => res.json())
      .then(context => {
        cacheRef.current.set(cacheKey, context);
      })
      .catch(() => {}); // Silently fail
  }, [deferredQuery, channelId]);

  return cacheRef.current;
}
```
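The hook assumes a `/api/context/prefetch` endpoint that runs retrieval only, without generation. It isn't shown above, so here is a hypothetical sketch of what it could look like on the FastAPI side (the in-memory cache is illustrative; use Redis or similar in production):

```python
from fastapi import Query

# Hypothetical prefetch cache keyed by channel and query prefix.
prefetch_cache: dict[str, list[dict]] = {}

@app.get("/api/context/prefetch")
async def prefetch_context(
    q: str = Query(..., min_length=3),
    channel: str = Query(...)
):
    """Run retrieval ahead of the streaming call and cache the result."""
    cache_key = f"{channel}:{q}"
    if cache_key not in prefetch_cache:
        query_embedding = await embedding_model.embed(q)
        documents = await vector_db.search(query_embedding, top_k=5)
        prefetch_cache[cache_key] = [
            {"source": doc.source, "content": doc.content}
            for doc in documents
        ]
    return {"documents": prefetch_cache[cache_key]}
```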
2. Smart Buffering
Accumulate tokens for smoother display:
```typescript
class TokenBuffer {
  private buffer: string[] = [];
  private flushCallback: (text: string) => void;
  private flushInterval: number;
  // ReturnType<typeof setTimeout> works in both browser and Node contexts
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    flushCallback: (text: string) => void,
    flushInterval: number = 50 // ms
  ) {
    this.flushCallback = flushCallback;
    this.flushInterval = flushInterval;
  }

  add(token: string) {
    this.buffer.push(token);

    // Immediate flush for punctuation characters
    if (/[.!?\n]$/.test(token)) {
      this.flush();
      return;
    }

    // Otherwise, delayed flush
    if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.buffer.length > 0) {
      this.flushCallback(this.buffer.join(''));
      this.buffer = [];
    }
  }
}
```
3. Automatic Reconnection
```typescript
class ResilientEventSource {
  private url: string;
  private onMessage: (event: any) => void;
  private onError: (error: Error) => void;
  private maxRetries: number;
  private retryCount: number = 0;
  private eventSource: EventSource | null = null;

  constructor(options: {
    url: string;
    onMessage: (event: any) => void;
    onError: (error: Error) => void;
    maxRetries?: number;
  }) {
    this.url = options.url;
    this.onMessage = options.onMessage;
    this.onError = options.onError;
    this.maxRetries = options.maxRetries || 3;
    this.connect();
  }

  private connect() {
    this.eventSource = new EventSource(this.url);

    this.eventSource.onmessage = (event) => {
      this.retryCount = 0; // Reset on success
      this.onMessage(JSON.parse(event.data));
    };

    this.eventSource.onerror = () => {
      this.eventSource?.close();

      if (this.retryCount < this.maxRetries) {
        this.retryCount++;
        // Exponential backoff: 2s, 4s, 8s, ...
        const delay = Math.pow(2, this.retryCount) * 1000;
        setTimeout(() => this.connect(), delay);
      } else {
        this.onError(new Error('Max retries exceeded'));
      }
    };
  }

  close() {
    this.eventSource?.close();
  }
}
```
Monitoring and Metrics
Key Metrics to Track
```python
import time

from prometheus_client import Histogram, Counter, Gauge

# First token latency
TTFB_HISTOGRAM = Histogram(
    'rag_streaming_ttfb_seconds',
    'Time to first byte',
    buckets=[0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 5.0]
)

# Total streaming duration
STREAM_DURATION = Histogram(
    'rag_streaming_duration_seconds',
    'Total streaming duration',
    buckets=[1, 2, 5, 10, 20, 30, 60]
)

# Streaming errors
STREAM_ERRORS = Counter(
    'rag_streaming_errors_total',
    'Total streaming errors',
    ['error_type']
)

# Active connections
ACTIVE_STREAMS = Gauge(
    'rag_streaming_active_connections',
    'Number of active streaming connections'
)

class MetricCollector:
    def __init__(self):
        self.start_time = None

    async def track_stream(self, stream_generator):
        self.start_time = time.time()
        ACTIVE_STREAMS.inc()
        first_token = True

        try:
            async for event in stream_generator:
                # Events are the SSE strings yielded by the pipeline,
                # so detect the first token by its "event: token" prefix
                if first_token and event.startswith("event: token"):
                    TTFB_HISTOGRAM.observe(time.time() - self.start_time)
                    first_token = False
                yield event
        except Exception as e:
            STREAM_ERRORS.labels(error_type=type(e).__name__).inc()
            raise
        finally:
            STREAM_DURATION.observe(time.time() - self.start_time)
            ACTIVE_STREAMS.dec()
```
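To populate these metrics, wrap the pipeline generator with the collector before handing it to `StreamingResponse`; a sketch revisiting the endpoint from earlier:

```python
@app.get("/api/chat/stream")
async def stream_chat(query: str, channel_id: str):
    channel_config = await get_channel_config(channel_id)
    collector = MetricCollector()  # one collector per request

    return StreamingResponse(
        collector.track_stream(
            pipeline.stream_response(query, channel_config.system_prompt)
        ),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
```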
Monitoring Dashboard
```python
# Endpoint exposing streaming stats for a custom dashboard.
# Note: the underscore-prefixed attributes below are prometheus_client
# internals, used here for illustration only; in production, scrape the
# standard /metrics endpoint instead. `total_streams` and `calculate_p95`
# are assumed to be defined elsewhere.
@app.get("/metrics/streaming")
async def streaming_metrics():
    return {
        "active_connections": ACTIVE_STREAMS._value.get(),
        "avg_ttfb_ms": TTFB_HISTOGRAM._sum.get() / max(TTFB_HISTOGRAM._count.get(), 1) * 1000,
        "error_rate": STREAM_ERRORS._value.get() / max(total_streams, 1),
        "p95_duration_s": calculate_p95(STREAM_DURATION)
    }
```
Integration with Ailog
Ailog natively handles streaming with:
- Optimized SSE with automatic reconnection
- Smart buffering for smooth UX
- Built-in metrics (TTFB, errors, duration)
- Ready-to-use React SDK
```typescript
import { useAilogChat } from '@ailog/react';

function ChatWidget() {
  const { messages, isStreaming, sendMessage } = useAilogChat({
    channelId: 'your-channel',
    streaming: true, // Enabled by default
    onStreamStart: () => console.log('Stream started'),
    onStreamEnd: () => console.log('Stream ended')
  });

  // Component automatically displays streaming
}
```
Conclusion
Streaming transforms your RAG user experience. Key points:
- SSE is the recommended protocol for simplicity
- Robust error handling with reconnection
- Buffering for smooth display
- Metrics to monitor performance
- Prefetch to further reduce latency
Additional Resources
- Introduction to RAG - Fundamentals
- LLM Generation for RAG - Parent guide
- Reduce RAG Latency - Optimizations
- RAG Prompt Engineering - Optimize prompts
Want turnkey streaming? Try Ailog - optimized streaming, automatic reconnection, React SDK included.