GuideAvancé

LangGraph : Workflows RAG complexes

23 mars 2026
22 min de lecture
Équipe Ailog

Construire des pipelines RAG avancés avec LangGraph. Graphes d'agents, états, conditions et orchestration de workflows complexes.

LangGraph : Workflows RAG complexes

LangGraph est un framework puissant pour construire des applications LLM avec état et multi-agents. Ce guide explore comment l'utiliser pour créer des pipelines RAG sophistiqués avec routage conditionnel, boucles de correction et orchestration d'agents.

Prérequis : Consultez les fondamentaux du RAG et notre guide sur l'orchestration d'agents RAG.

Pourquoi LangGraph pour le RAG ?

Limitations du RAG linéaire

ApprocheLimitationSolution LangGraph
Pipeline séquentielPas de retour en arrièreCycles et boucles
Prompt uniquePas d'adaptationRoutage conditionnel
Retrieval fixeRésultats insuffisantsRe-retrieval dynamique
Réponse directePas de vérificationSelf-correction

Architecture LangGraph

ARCHITECTURE LANGGRAPH

StateGraph
    |
    v
+-------+     +-------+     +-------+
| Node1 | --> | Node2 | --> | Node3 |
+-------+     +-------+     +-------+
    |             |             |
    +-------------+-------------+
                  |
                  v
            +---------+
            |  State  |
            +---------+
            (partagé entre tous les noeuds)

Avantages:
- État persistant entre les noeuds
- Cycles possibles (correction, retry)
- Conditions de routage
- Parallélisation

Concepts fondamentaux

State, Nodes et Edges

DEVELOPERpython
from typing import TypedDict, Annotated, List from langgraph.graph import StateGraph, END from langchain_core.messages import HumanMessage, AIMessage import operator # 1. Définition de l'état class RAGState(TypedDict): """État partagé du workflow RAG.""" # Messages de conversation messages: Annotated[List, operator.add] # Requête utilisateur query: str # Documents récupérés documents: List[dict] # Réponse générée response: str # Métadonnées retrieval_count: int is_relevant: bool needs_more_context: bool # 2. Définition des noeuds def retrieve_documents(state: RAGState) -> RAGState: """Noeud de récupération de documents.""" query = state["query"] # Simulation de retrieval documents = retriever.search(query, top_k=5) return { "documents": documents, "retrieval_count": state.get("retrieval_count", 0) + 1 } def grade_documents(state: RAGState) -> RAGState: """Noeud de notation des documents.""" documents = state["documents"] query = state["query"] # Évaluer la pertinence relevant_docs = [] for doc in documents: score = evaluate_relevance(doc, query) if score > 0.7: relevant_docs.append(doc) is_relevant = len(relevant_docs) >= 2 return { "documents": relevant_docs, "is_relevant": is_relevant } def generate_response(state: RAGState) -> RAGState: """Noeud de génération de réponse.""" query = state["query"] documents = state["documents"] # Construire le contexte context = "\n\n".join([doc["content"] for doc in documents]) # Générer la réponse prompt = f"""Context:\n{context}\n\nQuestion: {query}\n\nAnswer:""" response = llm.invoke(prompt) return { "response": response.content, "messages": [AIMessage(content=response.content)] } def check_hallucination(state: RAGState) -> RAGState: """Vérifie si la réponse contient des hallucinations.""" response = state["response"] documents = state["documents"] # Vérifier que la réponse est fondée sur les documents is_grounded = verify_grounding(response, documents) return { "needs_more_context": not is_grounded } # 3. Construction du graphe def build_rag_graph(): """Construit le graphe RAG avec LangGraph.""" workflow = StateGraph(RAGState) # Ajouter les noeuds workflow.add_node("retrieve", retrieve_documents) workflow.add_node("grade", grade_documents) workflow.add_node("generate", generate_response) workflow.add_node("check", check_hallucination) # Définir les edges workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "grade") # Edge conditionnel après la notation workflow.add_conditional_edges( "grade", lambda state: "generate" if state["is_relevant"] else "retrieve", { "generate": "generate", "retrieve": "retrieve" # Boucle de retry } ) workflow.add_edge("generate", "check") # Edge conditionnel après vérification workflow.add_conditional_edges( "check", lambda state: END if not state["needs_more_context"] else "retrieve", { END: END, "retrieve": "retrieve" # Retry si hallucination } ) return workflow.compile()

Patterns de workflows RAG

Pattern 1: Self-RAG (Correction automatique)

DEVELOPERpython
from langgraph.graph import StateGraph, END from typing import Literal class SelfRAGState(TypedDict): query: str documents: List[dict] response: str reflection: str iteration: int max_iterations: int def reflect_on_response(state: SelfRAGState) -> SelfRAGState: """Réflexion sur la qualité de la réponse.""" response = state["response"] query = state["query"] reflection_prompt = f""" Évalue cette réponse à la question. Question: {query} Réponse: {response} Critères: 1. La réponse est-elle complète? 2. Est-elle factuelle et vérifiable? 3. Répond-elle directement à la question? Si des améliorations sont nécessaires, explique lesquelles. Sinon, réponds "SATISFAISANT". """ reflection = llm.invoke(reflection_prompt) return { "reflection": reflection.content, "iteration": state["iteration"] + 1 } def should_continue(state: SelfRAGState) -> Literal["improve", "end"]: """Décide si on continue l'amélioration.""" if state["iteration"] >= state["max_iterations"]: return "end" if "SATISFAISANT" in state["reflection"]: return "end" return "improve" def improve_response(state: SelfRAGState) -> SelfRAGState: """Améliore la réponse basée sur la réflexion.""" improve_prompt = f""" Améliore cette réponse basée sur le feedback. Question originale: {state['query']} Réponse actuelle: {state['response']} Feedback: {state['reflection']} Nouvelle réponse améliorée: """ improved = llm.invoke(improve_prompt) return {"response": improved.content} def build_self_rag(): workflow = StateGraph(SelfRAGState) workflow.add_node("retrieve", retrieve_documents) workflow.add_node("generate", generate_response) workflow.add_node("reflect", reflect_on_response) workflow.add_node("improve", improve_response) workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "generate") workflow.add_edge("generate", "reflect") workflow.add_conditional_edges( "reflect", should_continue, {"improve": "improve", "end": END} ) workflow.add_edge("improve", "reflect") # Boucle de correction return workflow.compile()

Pattern 2: Adaptive RAG (Routage intelligent)

DEVELOPERpython
class AdaptiveRAGState(TypedDict): query: str query_type: str # "simple", "complex", "comparison" documents: List[dict] sub_queries: List[str] sub_answers: List[str] final_response: str def classify_query(state: AdaptiveRAGState) -> AdaptiveRAGState: """Classifie la complexité de la requête.""" query = state["query"] classification_prompt = f""" Classifie cette question: "{query}" Types possibles: - simple: Question factuelle directe - complex: Nécessite plusieurs étapes de raisonnement - comparison: Compare plusieurs éléments Réponds uniquement par le type. """ query_type = llm.invoke(classification_prompt).content.strip().lower() return {"query_type": query_type} def decompose_query(state: AdaptiveRAGState) -> AdaptiveRAGState: """Décompose une requête complexe en sous-requêtes.""" query = state["query"] decompose_prompt = f""" Décompose cette question complexe en sous-questions: "{query}" Liste chaque sous-question sur une ligne. """ result = llm.invoke(decompose_prompt) sub_queries = [q.strip() for q in result.content.split("\n") if q.strip()] return {"sub_queries": sub_queries} def process_sub_queries(state: AdaptiveRAGState) -> AdaptiveRAGState: """Traite chaque sous-requête séparément.""" sub_answers = [] for sub_query in state["sub_queries"]: # Retrieval pour chaque sous-requête docs = retriever.search(sub_query, top_k=3) context = "\n".join([d["content"] for d in docs]) answer = llm.invoke(f"Context: {context}\n\nQuestion: {sub_query}") sub_answers.append(answer.content) return {"sub_answers": sub_answers} def synthesize_response(state: AdaptiveRAGState) -> AdaptiveRAGState: """Synthétise les réponses partielles.""" synthesis_prompt = f""" Question originale: {state['query']} Réponses aux sous-questions: {chr(10).join([f'- {a}' for a in state['sub_answers']])} Synthétise une réponse complète et cohérente. """ final = llm.invoke(synthesis_prompt) return {"final_response": final.content} def route_by_complexity(state: AdaptiveRAGState) -> str: """Route selon la complexité de la requête.""" query_type = state["query_type"] if query_type == "simple": return "simple_path" elif query_type == "complex": return "complex_path" else: return "comparison_path" def build_adaptive_rag(): workflow = StateGraph(AdaptiveRAGState) # Noeuds workflow.add_node("classify", classify_query) workflow.add_node("simple_retrieve", retrieve_documents) workflow.add_node("simple_generate", generate_response) workflow.add_node("decompose", decompose_query) workflow.add_node("process_subs", process_sub_queries) workflow.add_node("synthesize", synthesize_response) # Edges workflow.set_entry_point("classify") workflow.add_conditional_edges( "classify", route_by_complexity, { "simple_path": "simple_retrieve", "complex_path": "decompose", "comparison_path": "decompose" } ) workflow.add_edge("simple_retrieve", "simple_generate") workflow.add_edge("simple_generate", END) workflow.add_edge("decompose", "process_subs") workflow.add_edge("process_subs", "synthesize") workflow.add_edge("synthesize", END) return workflow.compile()

