Context Window Optimization: Managing Token Limits
Strategies for fitting more information in limited context windows: compression, summarization, smart selection, and window management techniques.
The Context Window Challenge
LLMs have fixed context windows:
| Model | Context Window |
|---|---|
| GPT-3.5 Turbo | 16K tokens |
| GPT-4 | 8K / 32K tokens |
| GPT-4 Turbo | 128K tokens |
| Claude 2 | 100K tokens |
| Claude 3 | 200K tokens |
| Llama 2 | 4K tokens |
| Gemini 1.5 Pro | 1M tokens |
The problem:

```
System prompt:            500 tokens
User query:               100 tokens
Retrieved contexts:     2,560 tokens (5 chunks × 512 tokens)
Conversation history:   1,000 tokens
─────────────────────────────────────
Total input:            4,160 tokens
Max output:             1,000 tokens
─────────────────────────────────────
Total:                  5,160 tokens (fits in an 8K window)
```
As your RAG system scales:
- More retrieved chunks
- Longer conversations
- More complex prompts
- Larger documents
You'll hit context limits. Optimization is essential.
Token Counting
Accurate Token Counting
```python
import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

# Example
text = "Hello, how are you?"
tokens = count_tokens(text)  # 6 tokens
```
Context Budget Calculation
```python
class ContextBudget:
    def __init__(self, model: str = "gpt-4", max_output_tokens: int = 1000):
        self.model = model
        self.max_output = max_output_tokens

        # Context windows (tokens)
        self.windows = {
            "gpt-3.5-turbo": 16385,
            "gpt-4": 8192,
            "gpt-4-32k": 32768,
            "gpt-4-turbo": 128000,
            "claude-2": 100000,
            "claude-3": 200000,
        }

        self.total_window = self.windows.get(model, 8192)
        self.available_input = self.total_window - max_output_tokens

    def allocate(self, system=500, query=200, history=1000):
        """Allocate tokens for different components."""
        fixed_tokens = system + query + history
        available_for_context = self.available_input - fixed_tokens

        return {
            'system_prompt': system,
            'query': query,
            'history': history,
            'context': available_for_context,
            'output': self.max_output,
            'total': fixed_tokens + available_for_context + self.max_output
        }

# Usage
budget = ContextBudget(model="gpt-4")
allocation = budget.allocate()
print(f"Available for retrieved context: {allocation['context']} tokens")
# Available for retrieved context: 5492 tokens
```
Chunk Selection Strategies
Top-K with Token Limit
```python
from typing import List

def select_chunks_within_budget(chunks: List[dict], budget: int, model: str = "gpt-4") -> List[dict]:
    """Select as many top-ranked chunks as possible within the token budget."""
    selected = []
    total_tokens = 0

    for chunk in chunks:
        chunk_tokens = count_tokens(chunk['content'], model)
        if total_tokens + chunk_tokens <= budget:
            selected.append(chunk)
            total_tokens += chunk_tokens
        else:
            break

    return selected

# Usage
chunks = retriever.retrieve(query, k=20)  # Retrieve more than needed
selected = select_chunks_within_budget(chunks, budget=5000)
# Might return 8-12 chunks depending on length
```
Priority-Based Selection
```python
def priority_select_chunks(chunks: List[dict], budget: int, weights: dict) -> List[dict]:
    """Select chunks based on multiple criteria."""
    # Score chunks
    for chunk in chunks:
        score = (
            weights['relevance'] * chunk['similarity_score'] +
            weights['recency'] * chunk['recency_score'] +
            weights['authority'] * chunk['authority_score']
        )
        chunk['priority_score'] = score

    # Sort by priority
    sorted_chunks = sorted(chunks, key=lambda x: x['priority_score'], reverse=True)

    # Select within budget
    return select_chunks_within_budget(sorted_chunks, budget)
```
Compression Techniques
Extractive Summarization
Condense each chunk so only the key information remains. The example below uses an abstractive summarization model (BART); a purely extractive variant that keeps only the sentences closest to the query is sketched after it.
```python
from transformers import pipeline

summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

def compress_chunk(chunk: str, max_length: int = 130) -> str:
    """Compress a chunk while preserving key information."""
    summary = summarizer(
        chunk,
        max_length=max_length,
        min_length=30,
        do_sample=False
    )
    return summary[0]['summary_text']

# Usage
original = "Very long chunk of text..."  # 500 tokens
compressed = compress_chunk(original, max_length=130)  # ~100 tokens
```
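For a truly extractive approach, score each sentence against the query and keep only the closest ones. This is a minimal sketch assuming a sentence-transformers encoder and a naive punctuation-based sentence splitter; the model name and the `top_k` value are illustrative choices, not fixed recommendations.

```python
import re
import numpy as np
from sentence_transformers import SentenceTransformer

# Assumed embedding model; any sentence encoder works here
_encoder = SentenceTransformer("all-MiniLM-L6-v2")

def extractive_compress(query: str, chunk: str, top_k: int = 3) -> str:
    """Keep only the sentences most similar to the query."""
    # Naive sentence split on ., !, ? followed by whitespace
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', chunk) if s.strip()]
    if len(sentences) <= top_k:
        return chunk

    # Embed query and sentences, then rank sentences by cosine similarity
    embeddings = _encoder.encode([query] + sentences)
    query_vec, sent_vecs = embeddings[0], embeddings[1:]
    sims = sent_vecs @ query_vec / (
        np.linalg.norm(sent_vecs, axis=1) * np.linalg.norm(query_vec)
    )

    # Keep the top_k sentences in their original order
    top_idx = sorted(np.argsort(sims)[-top_k:])
    return " ".join(sentences[i] for i in top_idx)
```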
LLM-Based Compression
```python
async def llm_compress_context(query: str, chunk: str, llm) -> str:
    """Use an LLM to extract only the information relevant to the query."""
    prompt = f"""Extract only the information relevant to the query from this text.

Query: {query}

Text: {chunk}

Relevant excerpt:"""

    return await llm.generate(prompt, max_tokens=200)

# Example
query = "How do I reset my password?"
chunk = "Our system offers many features including user management, password reset, data export..."

compressed = await llm_compress_context(query, chunk, llm)
# "Password reset: Click 'Forgot Password' on the login page..."
```
Semantic Compression
Remove redundant information across chunks.
```python
def remove_redundant_chunks(chunks: List[str], threshold: float = 0.85) -> List[str]:
    """Remove chunks with high semantic overlap."""
    embeddings = embed_batch(chunks)
    selected = [0]  # Always keep the first chunk (highest relevance)

    for i in range(1, len(chunks)):
        # Check similarity against already selected chunks
        max_similarity = max(
            cosine_similarity(embeddings[i], embeddings[j])
            for j in selected
        )

        # Only add if sufficiently different
        if max_similarity < threshold:
            selected.append(i)

    return [chunks[i] for i in selected]
```
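The `embed_batch` and `cosine_similarity` helpers above are assumed rather than defined. One possible implementation, again using sentence-transformers and NumPy (the model name is an arbitrary choice), looks like this:

```python
from typing import List
import numpy as np
from sentence_transformers import SentenceTransformer

# Assumed embedding model; swap in whatever encoder your pipeline already uses
_model = SentenceTransformer("all-MiniLM-L6-v2")

def embed_batch(texts: List[str]) -> np.ndarray:
    """Embed a batch of texts into an (n, d) array."""
    return _model.encode(texts)

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
```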
Sliding Window Approaches
Chunked Processing
Process long documents in windows.
```python
def sliding_window_qa(query: str, long_document: str, window_size=2000, stride=1000):
    """Process a long document with a sliding window."""
    answers = []

    # Create windows (tokenize/detokenize are assumed tokenizer helpers)
    tokens = tokenize(long_document)

    for i in range(0, len(tokens), stride):
        window = tokens[i:i + window_size]
        window_text = detokenize(window)

        # Generate an answer for this window
        answer = llm.generate(
            query=query,
            context=window_text
        )

        if answer and answer != "Information not found":
            answers.append({
                'answer': answer,
                'position': i,
                'confidence': estimate_confidence(answer)
            })

    # Combine answers (take the highest confidence or synthesize)
    return best_answer(answers)
```
Hierarchical Processing
Process at multiple granularities.
```python
async def hierarchical_processing(query: str, document: str, llm):
    """
    1. Summarize the entire document
    2. Find relevant sections in the summary
    3. Process the full sections for the answer
    """
    # Level 1: Document summary
    doc_summary = await llm.generate(
        f"Summarize this document:\n\n{document}",
        max_tokens=500
    )

    # Level 2: Identify relevant sections
    relevance_check = await llm.generate(
        f"Which parts of this summary are relevant to: {query}\n\nSummary: {doc_summary}",
        max_tokens=100
    )

    # Level 3: Process the full relevant sections
    relevant_sections = extract_sections(document, relevance_check)

    # Generate the final answer from the relevant sections
    answer = await llm.generate(
        query=query,
        context=relevant_sections
    )

    return answer
```
Conversation History Management
Fixed Window
Keep only recent history.
```python
class FixedWindowHistory:
    def __init__(self, max_turns=5):
        self.max_turns = max_turns
        self.history = []

    def add_turn(self, query: str, answer: str):
        self.history.append({'query': query, 'answer': answer})

        # Keep only the most recent turns
        if len(self.history) > self.max_turns:
            self.history = self.history[-self.max_turns:]

    def get_context(self) -> str:
        return "\n".join([
            f"User: {turn['query']}\nAssistant: {turn['answer']}"
            for turn in self.history
        ])
```
Token-Based Window
Keep history within token budget.
```python
class TokenBudgetHistory:
    def __init__(self, max_tokens=2000, model="gpt-4"):
        self.max_tokens = max_tokens
        self.model = model
        self.history = []

    def add_turn(self, query: str, answer: str):
        self.history.append({'query': query, 'answer': answer})
        self._trim_to_budget()

    def _trim_to_budget(self):
        while self.history:
            context = self.get_context()
            tokens = count_tokens(context, self.model)

            if tokens <= self.max_tokens:
                break

            # Remove the oldest turn
            self.history.pop(0)

    def get_context(self) -> str:
        return "\n".join([
            f"User: {turn['query']}\nAssistant: {turn['answer']}"
            for turn in self.history
        ])
```
Summarized History
Summarize old history to save tokens.
```python
class SummarizedHistory:
    def __init__(self, llm, summary_threshold=10):
        self.llm = llm
        self.summary_threshold = summary_threshold
        self.summary = ""
        self.recent_history = []

    async def add_turn(self, query: str, answer: str):
        self.recent_history.append({'query': query, 'answer': answer})

        # When the recent history gets long, summarize the older turns
        if len(self.recent_history) >= self.summary_threshold:
            await self._summarize_old_turns()

    async def _summarize_old_turns(self):
        # Summarize all but the last 3 turns
        to_summarize = self.recent_history[:-3]

        if to_summarize:
            history_text = format_history(to_summarize)
            new_summary = await self.llm.generate(
                f"Summarize this conversation:\n\n{self.summary}\n\n{history_text}",
                max_tokens=200
            )
            self.summary = new_summary
            self.recent_history = self.recent_history[-3:]

    def get_context(self) -> str:
        recent = format_history(self.recent_history)

        if self.summary:
            return f"Earlier: {self.summary}\n\nRecent:\n{recent}"
        else:
            return recent
```
Prompt Optimization
Template Compression
```python
# Verbose prompt (wasteful)
verbose_prompt = """
You are a helpful AI assistant. Your job is to answer questions based on
the provided context. Please read the context carefully and provide
accurate answers. If you don't know the answer, say so. Always be polite
and professional.

Context: {context}

Question: {query}

Answer:
"""

# Compressed prompt (efficient)
compressed_prompt = """Answer based on context. Say "I don't know" if uncertain.

Context: {context}
Q: {query}
A:"""

# Token savings: ~50 tokens per query
```
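To verify the savings on your own templates, the `count_tokens` helper from earlier works directly on the template strings; the exact numbers depend on the tokenizer and your wording.

```python
# Compare template overhead before filling in {context} and {query}
verbose_overhead = count_tokens(verbose_prompt.format(context="", query=""))
compressed_overhead = count_tokens(compressed_prompt.format(context="", query=""))

print(f"Verbose template: {verbose_overhead} tokens")
print(f"Compressed template: {compressed_overhead} tokens")
print(f"Savings per query: {verbose_overhead - compressed_overhead} tokens")
```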
Dynamic Prompts
Adjust prompt based on query complexity.
```python
def get_optimal_prompt(query: str, context: str, complexity: str) -> str:
    if complexity == "simple":
        # Minimal prompt for simple queries
        return f"Context: {context}\n\nQ: {query}\nA:"

    elif complexity == "medium":
        # Standard prompt
        return f"Answer based on context:\n\n{context}\n\nQ: {query}\nA:"

    else:
        # Detailed prompt for complex queries
        return f"""Analyze the context carefully and provide a detailed answer.

Context: {context}

Question: {query}

Detailed answer:"""
```
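How `complexity` gets decided is left open above. A rough heuristic based on query length and question structure is one option; the thresholds and keyword list below are arbitrary starting points, not a recommendation.

```python
def classify_complexity(query: str) -> str:
    """Very rough heuristic for routing a query to a prompt template."""
    analytical_markers = ("why", "compare", "explain", "difference", "trade-off")
    words = query.lower().split()

    if len(words) > 25 or any(marker in words for marker in analytical_markers):
        return "complex"
    if len(words) > 10 or "?" in query[:-1]:  # long query or embedded question
        return "medium"
    return "simple"

# Usage
prompt = get_optimal_prompt(query, context, classify_complexity(query))
```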
Adaptive Context Loading
Lazy Loading
Load context incrementally.
```python
async def adaptive_context_loading(query: str, vector_db, llm, max_chunks=10):
    """Start with a few chunks and add more only if needed."""
    chunk_counts = [3, 5, 8, max_chunks]

    for num_chunks in chunk_counts:
        # Retrieve chunks
        chunks = await vector_db.search(query, k=num_chunks)
        context = format_chunks(chunks)

        # Generate an answer
        answer = await llm.generate(query=query, context=context)

        # Check confidence
        confidence = await estimate_confidence(answer, llm)

        if confidence > 0.8:
            return answer  # Good enough

    # Used all chunk counts, return best effort
    return answer
```
Confidence-Based Retrieval
```python
async def confidence_based_retrieval(query: str, vector_db, llm):
    """Retrieve more context if the initial answer has low confidence."""
    # Start with the top 3 chunks
    chunks = await vector_db.search(query, k=3)
    context = format_chunks(chunks)

    answer = await llm.generate(query=query, context=context)
    confidence = await estimate_confidence(answer, llm)

    # If confidence is low, retrieve more
    if confidence < 0.6:
        additional_chunks = await vector_db.search(query, k=7)
        chunks.extend(additional_chunks[3:])  # Skip the first 3 (duplicates)

        context = format_chunks(chunks)
        answer = await llm.generate(query=query, context=context)

    return answer
```
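Both adaptive examples rely on an `estimate_confidence` helper that is not defined here. One lightweight option, sketched under the assumption that `llm.generate` accepts a plain prompt string, is to have the model grade its own answer; self-ratings are poorly calibrated, so treat the score as a relative signal rather than a probability.

```python
async def estimate_confidence(answer: str, llm) -> float:
    """Ask the LLM to self-rate an answer; returns a score in [0, 1]."""
    prompt = (
        "On a scale from 0 to 10, how complete and well-supported is this answer? "
        "Reply with a single number.\n\n"
        f"Answer: {answer}\n\nScore:"
    )
    raw = await llm.generate(prompt, max_tokens=5)

    try:
        return min(max(float(raw.strip()) / 10.0, 0.0), 1.0)
    except ValueError:
        return 0.0  # Unparseable response: treat as low confidence
```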
Multi-Turn Optimization
Context Carryover
Avoid re-sending unchanged context.
```python
class EfficientConversation:
    def __init__(self, llm):
        self.llm = llm
        self.static_context = None
        self.conversation_history = []

    async def query(self, user_query: str, retrieve_new_context=True):
        # Retrieve context only if the query changed topic
        if retrieve_new_context:
            self.static_context = await retrieve_context(user_query)

        # Build a minimal prompt
        prompt = f"""Context (same as before): [Ref: {hash(self.static_context)}]

Previous conversation:
{format_recent_history(self.conversation_history[-2:])}

New question: {user_query}

Answer:"""

        answer = await self.llm.generate(prompt)

        self.conversation_history.append({
            'query': user_query,
            'answer': answer
        })

        return answer
```
Monitoring Token Usage
```python
import time
import numpy as np

class TokenUsageTracker:
    def __init__(self):
        self.usage = []

    def track(self, prompt_tokens: int, completion_tokens: int, model: str):
        self.usage.append({
            'timestamp': time.time(),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
            'total_tokens': prompt_tokens + completion_tokens,
            'model': model
        })

    def get_stats(self):
        if not self.usage:
            return {}

        total_tokens = sum(u['total_tokens'] for u in self.usage)
        avg_prompt = np.mean([u['prompt_tokens'] for u in self.usage])
        avg_completion = np.mean([u['completion_tokens'] for u in self.usage])

        return {
            'total_tokens': total_tokens,
            'avg_prompt_tokens': avg_prompt,
            'avg_completion_tokens': avg_completion,
            'num_requests': len(self.usage)
        }

# Usage
tracker = TokenUsageTracker()

response = await llm.generate(prompt)
tracker.track(
    prompt_tokens=count_tokens(prompt),
    completion_tokens=count_tokens(response),
    model="gpt-4"
)

stats = tracker.get_stats()
print(f"Average prompt tokens: {stats['avg_prompt_tokens']}")
```
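If you call the OpenAI SDK directly rather than through a wrapper, the response object already carries exact counts, which avoids re-tokenizing. This sketch assumes the v1 Python client and reuses the `tracker` from above.

```python
from openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}],
)

# Exact counts reported by the API
tracker.track(
    prompt_tokens=response.usage.prompt_tokens,
    completion_tokens=response.usage.completion_tokens,
    model="gpt-4",
)
```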
Best Practices
- Measure first: Count tokens before optimizing
- Allocate budgets: Reserve tokens for each component
- Compress intelligently: Only compress what won't hurt quality
- Trim history: Don't send full conversation every time
- Start small: Retrieve fewer chunks, expand if needed
- Monitor usage: Track token consumption over time
- Test impact: Ensure compression doesn't hurt quality
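As a rough illustration of how several of these practices fit together, here is a minimal sketch that reuses `ContextBudget`, `count_tokens`, `select_chunks_within_budget`, and `TokenBudgetHistory` from earlier; the `retriever` and `llm` objects are assumed to come from your own pipeline.

```python
async def answer_with_budget(query: str, retriever, llm, history: TokenBudgetHistory):
    """Minimal end-to-end sketch: budget, select, generate, trim history."""
    budget = ContextBudget(model="gpt-4", max_output_tokens=1000)
    allocation = budget.allocate(
        system=200,
        query=count_tokens(query),
        history=count_tokens(history.get_context())
    )

    # Retrieve generously, then keep only what fits the context allocation
    candidates = retriever.retrieve(query, k=20)
    chunks = select_chunks_within_budget(candidates, budget=allocation['context'])
    context = "\n\n".join(chunk['content'] for chunk in chunks)

    prompt = f"""Answer based on context. Say "I don't know" if uncertain.

{history.get_context()}

Context: {context}
Q: {query}
A:"""

    answer = await llm.generate(prompt, max_tokens=1000)
    history.add_turn(query, answer)  # History trims itself to its own token budget
    return answer
```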
Trade-offs
| Strategy | Token Savings | Quality Impact | Complexity |
|---|---|---|---|
| Chunk selection | 20-40% | Low | Low |
| Extractive summary | 50-70% | Medium | Medium |
| LLM compression | 60-80% | Low-Medium | Medium |
| Prompt optimization | 10-30% | Low | Low |
| History summarization | 40-60% | Low | Medium |
| Adaptive loading | Variable | Low | High |
Next Steps
You now have a complete understanding of RAG fundamentals, from embeddings and chunking to production deployment and optimization. Apply these guides incrementally, measure results, and iterate based on your specific use case and constraints.