LangGraph : Workflows RAG complexes
Construire des pipelines RAG avancés avec LangGraph. Graphes d'agents, états, conditions et orchestration de workflows complexes.
LangGraph : Workflows RAG complexes
LangGraph est un framework puissant pour construire des applications LLM avec état et multi-agents. Ce guide explore comment l'utiliser pour créer des pipelines RAG sophistiqués avec routage conditionnel, boucles de correction et orchestration d'agents.
Prérequis : Consultez les fondamentaux du RAG et notre guide sur l'orchestration d'agents RAG.
Pourquoi LangGraph pour le RAG ?
Limitations du RAG linéaire
| Approche | Limitation | Solution LangGraph |
|---|---|---|
| Pipeline séquentiel | Pas de retour en arrière | Cycles et boucles |
| Prompt unique | Pas d'adaptation | Routage conditionnel |
| Retrieval fixe | Résultats insuffisants | Re-retrieval dynamique |
| Réponse directe | Pas de vérification | Self-correction |
Architecture LangGraph
ARCHITECTURE LANGGRAPH
StateGraph
|
v
+-------+ +-------+ +-------+
| Node1 | --> | Node2 | --> | Node3 |
+-------+ +-------+ +-------+
| | |
+-------------+-------------+
|
v
+---------+
| State |
+---------+
(partagé entre tous les noeuds)
Avantages:
- État persistant entre les noeuds
- Cycles possibles (correction, retry)
- Conditions de routage
- Parallélisation
Concepts fondamentaux
State, Nodes et Edges
DEVELOPERpythonfrom typing import TypedDict, Annotated, List from langgraph.graph import StateGraph, END from langchain_core.messages import HumanMessage, AIMessage import operator # 1. Définition de l'état class RAGState(TypedDict): """État partagé du workflow RAG.""" # Messages de conversation messages: Annotated[List, operator.add] # Requête utilisateur query: str # Documents récupérés documents: List[dict] # Réponse générée response: str # Métadonnées retrieval_count: int is_relevant: bool needs_more_context: bool # 2. Définition des noeuds def retrieve_documents(state: RAGState) -> RAGState: """Noeud de récupération de documents.""" query = state["query"] # Simulation de retrieval documents = retriever.search(query, top_k=5) return { "documents": documents, "retrieval_count": state.get("retrieval_count", 0) + 1 } def grade_documents(state: RAGState) -> RAGState: """Noeud de notation des documents.""" documents = state["documents"] query = state["query"] # Évaluer la pertinence relevant_docs = [] for doc in documents: score = evaluate_relevance(doc, query) if score > 0.7: relevant_docs.append(doc) is_relevant = len(relevant_docs) >= 2 return { "documents": relevant_docs, "is_relevant": is_relevant } def generate_response(state: RAGState) -> RAGState: """Noeud de génération de réponse.""" query = state["query"] documents = state["documents"] # Construire le contexte context = "\n\n".join([doc["content"] for doc in documents]) # Générer la réponse prompt = f"""Context:\n{context}\n\nQuestion: {query}\n\nAnswer:""" response = llm.invoke(prompt) return { "response": response.content, "messages": [AIMessage(content=response.content)] } def check_hallucination(state: RAGState) -> RAGState: """Vérifie si la réponse contient des hallucinations.""" response = state["response"] documents = state["documents"] # Vérifier que la réponse est fondée sur les documents is_grounded = verify_grounding(response, documents) return { "needs_more_context": not is_grounded } # 3. Construction du graphe def build_rag_graph(): """Construit le graphe RAG avec LangGraph.""" workflow = StateGraph(RAGState) # Ajouter les noeuds workflow.add_node("retrieve", retrieve_documents) workflow.add_node("grade", grade_documents) workflow.add_node("generate", generate_response) workflow.add_node("check", check_hallucination) # Définir les edges workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "grade") # Edge conditionnel après la notation workflow.add_conditional_edges( "grade", lambda state: "generate" if state["is_relevant"] else "retrieve", { "generate": "generate", "retrieve": "retrieve" # Boucle de retry } ) workflow.add_edge("generate", "check") # Edge conditionnel après vérification workflow.add_conditional_edges( "check", lambda state: END if not state["needs_more_context"] else "retrieve", { END: END, "retrieve": "retrieve" # Retry si hallucination } ) return workflow.compile()
Patterns de workflows RAG
Pattern 1: Self-RAG (Correction automatique)
DEVELOPERpythonfrom langgraph.graph import StateGraph, END from typing import Literal class SelfRAGState(TypedDict): query: str documents: List[dict] response: str reflection: str iteration: int max_iterations: int def reflect_on_response(state: SelfRAGState) -> SelfRAGState: """Réflexion sur la qualité de la réponse.""" response = state["response"] query = state["query"] reflection_prompt = f""" Évalue cette réponse à la question. Question: {query} Réponse: {response} Critères: 1. La réponse est-elle complète? 2. Est-elle factuelle et vérifiable? 3. Répond-elle directement à la question? Si des améliorations sont nécessaires, explique lesquelles. Sinon, réponds "SATISFAISANT". """ reflection = llm.invoke(reflection_prompt) return { "reflection": reflection.content, "iteration": state["iteration"] + 1 } def should_continue(state: SelfRAGState) -> Literal["improve", "end"]: """Décide si on continue l'amélioration.""" if state["iteration"] >= state["max_iterations"]: return "end" if "SATISFAISANT" in state["reflection"]: return "end" return "improve" def improve_response(state: SelfRAGState) -> SelfRAGState: """Améliore la réponse basée sur la réflexion.""" improve_prompt = f""" Améliore cette réponse basée sur le feedback. Question originale: {state['query']} Réponse actuelle: {state['response']} Feedback: {state['reflection']} Nouvelle réponse améliorée: """ improved = llm.invoke(improve_prompt) return {"response": improved.content} def build_self_rag(): workflow = StateGraph(SelfRAGState) workflow.add_node("retrieve", retrieve_documents) workflow.add_node("generate", generate_response) workflow.add_node("reflect", reflect_on_response) workflow.add_node("improve", improve_response) workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "generate") workflow.add_edge("generate", "reflect") workflow.add_conditional_edges( "reflect", should_continue, {"improve": "improve", "end": END} ) workflow.add_edge("improve", "reflect") # Boucle de correction return workflow.compile()
Pattern 2: Adaptive RAG (Routage intelligent)
DEVELOPERpythonclass AdaptiveRAGState(TypedDict): query: str query_type: str # "simple", "complex", "comparison" documents: List[dict] sub_queries: List[str] sub_answers: List[str] final_response: str def classify_query(state: AdaptiveRAGState) -> AdaptiveRAGState: """Classifie la complexité de la requête.""" query = state["query"] classification_prompt = f""" Classifie cette question: "{query}" Types possibles: - simple: Question factuelle directe - complex: Nécessite plusieurs étapes de raisonnement - comparison: Compare plusieurs éléments Réponds uniquement par le type. """ query_type = llm.invoke(classification_prompt).content.strip().lower() return {"query_type": query_type} def decompose_query(state: AdaptiveRAGState) -> AdaptiveRAGState: """Décompose une requête complexe en sous-requêtes.""" query = state["query"] decompose_prompt = f""" Décompose cette question complexe en sous-questions: "{query}" Liste chaque sous-question sur une ligne. """ result = llm.invoke(decompose_prompt) sub_queries = [q.strip() for q in result.content.split("\n") if q.strip()] return {"sub_queries": sub_queries} def process_sub_queries(state: AdaptiveRAGState) -> AdaptiveRAGState: """Traite chaque sous-requête séparément.""" sub_answers = [] for sub_query in state["sub_queries"]: # Retrieval pour chaque sous-requête docs = retriever.search(sub_query, top_k=3) context = "\n".join([d["content"] for d in docs]) answer = llm.invoke(f"Context: {context}\n\nQuestion: {sub_query}") sub_answers.append(answer.content) return {"sub_answers": sub_answers} def synthesize_response(state: AdaptiveRAGState) -> AdaptiveRAGState: """Synthétise les réponses partielles.""" synthesis_prompt = f""" Question originale: {state['query']} Réponses aux sous-questions: {chr(10).join([f'- {a}' for a in state['sub_answers']])} Synthétise une réponse complète et cohérente. """ final = llm.invoke(synthesis_prompt) return {"final_response": final.content} def route_by_complexity(state: AdaptiveRAGState) -> str: """Route selon la complexité de la requête.""" query_type = state["query_type"] if query_type == "simple": return "simple_path" elif query_type == "complex": return "complex_path" else: return "comparison_path" def build_adaptive_rag(): workflow = StateGraph(AdaptiveRAGState) # Noeuds workflow.add_node("classify", classify_query) workflow.add_node("simple_retrieve", retrieve_documents) workflow.add_node("simple_generate", generate_response) workflow.add_node("decompose", decompose_query) workflow.add_node("process_subs", process_sub_queries) workflow.add_node("synthesize", synthesize_response) # Edges workflow.set_entry_point("classify") workflow.add_conditional_edges( "classify", route_by_complexity, { "simple_path": "simple_retrieve", "complex_path": "decompose", "comparison_path": "decompose" } ) workflow.add_edge("simple_retrieve", "simple_generate") workflow.add_edge("simple_generate", END) workflow.add_edge("decompose", "process_subs") workflow.add_edge("process_subs", "synthesize") workflow.add_edge("synthesize", END) return workflow.compile()
Pattern 3: Multi-Agent RAG
DEVELOPERpythonclass MultiAgentState(TypedDict): query: str documents: List[dict] researcher_notes: str writer_draft: str reviewer_feedback: str final_response: str revision_count: int def researcher_agent(state: MultiAgentState) -> MultiAgentState: """Agent chercheur: analyse les documents.""" docs = state["documents"] researcher_prompt = """ Tu es un chercheur. Analyse ces documents et extrais: 1. Les faits clés 2. Les sources importantes 3. Les points de consensus et de désaccord Documents: {docs} Notes de recherche: """ notes = llm.invoke(researcher_prompt.format( docs="\n".join([d["content"] for d in docs]) )) return {"researcher_notes": notes.content} def writer_agent(state: MultiAgentState) -> MultiAgentState: """Agent rédacteur: écrit une réponse.""" writer_prompt = f""" Tu es un rédacteur. Utilise ces notes de recherche pour répondre: Question: {state['query']} Notes de recherche: {state['researcher_notes']} Rédige une réponse claire et structurée. """ draft = llm.invoke(writer_prompt) return {"writer_draft": draft.content} def reviewer_agent(state: MultiAgentState) -> MultiAgentState: """Agent relecteur: vérifie la qualité.""" reviewer_prompt = f""" Tu es un relecteur exigeant. Évalue cette réponse: Question: {state['query']} Réponse: {state['writer_draft']} Critères: 1. Exactitude factuelle 2. Clarté 3. Complétude 4. Style Si acceptable, réponds "APPROUVÉ". Sinon, donne des suggestions d'amélioration. """ feedback = llm.invoke(reviewer_prompt) return { "reviewer_feedback": feedback.content, "revision_count": state.get("revision_count", 0) + 1 } def revise_draft(state: MultiAgentState) -> MultiAgentState: """Révise le brouillon basé sur le feedback.""" revise_prompt = f""" Révise cette réponse selon le feedback: Réponse actuelle: {state['writer_draft']} Feedback: {state['reviewer_feedback']} Réponse révisée: """ revised = llm.invoke(revise_prompt) return {"writer_draft": revised.content} def finalize_response(state: MultiAgentState) -> MultiAgentState: """Finalise la réponse.""" return {"final_response": state["writer_draft"]} def should_revise(state: MultiAgentState) -> str: """Décide si une révision est nécessaire.""" if "APPROUVÉ" in state["reviewer_feedback"]: return "finalize" if state["revision_count"] >= 3: return "finalize" return "revise" def build_multi_agent_rag(): workflow = StateGraph(MultiAgentState) workflow.add_node("retrieve", retrieve_documents) workflow.add_node("research", researcher_agent) workflow.add_node("write", writer_agent) workflow.add_node("review", reviewer_agent) workflow.add_node("revise", revise_draft) workflow.add_node("finalize", finalize_response) workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "research") workflow.add_edge("research", "write") workflow.add_edge("write", "review") workflow.add_conditional_edges( "review", should_revise, {"revise": "revise", "finalize": "finalize"} ) workflow.add_edge("revise", "review") # Boucle de révision workflow.add_edge("finalize", END) return workflow.compile()
Exécution et monitoring
Streaming des résultats
DEVELOPERpythonasync def stream_rag_response( graph, query: str ): """Stream les résultats du workflow RAG.""" initial_state = { "query": query, "documents": [], "response": "", "messages": [HumanMessage(content=query)], "retrieval_count": 0, "is_relevant": False, "needs_more_context": False } async for event in graph.astream(initial_state): # event contient le nom du noeud et l'état mis à jour node_name = list(event.keys())[0] state_update = event[node_name] yield { "node": node_name, "update": state_update } # Utilisation async def main(): graph = build_rag_graph() async for update in stream_rag_response(graph, "Qu'est-ce que le RAG?"): print(f"[{update['node']}] {update['update']}")
Persistance de l'état
DEVELOPERpythonfrom langgraph.checkpoint.sqlite import SqliteSaver def build_persistent_rag(): """RAG avec persistance de l'état pour conversations multi-tours.""" workflow = StateGraph(RAGState) # ... configuration du workflow ... # Ajouter un checkpointer pour la persistance memory = SqliteSaver.from_conn_string(":memory:") return workflow.compile(checkpointer=memory) # Utilisation avec thread_id pour conversations graph = build_persistent_rag() # Premier message config = {"configurable": {"thread_id": "user_123"}} result1 = graph.invoke({"query": "Explique le RAG"}, config) # Message de suivi (même thread) result2 = graph.invoke({"query": "Et les embeddings?"}, config)
Bonnes pratiques
Gestion des erreurs
DEVELOPERpythonfrom langgraph.graph import StateGraph class SafeRAGState(TypedDict): query: str documents: List[dict] response: str error: str retry_count: int def safe_retrieve(state: SafeRAGState) -> SafeRAGState: """Retrieval avec gestion d'erreurs.""" try: docs = retriever.search(state["query"]) return {"documents": docs, "error": ""} except Exception as e: return { "error": str(e), "retry_count": state.get("retry_count", 0) + 1 } def handle_error(state: SafeRAGState) -> SafeRAGState: """Gère les erreurs gracieusement.""" return { "response": f"Désolé, une erreur s'est produite: {state['error']}" } def route_on_error(state: SafeRAGState) -> str: if state.get("error"): if state.get("retry_count", 0) < 3: return "retry" return "error_handler" return "continue"
Checklist d'implémentation
- État bien défini avec tous les champs nécessaires
- Noeuds atomiques et testables
- Conditions de routage claires
- Gestion des boucles infinies (max_iterations)
- Gestion des erreurs à chaque noeud
- Streaming configuré pour UX temps réel
- Persistance pour conversations multi-tours
- Monitoring et logging des transitions
Conclusion
LangGraph permet de construire des pipelines RAG sophistiqués avec correction automatique, routage intelligent et collaboration multi-agents. La clé est de bien définir l'état partagé et les conditions de transition.
FAQ
Pour aller plus loin
Besoin d'un RAG orchestré ? Ailog propose des solutions RAG avec workflows intelligents et correction automatique. Architecture moderne et scalable.
Tags
Articles connexes
Agents RAG : Orchestrer des systemes multi-agents
Architecturez des systemes RAG multi-agents : orchestration, specialisation, collaboration et gestion des echecs pour des assistants complexes.
Agentic RAG 2025 : Construire des Agents IA Autonomes (Guide Complet)
Guide complet Agentic RAG : architecture, design patterns, agents autonomes avec retrieval dynamique, orchestration multi-outils. Avec exemples LangGraph et CrewAI.
RAG Conversationnel : Memoire et contexte multi-sessions
Implementez un RAG avec memoire conversationnelle : gestion du contexte, historique multi-sessions et personnalisation des reponses.