GuideAvancé

Agents RAG : Orchestrer des systemes multi-agents

5 mars 2026
20 min de lecture
Equipe Ailog

Architecturez des systemes RAG multi-agents : orchestration, specialisation, collaboration et gestion des echecs pour des assistants complexes.

Agents RAG : Orchestrer des systemes multi-agents

Un agent RAG simple suit un pipeline lineaire : retrieval, generation, reponse. Un systeme multi-agents orchestre plusieurs agents specialises qui collaborent pour traiter des taches complexes. Ce guide explore les architectures et patterns pour construire ces systemes.

Pourquoi les multi-agents ?

Limites du RAG monolithique

Un RAG classique peine face a certaines taches :

TacheProbleme RAG simple
Questions multi-sourcesUn seul retrieval, contexte limite
Raisonnement complexePas de decomposition du probleme
Actions multiplesPipeline lineaire, pas de boucle
Verification des faitsPas de double-check
Taches longuesPas de planification

L'approche multi-agents

Diviser pour mieux regner :

Question complexe
       │
       ▼
┌─────────────────┐
│  ORCHESTRATEUR  │ ← Decompose et coordonne
└────────┬────────┘
         │
    ┌────┼────┬────────────┐
    ▼    ▼    ▼            ▼
┌──────┐┌──────┐┌──────┐┌──────┐
│Agent ││Agent ││Agent ││Agent │
│Search││Reason││Verify││Action│
└──────┘└──────┘└──────┘└──────┘
    │    │    │            │
    └────┴────┴────────────┘
                │
                ▼
         Reponse finale

Architectures d'orchestration

Pattern 1 : Routeur

L'orchestrateur route vers l'agent specialise.

DEVELOPERpython
from enum import Enum from typing import Callable, Dict class AgentType(Enum): FAQ = "faq" TECHNICAL = "technical" SALES = "sales" ESCALATION = "escalation" class RouterOrchestrator: def __init__(self, agents: Dict[AgentType, Callable], classifier): self.agents = agents self.classifier = classifier async def process(self, query: str, context: dict = None) -> dict: """ Route la requete vers l'agent approprie """ # 1. Classifier la requete classification = await self.classifier.classify(query) agent_type = AgentType(classification["agent"]) # 2. Appeler l'agent agent = self.agents.get(agent_type) if not agent: agent = self.agents[AgentType.ESCALATION] result = await agent(query, context) return { "response": result["answer"], "agent_used": agent_type.value, "confidence": classification["confidence"], "sources": result.get("sources", []) } class IntentClassifier: def __init__(self, llm): self.llm = llm async def classify(self, query: str) -> dict: prompt = f""" Classifie cette requete utilisateur. Categories : - faq : Questions generales, policies, informations - technical : Problemes techniques, bugs, configurations - sales : Pricing, achats, abonnements - escalation : Plaintes, urgences, demandes sensibles Requete : {query} Reponds en JSON : {{"agent": "...", "confidence": 0.0-1.0}} """ result = await self.llm.generate(prompt, temperature=0) return self._parse_json(result)

Pattern 2 : Pipeline sequentiel

Les agents s'executent en sequence, chacun enrichissant le contexte.

DEVELOPERpython
from dataclasses import dataclass from typing import List @dataclass class AgentResult: agent_name: str output: dict success: bool error: str = None class SequentialPipeline: def __init__(self, agents: List[tuple]): """ agents: List de (nom, agent_callable) """ self.agents = agents async def process(self, query: str, initial_context: dict = None) -> dict: """ Execute les agents en sequence """ context = initial_context or {} context["original_query"] = query results = [] for agent_name, agent in self.agents: try: result = await agent(query, context) agent_result = AgentResult( agent_name=agent_name, output=result, success=True ) # Enrichir le contexte pour l'agent suivant context[f"{agent_name}_result"] = result context["last_result"] = result except Exception as e: agent_result = AgentResult( agent_name=agent_name, output={}, success=False, error=str(e) ) results.append(agent_result) # Arreter si un agent echoue (optionnel) if not agent_result.success and self.stop_on_failure: break return { "final_result": context.get("last_result"), "pipeline_results": results, "context": context } # Exemple de pipeline pipeline = SequentialPipeline([ ("query_analyzer", QueryAnalyzerAgent()), ("retriever", RetrievalAgent()), ("fact_checker", FactCheckAgent()), ("generator", GenerationAgent()), ("citation_adder", CitationAgent()) ])

Pattern 3 : Parallele avec fusion

Plusieurs agents travaillent en parallele, puis fusion des resultats.

DEVELOPERpython
import asyncio from typing import List, Callable class ParallelOrchestrator: def __init__( self, agents: List[tuple], fusion_agent: Callable ): self.agents = agents # (name, agent, weight) self.fusion = fusion_agent async def process(self, query: str, context: dict = None) -> dict: """ Execute les agents en parallele puis fusionne """ # Lancer tous les agents en parallele tasks = [ self._run_agent(name, agent, query, context) for name, agent, _ in self.agents ] results = await asyncio.gather(*tasks, return_exceptions=True) # Collecter les resultats valides valid_results = [] for i, result in enumerate(results): name, _, weight = self.agents[i] if not isinstance(result, Exception): valid_results.append({ "agent": name, "result": result, "weight": weight }) # Fusionner les resultats fused = await self.fusion(query, valid_results) return { "response": fused["answer"], "contributing_agents": [r["agent"] for r in valid_results], "fusion_confidence": fused.get("confidence") } async def _run_agent(self, name: str, agent: Callable, query: str, context: dict): try: return await asyncio.wait_for( agent(query, context), timeout=10.0 ) except asyncio.TimeoutError: return {"error": "timeout", "agent": name} class FusionAgent: def __init__(self, llm): self.llm = llm async def __call__(self, query: str, results: List[dict]) -> dict: """ Fusionne les reponses de plusieurs agents """ responses_text = "\n\n".join([ f"Agent {r['agent']} (poids {r['weight']}):\n{r['result'].get('answer', 'N/A')}" for r in results ]) prompt = f""" Tu dois synthetiser ces reponses de differents agents en une seule reponse coherente. Question originale : {query} Reponses des agents : {responses_text} Regles : 1. Priorise les agents avec le poids le plus eleve 2. En cas de contradiction, indique les deux points de vue 3. Cite les sources si disponibles 4. Genere une reponse unique et coherente Reponse synthetisee : """ answer = await self.llm.generate(prompt, temperature=0.3) return { "answer": answer, "confidence": self._calculate_confidence(results) }

Pattern 4 : ReAct (Reasoning + Acting)

L'agent raisonne, agit, observe, et itere.

DEVELOPERpython
from typing import Dict, Any class ReActAgent: def __init__(self, llm, tools: Dict[str, Callable], max_iterations: int = 5): self.llm = llm self.tools = tools self.max_iterations = max_iterations async def process(self, query: str) -> dict: """ Execute le pattern ReAct : Thought -> Action -> Observation """ history = [] final_answer = None for i in range(self.max_iterations): # Generer la prochaine pensee/action step = await self._generate_step(query, history) if step["type"] == "thought": history.append({"thought": step["content"]}) elif step["type"] == "action": # Executer l'action tool_name = step["tool"] tool_input = step["input"] if tool_name in self.tools: observation = await self.tools[tool_name](tool_input) else: observation = f"Outil '{tool_name}' non disponible" history.append({ "action": f"{tool_name}({tool_input})", "observation": observation }) elif step["type"] == "answer": final_answer = step["content"] break return { "answer": final_answer, "reasoning_trace": history, "iterations": i + 1 } async def _generate_step(self, query: str, history: list) -> dict: """ Genere la prochaine etape (pensee, action ou reponse) """ history_text = self._format_history(history) tools_desc = self._format_tools() prompt = f""" Tu es un agent qui resout des problemes etape par etape. Question : {query} Outils disponibles : {tools_desc} Historique : {history_text} Prochaine etape (choisis UN format) : Pensee : [ton raisonnement] OU Action : [nom_outil]([parametres]) OU Reponse : [ta reponse finale] """ result = await self.llm.generate(prompt, temperature=0) return self._parse_step(result) # Outils pour l'agent ReAct tools = { "search_kb": lambda q: rag_search(q), "calculate": lambda expr: eval(expr), "get_current_date": lambda _: datetime.now().isoformat(), "check_inventory": lambda product_id: inventory_api.check(product_id) }

Agents specialises

Agent de recherche

DEVELOPERpython
class SearchAgent: def __init__(self, retrievers: Dict[str, Retriever], llm): self.retrievers = retrievers self.llm = llm async def __call__(self, query: str, context: dict) -> dict: """ Agent specialise dans la recherche multi-sources """ # 1. Determiner les sources pertinentes sources = await self._select_sources(query) # 2. Rechercher dans chaque source all_results = [] for source_name in sources: retriever = self.retrievers.get(source_name) if retriever: results = await retriever.search(query) for r in results: r["source"] = source_name all_results.extend(results) # 3. Re-ranker les resultats ranked = await self._rerank(query, all_results) return { "documents": ranked[:10], "sources_used": sources, "total_found": len(all_results) } async def _select_sources(self, query: str) -> List[str]: """ Selectionne les sources pertinentes pour la requete """ prompt = f""" Quelles sources sont pertinentes pour cette question ? Sources disponibles : - kb_general : Base de connaissances generale - kb_technical : Documentation technique - kb_support : Historique tickets support - kb_products : Catalogue produits Question : {query} Sources pertinentes (liste JSON) : """ result = await self.llm.generate(prompt, temperature=0) return self._parse_json(result)

Agent de verification

DEVELOPERpython
class VerificationAgent: def __init__(self, llm, fact_db): self.llm = llm self.fact_db = fact_db async def __call__(self, query: str, context: dict) -> dict: """ Verifie les affirmations contre des sources fiables """ # Recuperer la reponse a verifier answer = context.get("last_result", {}).get("answer", "") # 1. Extraire les affirmations claims = await self._extract_claims(answer) # 2. Verifier chaque affirmation verifications = [] for claim in claims: verification = await self._verify_claim(claim) verifications.append(verification) # 3. Determiner le score global verified_count = sum(1 for v in verifications if v["verified"]) total = len(verifications) return { "claims": verifications, "verification_score": verified_count / total if total > 0 else 1.0, "needs_correction": any(not v["verified"] for v in verifications) } async def _verify_claim(self, claim: str) -> dict: """ Verifie une affirmation specifique """ # Chercher des faits contradictoires ou confirmants facts = await self.fact_db.search(claim, top_k=3) prompt = f""" Verifie cette affirmation contre les faits suivants. Affirmation : {claim} Faits de reference : {self._format_facts(facts)} L'affirmation est-elle : - verified : supportee par les faits - contradicted : contredite par les faits - unverified : impossible a verifier Reponds en JSON avec "status" et "evidence". """ result = await self.llm.generate(prompt, temperature=0) parsed = self._parse_json(result) return { "claim": claim, "verified": parsed.get("status") == "verified", "status": parsed.get("status"), "evidence": parsed.get("evidence") }

Agent d'action

DEVELOPERpython
class ActionAgent: def __init__(self, action_registry: Dict[str, Callable], llm): self.actions = action_registry self.llm = llm async def __call__(self, query: str, context: dict) -> dict: """ Execute des actions basees sur la requete """ # 1. Determiner les actions necessaires action_plan = await self._plan_actions(query, context) # 2. Executer les actions results = [] for action in action_plan: if action["name"] in self.actions: try: result = await self.actions[action["name"]](**action["params"]) results.append({ "action": action["name"], "success": True, "result": result }) except Exception as e: results.append({ "action": action["name"], "success": False, "error": str(e) }) return { "actions_executed": results, "all_successful": all(r["success"] for r in results) } async def _plan_actions(self, query: str, context: dict) -> List[dict]: """ Planifie les actions a executer """ actions_desc = "\n".join([ f"- {name}: {func.__doc__ or 'No description'}" for name, func in self.actions.items() ]) prompt = f""" Planifie les actions necessaires pour satisfaire cette demande. Demande : {query} Contexte : {context} Actions disponibles : {actions_desc} Plan d'actions (JSON array) : [ {{"name": "action_name", "params": {{}}}}, ... ] """ result = await self.llm.generate(prompt, temperature=0) return self._parse_json(result) # Registre d'actions action_registry = { "create_ticket": create_support_ticket, "send_email": send_email, "update_order": update_order_status, "schedule_callback": schedule_callback, "apply_discount": apply_discount_code }

Gestion des echecs

Retry avec backoff

DEVELOPERpython
import asyncio from functools import wraps def with_retry(max_attempts: int = 3, backoff: float = 1.0): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): last_exception = None for attempt in range(max_attempts): try: return await func(*args, **kwargs) except Exception as e: last_exception = e if attempt < max_attempts - 1: wait_time = backoff * (2 ** attempt) await asyncio.sleep(wait_time) raise last_exception return wrapper return decorator

Fallback gracieux

DEVELOPERpython
class ResilientOrchestrator: def __init__(self, primary_agents, fallback_agent): self.primary = primary_agents self.fallback = fallback_agent async def process(self, query: str, context: dict = None) -> dict: """ Execute avec fallback en cas d'echec """ try: # Essayer le pipeline principal result = await self._run_primary(query, context) if self._is_valid_result(result): return result except Exception as e: logger.warning(f"Primary pipeline failed: {e}") # Fallback vers l'agent simple return await self.fallback(query, context) def _is_valid_result(self, result: dict) -> bool: """ Verifie si le resultat est valide """ return ( result.get("answer") and len(result["answer"]) > 10 and result.get("confidence", 0) > 0.5 )

Monitoring et observabilite

DEVELOPERpython
import time from dataclasses import dataclass @dataclass class AgentTrace: agent_name: str start_time: float end_time: float input_query: str output: dict success: bool error: str = None class TracingOrchestrator: def __init__(self, orchestrator, trace_store): self.orchestrator = orchestrator self.traces = trace_store async def process(self, query: str, context: dict = None) -> dict: """ Execute avec tracing complet """ trace_id = str(uuid.uuid4()) traces = [] # Wrapper pour capturer les traces original_agents = self.orchestrator.agents.copy() for name, agent in original_agents.items(): self.orchestrator.agents[name] = self._wrap_agent(name, agent, traces) try: result = await self.orchestrator.process(query, context) # Sauvegarder les traces await self.traces.save(trace_id, traces) result["trace_id"] = trace_id result["execution_time_ms"] = sum( (t.end_time - t.start_time) * 1000 for t in traces ) return result finally: # Restaurer les agents originaux self.orchestrator.agents = original_agents def _wrap_agent(self, name: str, agent, traces: list): async def wrapped(query, context): start = time.time() try: result = await agent(query, context) traces.append(AgentTrace( agent_name=name, start_time=start, end_time=time.time(), input_query=query, output=result, success=True )) return result except Exception as e: traces.append(AgentTrace( agent_name=name, start_time=start, end_time=time.time(), input_query=query, output={}, success=False, error=str(e) )) raise return wrapped

Bonnes pratiques

1. Agents atomiques

Chaque agent doit avoir une responsabilite unique et claire.

2. Timeout partout

Definir des timeouts pour eviter les blocages.

3. Logging structure

Logger chaque etape pour le debugging.

4. Tests unitaires par agent

Tester chaque agent independamment avant integration.

5. Circuit breaker

Desactiver temporairement les agents defaillants.

Pour aller plus loin

FAQ

Un RAG simple suit un pipeline lineaire : requete, retrieval, generation, reponse. Un systeme multi-agents orchestre plusieurs agents specialises qui collaborent. Par exemple, un agent de recherche trouve les documents, un agent de verification valide les faits, et un agent de generation formule la reponse. Cette architecture permet de traiter des taches complexes necessitant raisonnement et actions multiples.
Le pattern ReAct (Reasoning + Acting) est adapte aux taches necessitant plusieurs iterations et decisions dynamiques : l'agent raisonne, agit, observe le resultat et decide de la suite. Utilisez-le pour les questions complexes, la recherche exploratoire ou les taches avec incertitude. Le pipeline sequentiel convient mieux aux workflows predefinis et previsibles ou chaque etape est connue a l'avance.
Implementez plusieurs niveaux de resilience : retry avec backoff exponentiel pour les erreurs transitoires, fallback vers des agents de secours si un agent echoue, circuit breaker pour desactiver temporairement les agents defaillants, et timeouts stricts pour eviter les blocages. Toujours prevoir un fallback ultime vers un agent simple qui peut repondre meme de maniere degradee.
Pas necessairement. Avec le pattern parallele, plusieurs agents travaillent simultanement et le temps total est celui de l'agent le plus lent. Le pattern sequentiel ajoute de la latence mais peut etre optimise avec du caching et des agents legers. Le compromis latence/qualite depend du cas d'usage : privilegiez la vitesse pour le support client, la qualite pour l'analyse de documents complexes.
Le tracing est essentiel : chaque agent doit logger ses entrees, sorties, duree d'execution et erreurs avec un trace_id unique pour suivre le parcours complet d'une requete. Implementez des dashboards de monitoring montrant les taux de succes par agent, les latences et les patterns d'echec. Les traces permettent de rejouer une requete problematique pour reproduire et corriger les bugs. ---

Orchestration multi-agents avec Ailog

Construire une architecture multi-agents robuste demande une expertise pointue. Avec Ailog, beneficiez d'une infrastructure d'agents preconstruite :

  • Agents specialises : recherche, verification, action
  • Orchestration configurable sans code
  • Monitoring integre avec traces completes
  • Fallbacks automatiques et gestion des erreurs
  • Scalabilite pour les charges importantes

Testez Ailog gratuitement et deployez des assistants multi-agents en quelques clics.

Tags

RAGagentsorchestrationmulti-agentsLLMarchitecture

Articles connexes

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !