Guide · Advanced

Streaming RAG: Implementing Real-Time Responses

March 14, 2026
20 min read
Ailog Team

Complete technical guide for implementing streaming in your RAG system: architecture, Python/TypeScript code, error handling, and UX optimization.

TL;DR

Streaming RAG displays the response token by token instead of waiting for the full generation. The result: perceived latency cut by roughly a factor of five and a noticeably better user experience. This guide covers the technical architecture, backend/frontend implementation, and advanced patterns for robust streaming in production.

Why Streaming is Essential

The Latency Problem in RAG

A typical RAG pipeline takes 2-5 seconds before displaying anything:

User query
    ↓ 200ms - Query embedding
    ↓ 300ms - Vector search
    ↓ 100ms - Reranking
    ↓ 2000ms - LLM generation
    ↓ Complete response

With streaming:

User query
    ↓ 200ms - Query embedding
    ↓ 300ms - Vector search
    ↓ 100ms - Reranking
    ↓ 100ms - First token
    ↓ ... continuous tokens ...
    ↓ Complete response

Impact on User Experience

| Metric | Without streaming | With streaming |
|---|---|---|
| Time to First Byte (TTFB) | 2500 ms | 600 ms |
| Perceived latency | High | Low |
| User abandonment | 15% | 3% |
| Satisfaction (NPS) | +25 | +58 |

Streaming RAG Architecture

Overview

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Client    │←──→│   Backend   │←──→│     LLM     │
│  (Browser)  │SSE │  (FastAPI)  │    │  (OpenAI)   │
└─────────────┘    └─────────────┘    └─────────────┘
       ↑                  ↑                  ↑
       │                  │                  │
   Progressive        RAG +             Streaming
   display         orchestration        generation

Protocol Choice

| Protocol | Advantages | Disadvantages | Use case |
|---|---|---|---|
| SSE | Simple, native HTTP | Unidirectional | Recommended |
| WebSocket | Bidirectional | Complexity | Interactive chat |
| HTTP/2 Push | Performant | Limited support | Advanced cases |
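
SSE earns the "simple" label because each event is just a short plain-text frame on a long-lived HTTP response. For reference, the frames produced by the `_format_sse_event` helper in the backend code below look like this on the wire (the values are illustrative):

```
event: token
data: {"content": " streaming"}

event: done
data: {"total_tokens": 182, "generation_time_ms": 1480}
```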

Backend Implementation (Python/FastAPI)

Basic Setup

```python
import json
import time
from typing import AsyncGenerator

from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
# Async client so streaming generation does not block the event loop
client = AsyncOpenAI()


class RAGStreamingPipeline:
    def __init__(self, vector_db, embedding_model, llm_client):
        self.vector_db = vector_db
        self.embedding_model = embedding_model
        self.llm_client = llm_client

    async def stream_response(
        self,
        query: str,
        system_prompt: str
    ) -> AsyncGenerator[str, None]:
        """Complete RAG pipeline with streaming."""
        # 1. Retrieval (non-streamed)
        retrieval_start = time.time()
        query_embedding = await self.embedding_model.embed(query)
        documents = await self.vector_db.search(query_embedding, top_k=5)

        # 2. Build context
        context = self._format_context(documents)

        # 3. Send metadata first (sources + measured retrieval time)
        yield self._format_sse_event("metadata", {
            "sources": [doc.source for doc in documents],
            "retrieval_time_ms": int((time.time() - retrieval_start) * 1000)
        })

        # 4. Stream generation
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
        ]

        generation_start = time.time()
        stream = await self.llm_client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            stream=True
        )

        full_response = ""
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                full_response += token
                yield self._format_sse_event("token", {"content": token})

        # 5. Send completion signal
        yield self._format_sse_event("done", {
            "total_tokens": len(full_response.split()),
            "generation_time_ms": int((time.time() - generation_start) * 1000)
        })

    def _format_context(self, documents: list) -> str:
        return "\n\n".join([
            f"[{doc.source}]\n{doc.content}"
            for doc in documents
        ])

    def _format_sse_event(self, event_type: str, data: dict) -> str:
        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
```

FastAPI Endpoint

```python
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse

app = FastAPI()
pipeline = RAGStreamingPipeline(vector_db, embedding_model, llm_client)


@app.get("/api/chat/stream")
async def stream_chat(
    query: str = Query(..., description="User query"),
    channel_id: str = Query(..., description="Channel ID")
):
    """SSE endpoint for streaming RAG responses."""
    # Load channel configuration
    channel_config = await get_channel_config(channel_id)

    async def event_generator():
        try:
            async for event in pipeline.stream_response(
                query=query,
                system_prompt=channel_config.system_prompt
            ):
                yield event
        except Exception as e:
            yield pipeline._format_sse_event("error", {
                "message": str(e),
                "code": "GENERATION_ERROR"
            })

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
```
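
Before wiring up the frontend, the stream can be sanity-checked with a small script. A minimal sketch using httpx; the localhost URL, query, and channel_id are placeholder values:

```python
# Quick manual test of the SSE endpoint above (assumes the API runs on localhost:8000).
import httpx

params = {"query": "How do I reset my password?", "channel_id": "demo"}

with httpx.stream("GET", "http://localhost:8000/api/chat/stream",
                  params=params, timeout=None) as response:
    response.raise_for_status()
    for line in response.iter_lines():
        # SSE frames arrive as "event: ..." / "data: ..." lines separated by blank lines
        if line:
            print(line)
```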

Error Handling in Streaming

```python
import asyncio

# Exception types from the v1 OpenAI SDK
from openai import APIConnectionError, RateLimitError


class StreamingErrorHandler:
    """Robust error handling during streaming."""

    @staticmethod
    async def safe_stream(generator_factory, max_retries=3):
        """Wrap a streaming generator and convert failures into SSE error events.

        Takes a factory (a callable returning a fresh async generator) so the
        stream can be restarted after a retryable failure.
        """
        retries = 0
        while retries < max_retries:
            try:
                async for event in generator_factory():
                    yield event
                return  # Success
            except RateLimitError:
                # Note: a retry restarts generation from scratch; tokens already
                # sent to the client are not rolled back.
                retries += 1
                yield format_sse_event("warning", {
                    "message": "Rate limit hit, retrying...",
                    "retry": retries
                })
                await asyncio.sleep(2 ** retries)  # Exponential backoff
            except APIConnectionError:
                yield format_sse_event("error", {
                    "message": "Connection lost. Please retry.",
                    "recoverable": True
                })
                return
            except Exception as e:
                yield format_sse_event("error", {
                    "message": "An error occurred.",
                    "code": type(e).__name__
                })
                return

        yield format_sse_event("error", {
            "message": "Max retries exceeded.",
            "recoverable": False
        })


# format_sse_event() is the same "event: ...\ndata: ...\n\n" helper used by the pipeline.
```
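
One way to plug this in, sketched under the assumptions of the endpoint above: pass a factory so each retry gets a fresh generator, then iterate the wrapper inside event_generator.

```python
# Hedged example: retrying the RAG stream via StreamingErrorHandler inside the endpoint.
async def event_generator():
    async for event in StreamingErrorHandler.safe_stream(
        lambda: pipeline.stream_response(
            query=query,
            system_prompt=channel_config.system_prompt
        ),
        max_retries=3
    ):
        yield event
```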

Frontend Implementation (TypeScript/React)

React Hook for Streaming

```typescript
import { useState, useCallback, useRef } from 'react';

interface StreamingMessage {
  content: string;
  isComplete: boolean;
  sources: string[];
  error?: string;
}

interface UseStreamingChatOptions {
  channelId: string;
  onToken?: (token: string) => void;
  onComplete?: (response: string) => void;
  onError?: (error: Error) => void;
}

export function useStreamingChat({
  channelId,
  onToken,
  onComplete,
  onError
}: UseStreamingChatOptions) {
  const [message, setMessage] = useState<StreamingMessage>({
    content: '',
    isComplete: false,
    sources: []
  });
  const [isStreaming, setIsStreaming] = useState(false);
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (query: string) => {
    // Cancel previous stream if exists
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
    abortControllerRef.current = new AbortController();

    setIsStreaming(true);
    setMessage({ content: '', isComplete: false, sources: [] });

    try {
      const response = await fetch(
        `/api/chat/stream?query=${encodeURIComponent(query)}&channel_id=${channelId}`,
        {
          method: 'GET',
          headers: { 'Accept': 'text/event-stream' },
          signal: abortControllerRef.current.signal
        }
      );

      if (!response.ok) {
        throw new Error(`HTTP error: ${response.status}`);
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) {
        throw new Error('No response body');
      }

      let buffer = '';
      let fullContent = '';
      let sources: string[] = [];

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        // SSE events are separated by blank lines; keep the trailing partial frame in the buffer
        const events = buffer.split('\n\n');
        buffer = events.pop() || '';

        for (const eventStr of events) {
          if (!eventStr.trim()) continue;

          const event = parseSSEEvent(eventStr);

          switch (event.type) {
            case 'metadata':
              sources = event.data.sources;
              setMessage(prev => ({ ...prev, sources }));
              break;
            case 'token':
              fullContent += event.data.content;
              setMessage(prev => ({ ...prev, content: fullContent }));
              onToken?.(event.data.content);
              break;
            case 'done':
              setMessage(prev => ({ ...prev, isComplete: true }));
              onComplete?.(fullContent);
              break;
            case 'error':
              setMessage(prev => ({ ...prev, error: event.data.message }));
              onError?.(new Error(event.data.message));
              break;
          }
        }
      }
    } catch (error) {
      if ((error as Error).name !== 'AbortError') {
        setMessage(prev => ({ ...prev, error: (error as Error).message }));
        onError?.(error as Error);
      }
    } finally {
      setIsStreaming(false);
    }
  }, [channelId, onToken, onComplete, onError]);

  const stopStreaming = useCallback(() => {
    abortControllerRef.current?.abort();
    setIsStreaming(false);
  }, []);

  return { message, isStreaming, sendMessage, stopStreaming };
}

function parseSSEEvent(eventStr: string): { type: string; data: any } {
  const lines = eventStr.split('\n');
  let eventType = 'message';
  let data = '';

  for (const line of lines) {
    if (line.startsWith('event: ')) {
      eventType = line.slice(7);
    } else if (line.startsWith('data: ')) {
      data = line.slice(6);
    }
  }

  return { type: eventType, data: JSON.parse(data) };
}
```

Chat Component with Streaming

```typescript
import React, { useState } from 'react';
import { useStreamingChat } from './useStreamingChat';

interface StreamingChatProps {
  channelId: string;
}

export function StreamingChat({ channelId }: StreamingChatProps) {
  const [input, setInput] = useState('');
  const { message, isStreaming, sendMessage, stopStreaming } = useStreamingChat({
    channelId,
    onComplete: (response) => {
      console.log('Response complete:', response.length, 'characters');
    }
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (input.trim() && !isStreaming) {
      sendMessage(input);
      setInput('');
    }
  };

  return (
    <div className="streaming-chat">
      <div className="messages">
        {message.content && (
          <div className="message assistant">
            <StreamingText text={message.content} isStreaming={isStreaming} />
            {/* SourcesList and ErrorMessage are presentational components defined elsewhere */}
            {message.isComplete && message.sources.length > 0 && (
              <SourcesList sources={message.sources} />
            )}
            {message.error && <ErrorMessage error={message.error} />}
          </div>
        )}
      </div>

      <form onSubmit={handleSubmit} className="input-form">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Ask your question..."
          disabled={isStreaming}
        />
        {isStreaming ? (
          <button type="button" onClick={stopStreaming}>
            Stop
          </button>
        ) : (
          <button type="submit" disabled={!input.trim()}>
            Send
          </button>
        )}
      </form>
    </div>
  );
}

function StreamingText({ text, isStreaming }: { text: string; isStreaming: boolean }) {
  return (
    <div className="streaming-text">
      {text}
      {isStreaming && <span className="cursor" />}
    </div>
  );
}
```

Advanced Optimizations

1. Context Prefetch

Start retrieval before user finishes typing:

```typescript
import { useDeferredValue, useEffect, useRef } from 'react';

function usePrefetchContext(query: string, channelId: string) {
  const deferredQuery = useDeferredValue(query);
  const cacheRef = useRef<Map<string, any>>(new Map());

  useEffect(() => {
    if (deferredQuery.length < 3) return;

    const cacheKey = `${channelId}:${deferredQuery}`;
    if (cacheRef.current.has(cacheKey)) return;

    // Background prefetch
    fetch(`/api/context/prefetch?q=${encodeURIComponent(deferredQuery)}&channel=${channelId}`)
      .then(res => res.json())
      .then(context => {
        cacheRef.current.set(cacheKey, context);
      })
      .catch(() => {}); // Silently fail
  }, [deferredQuery, channelId]);

  return cacheRef.current;
}
```
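
The hook above calls a retrieval-only endpoint that the backend section does not define. A hedged sketch of what /api/context/prefetch could look like, reusing the pipeline from earlier; the in-memory prefetch_cache is purely illustrative:

```python
# Hypothetical retrieval-only endpoint for the prefetch call above; no LLM call is made.
prefetch_cache: dict = {}


@app.get("/api/context/prefetch")
async def prefetch_context(q: str, channel: str):
    query_embedding = await pipeline.embedding_model.embed(q)
    documents = await pipeline.vector_db.search(query_embedding, top_k=5)

    # Cache by (channel, query) so the streaming endpoint can reuse the retrieval result
    prefetch_cache[(channel, q)] = documents
    return {"sources": [doc.source for doc in documents]}
```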

2. Smart Buffering

Accumulate tokens for smoother display:

```typescript
class TokenBuffer {
  private buffer: string[] = [];
  private flushCallback: (text: string) => void;
  private flushInterval: number;
  // ReturnType<typeof setTimeout> works in both browser and Node typings
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    flushCallback: (text: string) => void,
    flushInterval: number = 50 // ms
  ) {
    this.flushCallback = flushCallback;
    this.flushInterval = flushInterval;
  }

  add(token: string) {
    this.buffer.push(token);

    // Immediate flush for punctuation characters
    if (/[.!?\n]$/.test(token)) {
      this.flush();
      return;
    }

    // Otherwise, delayed flush
    if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.flushInterval);
    }
  }

  flush() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.buffer.length > 0) {
      this.flushCallback(this.buffer.join(''));
      this.buffer = [];
    }
  }
}
```

3. Automatic Reconnection

If the SSE connection drops, reconnect with exponential backoff instead of failing the whole response:

```typescript
class ResilientEventSource {
  private url: string;
  private onMessage: (event: any) => void;
  private onError: (error: Error) => void;
  private maxRetries: number;
  private retryCount: number = 0;
  private eventSource: EventSource | null = null;

  constructor(options: {
    url: string;
    onMessage: (event: any) => void;
    onError: (error: Error) => void;
    maxRetries?: number;
  }) {
    this.url = options.url;
    this.onMessage = options.onMessage;
    this.onError = options.onError;
    this.maxRetries = options.maxRetries || 3;
    this.connect();
  }

  private connect() {
    this.eventSource = new EventSource(this.url);

    this.eventSource.onmessage = (event) => {
      this.retryCount = 0; // Reset on success
      this.onMessage(JSON.parse(event.data));
    };

    this.eventSource.onerror = () => {
      this.eventSource?.close();

      if (this.retryCount < this.maxRetries) {
        this.retryCount++;
        const delay = Math.pow(2, this.retryCount) * 1000; // Exponential backoff
        setTimeout(() => this.connect(), delay);
      } else {
        this.onError(new Error('Max retries exceeded'));
      }
    };
  }

  close() {
    this.eventSource?.close();
  }
}
```

Monitoring and Metrics

Key Metrics to Track

```python
import time

from prometheus_client import Histogram, Counter, Gauge

# First token latency
TTFB_HISTOGRAM = Histogram(
    'rag_streaming_ttfb_seconds',
    'Time to first byte',
    buckets=[0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 5.0]
)

# Total streaming duration
STREAM_DURATION = Histogram(
    'rag_streaming_duration_seconds',
    'Total streaming duration',
    buckets=[1, 2, 5, 10, 20, 30, 60]
)

# Streaming errors
STREAM_ERRORS = Counter(
    'rag_streaming_errors_total',
    'Total streaming errors',
    ['error_type']
)

# Active connections
ACTIVE_STREAMS = Gauge(
    'rag_streaming_active_connections',
    'Number of active streaming connections'
)


class MetricCollector:
    def __init__(self):
        self.start_time = None

    async def track_stream(self, stream_generator):
        self.start_time = time.time()
        ACTIVE_STREAMS.inc()
        first_token = True

        try:
            async for event in stream_generator:
                # Events are SSE-formatted strings; the first "token" event marks the TTFB
                if first_token and event.startswith("event: token"):
                    TTFB_HISTOGRAM.observe(time.time() - self.start_time)
                    first_token = False
                yield event
        except Exception as e:
            STREAM_ERRORS.labels(error_type=type(e).__name__).inc()
            raise
        finally:
            STREAM_DURATION.observe(time.time() - self.start_time)
            ACTIVE_STREAMS.dec()
```
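
To instrument the endpoint, wrap the RAG generator before handing it to StreamingResponse. A minimal sketch, reusing the names (pipeline, get_channel_config) from the backend section above:

```python
# Hedged sketch: instrumenting /api/chat/stream with MetricCollector.
@app.get("/api/chat/stream")
async def stream_chat(query: str, channel_id: str):
    channel_config = await get_channel_config(channel_id)
    collector = MetricCollector()

    # track_stream records TTFB, total duration, errors, and active connections
    instrumented = collector.track_stream(
        pipeline.stream_response(query=query, system_prompt=channel_config.system_prompt)
    )
    return StreamingResponse(instrumented, media_type="text/event-stream")
```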

Monitoring Dashboard

```python
from prometheus_client import REGISTRY

# Lightweight JSON view of the streaming metrics for a dashboard.
# Values are read back from the Prometheus registry; percentiles and error
# rates are usually computed in Prometheus/Grafana instead, e.g.
#   histogram_quantile(0.95, rate(rag_streaming_duration_seconds_bucket[5m]))
@app.get("/metrics/streaming")
async def streaming_metrics():
    ttfb_sum = REGISTRY.get_sample_value("rag_streaming_ttfb_seconds_sum") or 0.0
    ttfb_count = REGISTRY.get_sample_value("rag_streaming_ttfb_seconds_count") or 0.0

    return {
        "active_connections": REGISTRY.get_sample_value("rag_streaming_active_connections") or 0,
        "avg_ttfb_ms": (ttfb_sum / ttfb_count * 1000) if ttfb_count else None,
        "total_errors": sum(
            sample.value
            for metric in STREAM_ERRORS.collect()
            for sample in metric.samples
            if sample.name == "rag_streaming_errors_total"
        ),
    }
```

Integration with Ailog

Ailog natively handles streaming with:

  • Optimized SSE with automatic reconnection
  • Smart buffering for smooth UX
  • Built-in metrics (TTFB, errors, duration)
  • Ready-to-use React SDK

```typescript
import { useAilogChat } from '@ailog/react';

function ChatWidget() {
  const { messages, isStreaming, sendMessage } = useAilogChat({
    channelId: 'your-channel',
    streaming: true, // Enabled by default
    onStreamStart: () => console.log('Stream started'),
    onStreamEnd: () => console.log('Stream ended')
  });

  // Component automatically displays streaming
}
```

Conclusion

Streaming transforms your RAG user experience. Key points:

  1. SSE is the recommended protocol for simplicity
  2. Robust error handling with reconnection
  3. Buffering for smooth display
  4. Metrics to monitor performance
  5. Prefetch to further reduce latency

Want turnkey streaming? Try Ailog - optimized streaming, automatic reconnection, React SDK included.

Tags

RAG · streaming · real-time · SSE · WebSocket · performance · UX
