LangGraph: Complex RAG Workflows
Complete guide to building advanced RAG pipelines with LangGraph: decision graphs, conditional loops, multi-step orchestration and state management.
LangGraph is LangChain's framework for building stateful LLM applications with complex workflows. Unlike linear chains, LangGraph allows you to create graphs with loops, conditional branches, and advanced state management. It's the ideal tool for RAG systems that need more than simple retrieve-then-generate.
Why LangGraph for RAG?
Limitations of Linear Pipelines
A classic RAG follows a simple flow:
Query → Retrieve → Generate → Response
But real cases are more complex:
- What if retrieved documents aren't relevant?
- How to handle questions requiring multiple searches?
- How to integrate human validation?
- How to route to different strategies based on the question?
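Stripped to its essentials, that linear flow is just function composition, with no way to branch or loop back once retrieval has run. A minimal sketch with stubbed retrieval and generation (the stub names are illustrative, not a real API):

```python
# A linear RAG pipeline as straight function composition.
# retrieve_stub and generate_stub stand in for a real vector
# store lookup and LLM call.

def retrieve_stub(query: str) -> list[str]:
    # Would be vector_store.similarity_search(query) in practice
    return [f"doc about {query}"]

def generate_stub(query: str, docs: list[str]) -> str:
    # Would be an LLM call with the docs as context
    return f"Answer to '{query}' using {len(docs)} document(s)"

def linear_rag(query: str) -> str:
    docs = retrieve_stub(query)        # 1. Retrieve
    return generate_stub(query, docs)  # 2. Generate -- no path back
                                       #    if the docs are bad

print(linear_rag("LangGraph"))  # → Answer to 'LangGraph' using 1 document(s)
```

If `retrieve_stub` returns irrelevant documents, this pipeline has no mechanism to notice or recover; that gap is exactly what the patterns below address.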
What LangGraph Provides
| Feature | Classic Chain | LangGraph |
|---|---|---|
| Linear flow | Yes | Yes |
| Conditional branches | Limited | Native |
| Loops | No | Yes |
| Persistent state | No | Yes |
| Parallelism | Manual | Native |
| Checkpoints | No | Yes |
| Human-in-the-loop | Difficult | Integrated |
Fundamental Concepts
The Graph
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Annotated
import operator

# Define the graph state
class RAGState(TypedDict):
    query: str
    documents: List[str]
    answer: str
    relevance_score: float
    retry_count: int
    # operator.add makes this an append-only log across nodes
    messages: Annotated[List[str], operator.add]

# Create the graph
workflow = StateGraph(RAGState)
```
Nodes
Each node is a function that receives the current state and returns an update:
```python
def retrieve_documents(state: RAGState) -> dict:
    """Document retrieval node."""
    query = state["query"]
    # Retrieval logic
    docs = vector_store.similarity_search(query, k=5)
    # Return only the updated keys; LangGraph merges them into the state.
    # (Returning the full state would re-append old entries to `messages`,
    # since that field uses the operator.add reducer.)
    return {
        "documents": [doc.page_content for doc in docs],
        "messages": [f"Retrieved {len(docs)} documents"],
    }

def generate_answer(state: RAGState) -> dict:
    """Answer generation node."""
    query = state["query"]
    context = "\n\n".join(state["documents"])
    response = llm.invoke(f"""
Context: {context}

Question: {query}

Answer based only on the context provided.
""")
    return {
        "answer": response.content,
        "messages": ["Generated answer"],
    }

# Add nodes to the graph
workflow.add_node("retrieve", retrieve_documents)
workflow.add_node("generate", generate_answer)
```
Edges
Edges define flow between nodes:
```python
# Simple edge
workflow.add_edge("retrieve", "generate")

# Conditional edge
def should_retry(state: RAGState) -> str:
    if state["relevance_score"] < 0.5 and state["retry_count"] < 3:
        return "retrieve"  # Back to retrieval
    return "generate"      # Continue to generation

workflow.add_conditional_edges(
    "check_relevance",
    should_retry,
    {
        "retrieve": "retrieve",
        "generate": "generate",
    },
)
```
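Under the hood, a conditional edge turns the graph into a small state machine: run a node, call the routing function, follow the returned edge. The retry loop can be simulated with the standard library alone (stubbed relevance scores, no LangGraph required; the increments are arbitrary illustration values):

```python
# Stdlib-only simulation of the conditional edge above:
# loop back to "retrieve" while relevance is low and retries remain.

def should_retry(state: dict) -> str:
    if state["relevance_score"] < 0.5 and state["retry_count"] < 3:
        return "retrieve"
    return "generate"

def run(state: dict) -> dict:
    node = "retrieve"
    while node != "generate":
        # Fake retrieval: each attempt raises relevance by 0.3
        state["relevance_score"] += 0.3
        state["retry_count"] += 1
        node = should_retry(state)
    state["answer"] = f"generated after {state['retry_count']} attempt(s)"
    return state

result = run({"relevance_score": 0.0, "retry_count": 0, "answer": ""})
print(result["retry_count"])  # → 2  (0.3, then 0.6 >= 0.5)
```

The real graph adds persistence, streaming, and parallelism on top, but the control flow is exactly this loop.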
Advanced RAG Patterns
Pattern 1: Self-Correcting RAG
A RAG that self-corrects if documents aren't relevant:
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, List

class SelfCorrectingState(TypedDict):
    query: str
    original_query: str
    documents: List[str]
    answer: str
    is_relevant: bool
    retry_count: int

def retrieve(state: SelfCorrectingState) -> SelfCorrectingState:
    docs = vector_store.similarity_search(state["query"], k=5)
    return {**state, "documents": [d.page_content for d in docs]}

def check_relevance(state: SelfCorrectingState) -> SelfCorrectingState:
    """Check if documents are relevant."""
    prompt = f"""
Query: {state['query']}

Documents:
{chr(10).join(state['documents'][:3])}

Are these documents relevant to answer the query?
Answer only 'yes' or 'no'.
"""
    response = llm.invoke(prompt)
    is_relevant = "yes" in response.content.lower()
    return {**state, "is_relevant": is_relevant}

def rewrite_query(state: SelfCorrectingState) -> SelfCorrectingState:
    """Rewrite the query to improve retrieval."""
    prompt = f"""
Original query: {state['original_query']}
Previous attempt: {state['query']}

The retrieved documents were not relevant.
Rewrite the query to find better documents.
Focus on key concepts and use different terms.

Rewritten query:
"""
    response = llm.invoke(prompt)
    new_query = response.content.strip()
    return {
        **state,
        "query": new_query,
        "retry_count": state["retry_count"] + 1,
    }

def generate(state: SelfCorrectingState) -> SelfCorrectingState:
    context = "\n\n".join(state["documents"])
    prompt = f"""
Context: {context}

Question: {state['original_query']}

Provide a comprehensive answer based on the context.
"""
    response = llm.invoke(prompt)
    return {**state, "answer": response.content}

def route_after_relevance_check(state: SelfCorrectingState) -> str:
    if state["is_relevant"]:
        return "generate"
    elif state["retry_count"] < 3:
        return "rewrite"
    else:
        return "generate"  # Generate anyway after 3 attempts

# Build the graph
workflow = StateGraph(SelfCorrectingState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("check_relevance", check_relevance)
workflow.add_node("rewrite", rewrite_query)
workflow.add_node("generate", generate)

workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "check_relevance")
workflow.add_conditional_edges(
    "check_relevance",
    route_after_relevance_check,
    {"generate": "generate", "rewrite": "rewrite"},
)
workflow.add_edge("rewrite", "retrieve")
workflow.add_edge("generate", END)

app = workflow.compile()
```
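Because routing functions are pure Python, they can be unit-tested in isolation, with no LLM or vector store in the loop. For example:

```python
# The router from the self-correcting pattern, tested standalone.
# It takes a plain dict with the two fields it reads.

def route_after_relevance_check(state: dict) -> str:
    if state["is_relevant"]:
        return "generate"
    elif state["retry_count"] < 3:
        return "rewrite"
    return "generate"  # Give up and generate after 3 attempts

print(route_after_relevance_check({"is_relevant": False, "retry_count": 1}))  # → rewrite
print(route_after_relevance_check({"is_relevant": False, "retry_count": 3}))  # → generate
```

Keeping routers free of side effects is what makes loop-heavy graphs like this one debuggable.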
Pattern 2: Multi-Query RAG
Decomposes a complex question into sub-questions:
```python
class MultiQueryState(TypedDict):
    original_query: str
    sub_queries: List[str]
    all_documents: List[str]
    sub_answers: List[str]
    final_answer: str

def decompose_query(state: MultiQueryState) -> MultiQueryState:
    """Decompose the question into sub-questions."""
    prompt = f"""
Complex question: {state['original_query']}

Break this down into 2-4 simpler sub-questions that, when answered
together, will provide a complete answer.

Format: One question per line, no numbering.
"""
    response = llm.invoke(prompt)
    sub_queries = [q.strip() for q in response.content.split('\n') if q.strip()]
    return {**state, "sub_queries": sub_queries}

def retrieve_for_all_queries(state: MultiQueryState) -> MultiQueryState:
    """Retrieve documents for each sub-question."""
    all_docs = []
    for query in state["sub_queries"]:
        docs = vector_store.similarity_search(query, k=3)
        all_docs.extend([d.page_content for d in docs])
    # Deduplicate (note: set() does not preserve document order)
    unique_docs = list(set(all_docs))
    return {**state, "all_documents": unique_docs}

def answer_sub_queries(state: MultiQueryState) -> MultiQueryState:
    """Answer each sub-question."""
    context = "\n\n".join(state["all_documents"])
    answers = []
    for query in state["sub_queries"]:
        prompt = f"""
Context: {context}

Question: {query}

Brief answer:
"""
        response = llm.invoke(prompt)
        answers.append(f"Q: {query}\nA: {response.content}")
    return {**state, "sub_answers": answers}

def synthesize_final_answer(state: MultiQueryState) -> MultiQueryState:
    """Synthesize the answers into a final response."""
    prompt = f"""
Original question: {state['original_query']}

Sub-questions and answers:
{chr(10).join(state['sub_answers'])}

Synthesize a comprehensive final answer that addresses the original
question using all the information above.
"""
    response = llm.invoke(prompt)
    return {**state, "final_answer": response.content}

# Build the graph
workflow = StateGraph(MultiQueryState)
workflow.add_node("decompose", decompose_query)
workflow.add_node("retrieve", retrieve_for_all_queries)
workflow.add_node("answer_sub", answer_sub_queries)
workflow.add_node("synthesize", synthesize_final_answer)

workflow.set_entry_point("decompose")
workflow.add_edge("decompose", "retrieve")
workflow.add_edge("retrieve", "answer_sub")
workflow.add_edge("answer_sub", "synthesize")
workflow.add_edge("synthesize", END)

app = workflow.compile()
```
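One subtlety in the deduplication step: `list(set(...))` discards document order, which matters if your retriever returns results ranked by relevance. An order-preserving alternative needs nothing beyond the standard library:

```python
def dedupe_keep_order(docs: list[str]) -> list[str]:
    # dict preserves insertion order (Python 3.7+), so this keeps
    # the first occurrence of each document and drops later repeats.
    return list(dict.fromkeys(docs))

print(dedupe_keep_order(["a", "b", "a", "c", "b"]))  # → ['a', 'b', 'c']
```

With this, the highest-ranked copy of each document survives, so downstream context windows fill with the best candidates first.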
Pattern 3: Adaptive RAG Router
Routes to different strategies based on question type:
```python
class AdaptiveRAGState(TypedDict):
    query: str
    query_type: str  # factual, analytical, comparative, procedural
    documents: List[str]
    answer: str

def classify_query(state: AdaptiveRAGState) -> AdaptiveRAGState:
    """Classify the question type."""
    prompt = f"""
Classify this query into one category:
- factual: Simple fact lookup (who, what, when, where)
- analytical: Requires analysis or explanation (why, how does)
- comparative: Comparing multiple things
- procedural: Step-by-step instructions

Query: {state['query']}

Category (one word):
"""
    response = llm.invoke(prompt)
    query_type = response.content.strip().lower()
    return {**state, "query_type": query_type}

def factual_retrieve(state: AdaptiveRAGState) -> AdaptiveRAGState:
    """Retrieval for factual questions -- maximum precision."""
    docs = vector_store.similarity_search(state["query"], k=3)
    return {**state, "documents": [d.page_content for d in docs]}

def analytical_retrieve(state: AdaptiveRAGState) -> AdaptiveRAGState:
    """Retrieval for analytical questions -- broad context."""
    docs = vector_store.similarity_search(state["query"], k=7)
    return {**state, "documents": [d.page_content for d in docs]}

def route_by_query_type(state: AdaptiveRAGState) -> str:
    type_map = {
        "factual": "factual_retrieve",
        "analytical": "analytical_retrieve",
        "comparative": "comparative_retrieve",
        "procedural": "procedural_retrieve",
    }
    return type_map.get(state["query_type"], "factual_retrieve")

# Build the graph with conditional routing
# (comparative_retrieve and procedural_retrieve follow the same
# pattern as the two retrieval nodes above)
workflow = StateGraph(AdaptiveRAGState)
workflow.add_node("classify", classify_query)
workflow.add_node("factual_retrieve", factual_retrieve)
workflow.add_node("analytical_retrieve", analytical_retrieve)
workflow.add_node("generate", generate_answer)

workflow.set_entry_point("classify")
workflow.add_conditional_edges(
    "classify",
    route_by_query_type,
    {
        "factual_retrieve": "factual_retrieve",
        "analytical_retrieve": "analytical_retrieve",
    },
)
# Every retrieval strategy converges on the same generation node
workflow.add_edge("factual_retrieve", "generate")
workflow.add_edge("analytical_retrieve", "generate")
workflow.add_edge("generate", END)

app = workflow.compile()
```
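In practice, LLM classifiers rarely return a clean single token ("Analytical.", "I'd say factual"). A small normalization helper (hypothetical, not part of LangGraph) keeps the router from falling through to the default by accident:

```python
VALID_TYPES = {"factual", "analytical", "comparative", "procedural"}

def normalize_query_type(raw: str) -> str:
    # Strip surrounding whitespace/punctuation and lowercase, then
    # fall back to "factual" if the model returned something unexpected.
    cleaned = raw.strip().strip(".!\"'").lower()
    return cleaned if cleaned in VALID_TYPES else "factual"

print(normalize_query_type("Analytical."))  # → analytical
print(normalize_query_type("no idea"))      # → factual
```

Calling this on `response.content` before storing `query_type` makes the routing deterministic even when the classifier is chatty.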
Human-in-the-loop
LangGraph allows adding human validation points:
```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Configure a checkpointer so execution can pause and resume
# (in recent langgraph versions, from_conn_string is a context manager)
memory = SqliteSaver.from_conn_string(":memory:")

# Assumes the graph contains a "request_review" node
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["request_review"],
)

# Checkpointed runs are identified by a thread id
config = {"configurable": {"thread_id": "review-1"}}
result = app.invoke(initial_state, config)  # initial_state matches the graph's schema

# Execution pauses before request_review.
# A human can examine the state and provide feedback.

# Resume after feedback
updated_state = {**result, "human_feedback": "Looks good, but add disclaimer"}
final_result = app.invoke(updated_state, config)
```
Best Practices
1. Keep State Minimal
```python
# GOOD -- minimal state
class MinimalState(TypedDict):
    query: str
    context: str
    answer: str

# BAD -- too much data in state
class BloatedState(TypedDict):
    all_embeddings: List[List[float]]  # Too large
    full_documents: List[Document]     # Keep only the content
```
2. Name Nodes Clearly
```python
# GOOD
workflow.add_node("retrieve_documents", retrieve_fn)
workflow.add_node("check_relevance", check_fn)

# BAD
workflow.add_node("step1", retrieve_fn)
workflow.add_node("step2", check_fn)
```
3. Handle Errors
```python
def safe_retrieve(state: RAGState) -> RAGState:
    try:
        docs = vector_store.similarity_search(state["query"], k=5)
        return {**state, "documents": [d.page_content for d in docs]}
    except Exception as e:
        # Note: the state schema needs an "error" field for this to persist
        return {**state, "documents": [], "error": str(e)}
```
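Catching the exception is the floor; transient failures (timeouts, rate limits) often deserve a retry before giving up. A minimal stdlib retry wrapper, sketched here with a hypothetical flaky call in place of a real vector store:

```python
import time

def with_retries(fn, attempts: int = 3, delay: float = 0.0):
    """Call fn(); on exception, retry up to `attempts` times, then re-raise."""
    last_exc = None
    for i in range(attempts):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            time.sleep(delay * (2 ** i))  # exponential backoff between tries
    raise last_exc

# Stand-in for a retrieval call that fails twice, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(with_retries(flaky))  # → ok (succeeds on the 3rd attempt)
```

Inside a node, you would wrap the `similarity_search` call with this and only fall back to empty `documents` once all attempts are exhausted.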
Costs and Performance
| Approach | Latency | Cost/request | Complexity |
|---|---|---|---|
| Simple RAG | 1-2s | $0.01 | Low |
| Self-correcting | 2-5s | $0.02-0.05 | Medium |
| Multi-query | 3-6s | $0.03-0.08 | Medium |
| Adaptive | 2-4s | $0.02-0.04 | High |
Integration with Ailog
Ailog uses LangGraph workflows internally for:
- Adaptive retrieval: Automatically routes by question type
- Self-correction: Retries with reformulation if relevance < threshold
- Multi-source: Merges results from multiple knowledge bases
Try advanced RAG workflows on Ailog