Pattern 3: Multi-Agent RAG

DEVELOPERpython
class MultiAgentState(TypedDict): query: str documents: List[dict] researcher_notes: str writer_draft: str reviewer_feedback: str final_response: str revision_count: int def researcher_agent(state: MultiAgentState) -> MultiAgentState: """Agent chercheur: analyse les documents.""" docs = state["documents"] researcher_prompt = """ Tu es un chercheur. Analyse ces documents et extrais: 1. Les faits clés 2. Les sources importantes 3. Les points de consensus et de désaccord Documents: {docs} Notes de recherche: """ notes = llm.invoke(researcher_prompt.format( docs="\n".join([d["content"] for d in docs]) )) return {"researcher_notes": notes.content} def writer_agent(state: MultiAgentState) -> MultiAgentState: """Agent rédacteur: écrit une réponse.""" writer_prompt = f""" Tu es un rédacteur. Utilise ces notes de recherche pour répondre: Question: {state['query']} Notes de recherche: {state['researcher_notes']} Rédige une réponse claire et structurée. """ draft = llm.invoke(writer_prompt) return {"writer_draft": draft.content} def reviewer_agent(state: MultiAgentState) -> MultiAgentState: """Agent relecteur: vérifie la qualité.""" reviewer_prompt = f""" Tu es un relecteur exigeant. Évalue cette réponse: Question: {state['query']} Réponse: {state['writer_draft']} Critères: 1. Exactitude factuelle 2. Clarté 3. Complétude 4. Style Si acceptable, réponds "APPROUVÉ". Sinon, donne des suggestions d'amélioration. """ feedback = llm.invoke(reviewer_prompt) return { "reviewer_feedback": feedback.content, "revision_count": state.get("revision_count", 0) + 1 } def revise_draft(state: MultiAgentState) -> MultiAgentState: """Révise le brouillon basé sur le feedback.""" revise_prompt = f""" Révise cette réponse selon le feedback: Réponse actuelle: {state['writer_draft']} Feedback: {state['reviewer_feedback']} Réponse révisée: """ revised = llm.invoke(revise_prompt) return {"writer_draft": revised.content} def finalize_response(state: MultiAgentState) -> MultiAgentState: """Finalise la réponse.""" return {"final_response": state["writer_draft"]} def should_revise(state: MultiAgentState) -> str: """Décide si une révision est nécessaire.""" if "APPROUVÉ" in state["reviewer_feedback"]: return "finalize" if state["revision_count"] >= 3: return "finalize" return "revise" def build_multi_agent_rag(): workflow = StateGraph(MultiAgentState) workflow.add_node("retrieve", retrieve_documents) workflow.add_node("research", researcher_agent) workflow.add_node("write", writer_agent) workflow.add_node("review", reviewer_agent) workflow.add_node("revise", revise_draft) workflow.add_node("finalize", finalize_response) workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "research") workflow.add_edge("research", "write") workflow.add_edge("write", "review") workflow.add_conditional_edges( "review", should_revise, {"revise": "revise", "finalize": "finalize"} ) workflow.add_edge("revise", "review") # Boucle de révision workflow.add_edge("finalize", END) return workflow.compile()

Exécution et monitoring

Streaming des résultats

DEVELOPERpython
async def stream_rag_response( graph, query: str ): """Stream les résultats du workflow RAG.""" initial_state = { "query": query, "documents": [], "response": "", "messages": [HumanMessage(content=query)], "retrieval_count": 0, "is_relevant": False, "needs_more_context": False } async for event in graph.astream(initial_state): # event contient le nom du noeud et l'état mis à jour node_name = list(event.keys())[0] state_update = event[node_name] yield { "node": node_name, "update": state_update } # Utilisation async def main(): graph = build_rag_graph() async for update in stream_rag_response(graph, "Qu'est-ce que le RAG?"): print(f"[{update['node']}] {update['update']}")

Persistance de l'état

DEVELOPERpython
from langgraph.checkpoint.sqlite import SqliteSaver def build_persistent_rag(): """RAG avec persistance de l'état pour conversations multi-tours.""" workflow = StateGraph(RAGState) # ... configuration du workflow ... # Ajouter un checkpointer pour la persistance memory = SqliteSaver.from_conn_string(":memory:") return workflow.compile(checkpointer=memory) # Utilisation avec thread_id pour conversations graph = build_persistent_rag() # Premier message config = {"configurable": {"thread_id": "user_123"}} result1 = graph.invoke({"query": "Explique le RAG"}, config) # Message de suivi (même thread) result2 = graph.invoke({"query": "Et les embeddings?"}, config)

Bonnes pratiques

Gestion des erreurs

DEVELOPERpython
from langgraph.graph import StateGraph class SafeRAGState(TypedDict): query: str documents: List[dict] response: str error: str retry_count: int def safe_retrieve(state: SafeRAGState) -> SafeRAGState: """Retrieval avec gestion d'erreurs.""" try: docs = retriever.search(state["query"]) return {"documents": docs, "error": ""} except Exception as e: return { "error": str(e), "retry_count": state.get("retry_count", 0) + 1 } def handle_error(state: SafeRAGState) -> SafeRAGState: """Gère les erreurs gracieusement.""" return { "response": f"Désolé, une erreur s'est produite: {state['error']}" } def route_on_error(state: SafeRAGState) -> str: if state.get("error"): if state.get("retry_count", 0) < 3: return "retry" return "error_handler" return "continue"

Checklist d'implémentation

  • État bien défini avec tous les champs nécessaires
  • Noeuds atomiques et testables
  • Conditions de routage claires
  • Gestion des boucles infinies (max_iterations)
  • Gestion des erreurs à chaque noeud
  • Streaming configuré pour UX temps réel
  • Persistance pour conversations multi-tours
  • Monitoring et logging des transitions

Conclusion

LangGraph permet de construire des pipelines RAG sophistiqués avec correction automatique, routage intelligent et collaboration multi-agents. La clé est de bien définir l'état partagé et les conditions de transition.

FAQ

LangChain propose des chaînes linéaires (LCEL) où chaque étape s'exécute séquentiellement. LangGraph ajoute la notion de graphe avec état partagé, permettant des cycles (boucles de correction), des conditions de routage, et des retours en arrière. Utilisez LangChain pour les pipelines simples, LangGraph pour les workflows complexes nécessitant de la logique conditionnelle.
Trois stratégies : définissez un compteur d'itérations dans l'état et arrêtez après N essais, utilisez un timeout global sur l'exécution du graphe, et ajoutez des conditions de sortie explicites (par exemple, score de qualité suffisant). La combinaison des trois garantit que le workflow se termine toujours.
Oui, LangGraph est conçu pour la production. Il supporte l'exécution asynchrone, le streaming des résultats, et la persistance de l'état via des checkpointers (SQLite, PostgreSQL). Pour le scaling, déployez plusieurs instances stateless et utilisez un checkpointer partagé pour maintenir l'état des conversations.
Absolument. LangGraph peut orchestrer l'appel à des agents AutoGen ou CrewAI comme des noeuds du graphe. Chaque noeud peut encapsuler un système multi-agents complet, permettant de combiner la flexibilité du graphe avec la puissance des équipes d'agents spécialisés.
Activez le mode verbose pour voir les transitions entre noeuds. Utilisez le streaming pour observer l'état à chaque étape. Ajoutez des logs dans chaque fonction de noeud. Pour les problèmes de routage, testez les conditions de manière isolée avec des états mockés avant de les intégrer au graphe complet.

Pour aller plus loin


Besoin d'un RAG orchestré ? Ailog propose des solutions RAG avec workflows intelligents et correction automatique. Architecture moderne et scalable.

Tags

RAGLangGraphagentsworkflowsorchestration

Articles connexes

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !