RAG Agents: Orchestrating Multi-Agent Systems
Architect multi-agent RAG systems: orchestration, specialization, collaboration, and failure handling for complex assistants.
A simple RAG agent follows a linear pipeline: retrieval, generation, answer. A multi-agent system orchestrates several specialized agents that collaborate to handle complex tasks. This guide covers the architectures and patterns for building such systems.
Why multi-agent?
Limits of monolithic RAG
A classic RAG pipeline struggles with certain tasks:
| Task | Problem with simple RAG |
|---|---|
| Multi-source questions | Single retrieval pass, limited context |
| Complex reasoning | No problem decomposition |
| Multiple actions | Linear pipeline, no loop |
| Fact verification | No double-checking |
| Long-running tasks | No planning |
The multi-agent approach
Divide and conquer:
Complex question
         │
         ▼
┌─────────────────┐
│  ORCHESTRATOR   │ ← decomposes and coordinates
└────────┬────────┘
         │
    ┌────┼────┬────────────┐
    ▼    ▼    ▼            ▼
┌──────┐┌──────┐┌──────┐┌──────┐
│Agent ││Agent ││Agent ││Agent │
│Search││Reason││Verify││Action│
└──────┘└──────┘└──────┘└──────┘
    │      │      │        │
    └──────┴──────┴────────┘
              │
              ▼
        Final answer
Orchestration architectures
Pattern 1: Router
The orchestrator routes each query to the appropriate specialized agent.
```python
import json
from enum import Enum
from typing import Callable, Dict

class AgentType(Enum):
    FAQ = "faq"
    TECHNICAL = "technical"
    SALES = "sales"
    ESCALATION = "escalation"

class RouterOrchestrator:
    def __init__(self, agents: Dict[AgentType, Callable], classifier):
        self.agents = agents
        self.classifier = classifier

    async def process(self, query: str, context: dict = None) -> dict:
        """Route the query to the appropriate agent."""
        # 1. Classify the query
        classification = await self.classifier.classify(query)
        agent_type = AgentType(classification["agent"])

        # 2. Call the agent, falling back to escalation if none matches
        agent = self.agents.get(agent_type)
        if not agent:
            agent = self.agents[AgentType.ESCALATION]

        result = await agent(query, context)

        return {
            "response": result["answer"],
            "agent_used": agent_type.value,
            "confidence": classification["confidence"],
            "sources": result.get("sources", [])
        }

class IntentClassifier:
    def __init__(self, llm):
        self.llm = llm

    async def classify(self, query: str) -> dict:
        prompt = f"""
        Classify this user query.

        Categories:
        - faq: general questions, policies, information
        - technical: technical issues, bugs, configuration
        - sales: pricing, purchases, subscriptions
        - escalation: complaints, emergencies, sensitive requests

        Query: {query}

        Answer in JSON: {{"agent": "...", "confidence": 0.0-1.0}}
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_json(result)

    def _parse_json(self, text: str):
        """Extract the first JSON object or array from the LLM output."""
        start = min(i for i in (text.find("{"), text.find("[")) if i != -1)
        end = max(text.rfind("}"), text.rfind("]")) + 1
        return json.loads(text[start:end])
```
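Wiring the router might look like this; a minimal sketch in which the four agent callables and `llm` are placeholders, not part of the code above:

```python
# Hypothetical setup: each agent is an async callable with the
# (query, context) -> dict signature expected by RouterOrchestrator.
orchestrator = RouterOrchestrator(
    agents={
        AgentType.FAQ: faq_agent,
        AgentType.TECHNICAL: technical_agent,
        AgentType.SALES: sales_agent,
        AgentType.ESCALATION: escalation_agent,
    },
    classifier=IntentClassifier(llm),
)
result = await orchestrator.process("How do I reset my API key?")
```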
Pattern 2: Sequential pipeline
The agents run in sequence, each one enriching the shared context.
```python
from dataclasses import dataclass
from typing import List

@dataclass
class AgentResult:
    agent_name: str
    output: dict
    success: bool
    error: str = None

class SequentialPipeline:
    def __init__(self, agents: List[tuple], stop_on_failure: bool = True):
        """agents: list of (name, agent_callable) pairs."""
        self.agents = agents
        self.stop_on_failure = stop_on_failure

    async def process(self, query: str, initial_context: dict = None) -> dict:
        """Run the agents in sequence."""
        context = initial_context or {}
        context["original_query"] = query
        results = []

        for agent_name, agent in self.agents:
            try:
                result = await agent(query, context)
                agent_result = AgentResult(
                    agent_name=agent_name, output=result, success=True
                )
                # Enrich the context for the next agent
                context[f"{agent_name}_result"] = result
                context["last_result"] = result
            except Exception as e:
                agent_result = AgentResult(
                    agent_name=agent_name, output={}, success=False, error=str(e)
                )

            results.append(agent_result)

            # Stop early if an agent fails (optional)
            if not agent_result.success and self.stop_on_failure:
                break

        return {
            "final_result": context.get("last_result"),
            "pipeline_results": results,
            "context": context
        }

# Example pipeline (the agent classes are illustrative)
pipeline = SequentialPipeline([
    ("query_analyzer", QueryAnalyzerAgent()),
    ("retriever", RetrievalAgent()),
    ("fact_checker", FactCheckAgent()),
    ("generator", GenerationAgent()),
    ("citation_adder", CitationAgent())
])
```
Pattern 3: Parallel with fusion
Several agents work in parallel, then their results are fused.
```python
import asyncio
from typing import Callable, List

class ParallelOrchestrator:
    def __init__(self, agents: List[tuple], fusion_agent: Callable):
        self.agents = agents  # (name, agent, weight)
        self.fusion = fusion_agent

    async def process(self, query: str, context: dict = None) -> dict:
        """Run the agents in parallel, then fuse their answers."""
        # Launch all agents concurrently
        tasks = [
            self._run_agent(name, agent, query, context)
            for name, agent, _ in self.agents
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect the valid results
        valid_results = []
        for i, result in enumerate(results):
            name, _, weight = self.agents[i]
            if not isinstance(result, Exception):
                valid_results.append({
                    "agent": name,
                    "result": result,
                    "weight": weight
                })

        # Fuse the results
        fused = await self.fusion(query, valid_results)

        return {
            "response": fused["answer"],
            "contributing_agents": [r["agent"] for r in valid_results],
            "fusion_confidence": fused.get("confidence")
        }

    async def _run_agent(self, name: str, agent: Callable, query: str, context: dict):
        try:
            return await asyncio.wait_for(agent(query, context), timeout=10.0)
        except asyncio.TimeoutError:
            return {"error": "timeout", "agent": name}

class FusionAgent:
    def __init__(self, llm):
        self.llm = llm

    async def __call__(self, query: str, results: List[dict]) -> dict:
        """Fuse the answers from several agents into one response."""
        responses_text = "\n\n".join([
            f"Agent {r['agent']} (weight {r['weight']}):\n{r['result'].get('answer', 'N/A')}"
            for r in results
        ])

        prompt = f"""
        Synthesize these answers from different agents into a single coherent response.

        Original question: {query}

        Agent answers:
        {responses_text}

        Rules:
        1. Give priority to the agents with the highest weight
        2. In case of contradiction, present both viewpoints
        3. Cite sources when available
        4. Produce a single, coherent answer

        Synthesized answer:
        """
        answer = await self.llm.generate(prompt, temperature=0.3)
        return {
            "answer": answer,
            "confidence": self._calculate_confidence(results)
        }

    def _calculate_confidence(self, results: List[dict]) -> float:
        # Simple heuristic: share of total agent weight that produced an answer
        total = sum(r["weight"] for r in results) or 1.0
        answered = sum(r["weight"] for r in results if r["result"].get("answer"))
        return answered / total
```
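Setting it up might look like this; a minimal sketch where `kb_agent`, `web_agent`, and `llm` are placeholders:

```python
# Hypothetical wiring: both agents are async callables with the
# (query, context) -> dict signature used by ParallelOrchestrator.
orchestrator = ParallelOrchestrator(
    agents=[
        ("kb_search", kb_agent, 1.0),   # trusted internal KB, full weight
        ("web_search", web_agent, 0.5), # noisier source, half weight
    ],
    fusion_agent=FusionAgent(llm),
)
result = await orchestrator.process("What is our refund policy?")
print(result["response"], result["contributing_agents"])
```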
Pattern 4: ReAct (Reasoning + Acting)
The agent reasons, acts, observes, and iterates.
```python
import inspect
from datetime import datetime
from typing import Callable, Dict

class ReActAgent:
    def __init__(self, llm, tools: Dict[str, Callable], max_iterations: int = 5):
        self.llm = llm
        self.tools = tools
        self.max_iterations = max_iterations

    async def process(self, query: str) -> dict:
        """Run the ReAct loop: Thought -> Action -> Observation."""
        history = []
        final_answer = None

        for i in range(self.max_iterations):
            # Generate the next thought/action
            step = await self._generate_step(query, history)

            if step["type"] == "thought":
                history.append({"thought": step["content"]})

            elif step["type"] == "action":
                # Execute the action (tools may be sync or async)
                tool_name = step["tool"]
                tool_input = step["input"]
                if tool_name in self.tools:
                    result = self.tools[tool_name](tool_input)
                    observation = await result if inspect.isawaitable(result) else result
                else:
                    observation = f"Tool '{tool_name}' not available"
                history.append({
                    "action": f"{tool_name}({tool_input})",
                    "observation": observation
                })

            elif step["type"] == "answer":
                final_answer = step["content"]
                break

        return {
            "answer": final_answer,  # None if the iteration budget ran out
            "reasoning_trace": history,
            "iterations": i + 1
        }

    async def _generate_step(self, query: str, history: list) -> dict:
        """Generate the next step (thought, action, or answer)."""
        history_text = self._format_history(history)
        tools_desc = self._format_tools()

        prompt = f"""
        You are an agent that solves problems step by step.

        Question: {query}

        Available tools:
        {tools_desc}

        History:
        {history_text}

        Next step (pick ONE format):
        Thought: [your reasoning]
        OR
        Action: [tool_name]([parameters])
        OR
        Answer: [your final answer]
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_step(result)  # see the parsing sketch below

    def _format_history(self, history: list) -> str:
        return "\n".join(str(step) for step in history) or "(empty)"

    def _format_tools(self) -> str:
        return "\n".join(f"- {name}" for name in self.tools)

# Tools for the ReAct agent (rag_search and inventory_api are illustrative)
tools = {
    "search_kb": lambda q: rag_search(q),
    "calculate": lambda expr: eval(expr),  # demo only: eval is unsafe on untrusted input
    "get_current_date": lambda _: datetime.now().isoformat(),
    "check_inventory": lambda product_id: inventory_api.check(product_id)
}
```
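The block above leaves `_parse_step` undefined. A minimal sketch, written here as a standalone function, assuming the model follows one of the three formats from the prompt:

```python
import re

def parse_step(text: str) -> dict:
    """Map the model's reply onto one of the three ReAct step types."""
    text = text.strip()
    action = re.match(r"Action\s*:\s*(\w+)\((.*)\)", text, re.DOTALL)
    if action:
        return {"type": "action", "tool": action.group(1),
                "input": action.group(2).strip()}
    answer = re.match(r"Answer\s*:\s*(.*)", text, re.DOTALL)
    if answer:
        return {"type": "answer", "content": answer.group(1).strip()}
    thought = re.match(r"Thought\s*:\s*(.*)", text, re.DOTALL)
    if thought:
        return {"type": "thought", "content": thought.group(1).strip()}
    # Fallback: treat anything unparseable as a thought
    return {"type": "thought", "content": text}
```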
Specialized agents
Search agent
```python
from typing import Dict, List

class SearchAgent:
    def __init__(self, retrievers: Dict[str, "Retriever"], llm):
        # Retriever is your retrieval interface (definition omitted here)
        self.retrievers = retrievers
        self.llm = llm

    async def __call__(self, query: str, context: dict) -> dict:
        """Agent specialized in multi-source retrieval."""
        # 1. Pick the relevant sources
        sources = await self._select_sources(query)

        # 2. Search each source
        all_results = []
        for source_name in sources:
            retriever = self.retrievers.get(source_name)
            if retriever:
                results = await retriever.search(query)
                for r in results:
                    r["source"] = source_name
                all_results.extend(results)

        # 3. Re-rank the results (delegates to a reranker, implementation omitted)
        ranked = await self._rerank(query, all_results)

        return {
            "documents": ranked[:10],
            "sources_used": sources,
            "total_found": len(all_results)
        }

    async def _select_sources(self, query: str) -> List[str]:
        """Select the sources relevant to the query."""
        prompt = f"""
        Which sources are relevant to this question?

        Available sources:
        - kb_general: general knowledge base
        - kb_technical: technical documentation
        - kb_support: support ticket history
        - kb_products: product catalog

        Question: {query}

        Relevant sources (JSON list):
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_json(result)  # same helper as in IntentClassifier
```
Verification agent
```python
class VerificationAgent:
    def __init__(self, llm, fact_db):
        self.llm = llm
        self.fact_db = fact_db

    async def __call__(self, query: str, context: dict) -> dict:
        """Verify claims against trusted sources."""
        # Fetch the answer to verify
        answer = context.get("last_result", {}).get("answer", "")

        # 1. Extract the claims (see the sketch below)
        claims = await self._extract_claims(answer)

        # 2. Verify each claim
        verifications = []
        for claim in claims:
            verification = await self._verify_claim(claim)
            verifications.append(verification)

        # 3. Compute the overall score
        verified_count = sum(1 for v in verifications if v["verified"])
        total = len(verifications)

        return {
            "claims": verifications,
            "verification_score": verified_count / total if total > 0 else 1.0,
            "needs_correction": any(not v["verified"] for v in verifications)
        }

    async def _verify_claim(self, claim: str) -> dict:
        """Verify a single claim."""
        # Look for confirming or contradicting facts
        facts = await self.fact_db.search(claim, top_k=3)

        prompt = f"""
        Check this claim against the following facts.

        Claim: {claim}

        Reference facts:
        {self._format_facts(facts)}

        Is the claim:
        - verified: supported by the facts
        - contradicted: contradicted by the facts
        - unverified: impossible to verify

        Answer in JSON with "status" and "evidence".
        """
        result = await self.llm.generate(prompt, temperature=0)
        parsed = self._parse_json(result)

        return {
            "claim": claim,
            "verified": parsed.get("status") == "verified",
            "status": parsed.get("status"),
            "evidence": parsed.get("evidence")
        }
```
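The `_extract_claims` helper is referenced above but not defined. A minimal sketch, assuming the LLM returns a JSON array of claim strings and the same `_parse_json` helper as in `IntentClassifier`; the prompt wording is an assumption:

```python
    # A possible implementation of VerificationAgent._extract_claims
    async def _extract_claims(self, answer: str) -> list:
        prompt = f"""
        Extract the individual factual claims made in this answer.

        Answer: {answer}

        Claims (JSON array of strings):
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_json(result)
```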
Action agent
```python
from typing import Callable, Dict, List

class ActionAgent:
    def __init__(self, action_registry: Dict[str, Callable], llm):
        self.actions = action_registry
        self.llm = llm

    async def __call__(self, query: str, context: dict) -> dict:
        """Execute actions based on the request."""
        # 1. Determine the required actions
        action_plan = await self._plan_actions(query, context)

        # 2. Execute them
        results = []
        for action in action_plan:
            if action["name"] in self.actions:
                try:
                    result = await self.actions[action["name"]](**action["params"])
                    results.append({
                        "action": action["name"],
                        "success": True,
                        "result": result
                    })
                except Exception as e:
                    results.append({
                        "action": action["name"],
                        "success": False,
                        "error": str(e)
                    })

        return {
            "actions_executed": results,
            "all_successful": all(r["success"] for r in results)
        }

    async def _plan_actions(self, query: str, context: dict) -> List[dict]:
        """Plan the actions to execute."""
        actions_desc = "\n".join([
            f"- {name}: {func.__doc__ or 'No description'}"
            for name, func in self.actions.items()
        ])

        prompt = f"""
        Plan the actions needed to fulfill this request.

        Request: {query}
        Context: {context}

        Available actions:
        {actions_desc}

        Action plan (JSON array):
        [
          {{"name": "action_name", "params": {{}}}},
          ...
        ]
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_json(result)  # same helper as in IntentClassifier

# Action registry (the handler functions are illustrative)
action_registry = {
    "create_ticket": create_support_ticket,
    "send_email": send_email,
    "update_order": update_order_status,
    "schedule_callback": schedule_callback,
    "apply_discount": apply_discount_code
}
```
Failure handling
Retry with backoff
```python
import asyncio
from functools import wraps

def with_retry(max_attempts: int = 3, backoff: float = 1.0):
    """Retry an async callable with exponential backoff."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        # Wait 1x, 2x, 4x... the base backoff between attempts
                        wait_time = backoff * (2 ** attempt)
                        await asyncio.sleep(wait_time)
            raise last_exception
        return wrapper
    return decorator
```
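Applied to an agent call, it might look like this; `search_agent` is a placeholder for any async agent callable:

```python
# Hypothetical usage: retry a flaky agent call up to 3 times,
# waiting 0.5s, 1s, then 2s between attempts.
@with_retry(max_attempts=3, backoff=0.5)
async def call_search_agent(query: str, context: dict) -> dict:
    return await search_agent(query, context)
```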
Graceful fallback
```python
import logging

logger = logging.getLogger(__name__)

class ResilientOrchestrator:
    def __init__(self, primary_agents, fallback_agent):
        self.primary = primary_agents
        self.fallback = fallback_agent

    async def process(self, query: str, context: dict = None) -> dict:
        """Run the primary pipeline, falling back on failure."""
        try:
            # Try the primary pipeline (delegation logic omitted here)
            result = await self._run_primary(query, context)
            if self._is_valid_result(result):
                return result
        except Exception as e:
            logger.warning(f"Primary pipeline failed: {e}")

        # Fall back to the simple agent
        return await self.fallback(query, context)

    def _is_valid_result(self, result: dict) -> bool:
        """Check whether the result is usable."""
        return (
            result.get("answer")
            and len(result["answer"]) > 10
            and result.get("confidence", 0) > 0.5
        )
```
Monitoring and observability
```python
import time
import uuid
from dataclasses import dataclass

@dataclass
class AgentTrace:
    agent_name: str
    start_time: float
    end_time: float
    input_query: str
    output: dict
    success: bool
    error: str = None

class TracingOrchestrator:
    def __init__(self, orchestrator, trace_store):
        self.orchestrator = orchestrator  # assumed to expose an `agents` dict
        self.traces = trace_store

    async def process(self, query: str, context: dict = None) -> dict:
        """Run the wrapped orchestrator with full tracing."""
        trace_id = str(uuid.uuid4())
        traces = []

        # Wrap each agent to capture its trace
        original_agents = self.orchestrator.agents.copy()
        for name, agent in original_agents.items():
            self.orchestrator.agents[name] = self._wrap_agent(name, agent, traces)

        try:
            result = await self.orchestrator.process(query, context)

            # Persist the traces
            await self.traces.save(trace_id, traces)
            result["trace_id"] = trace_id
            result["execution_time_ms"] = sum(
                (t.end_time - t.start_time) * 1000 for t in traces
            )
            return result
        finally:
            # Restore the original agents
            self.orchestrator.agents = original_agents

    def _wrap_agent(self, name: str, agent, traces: list):
        async def wrapped(query, context):
            start = time.time()
            try:
                result = await agent(query, context)
                traces.append(AgentTrace(
                    agent_name=name, start_time=start, end_time=time.time(),
                    input_query=query, output=result, success=True
                ))
                return result
            except Exception as e:
                traces.append(AgentTrace(
                    agent_name=name, start_time=start, end_time=time.time(),
                    input_query=query, output={}, success=False, error=str(e)
                ))
                raise
        return wrapped
```
Best practices
1. Atomic agents
Each agent should have a single, clearly defined responsibility, as in the interface sketch below.
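One way to enforce this is a shared, deliberately narrow contract. A minimal sketch; the `Agent` protocol name is our own, not from the examples above:

```python
from typing import Protocol

class Agent(Protocol):
    """One narrow contract for every agent: a query and a shared
    context in, a structured result out. Anything beyond that
    belongs in a separate agent."""
    async def __call__(self, query: str, context: dict) -> dict: ...
```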
2. Timeouts everywhere
Set timeouts so a stuck agent can never block the whole pipeline, for example:
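```python
import asyncio

async def run_with_timeout(agent, query: str, context: dict,
                           seconds: float = 10.0) -> dict:
    """Give every agent call a hard deadline instead of waiting forever."""
    try:
        return await asyncio.wait_for(agent(query, context), timeout=seconds)
    except asyncio.TimeoutError:
        return {"error": "timeout"}
```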
3. Structured logging
Log every step in a machine-readable form to make debugging tractable, for example:
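A minimal sketch using the standard library; the field names are illustrative:

```python
import json
import logging
import time

logger = logging.getLogger("agents")

def log_agent_step(agent_name: str, query: str, success: bool, started_at: float):
    """Emit one JSON line per agent step so traces stay machine-searchable."""
    logger.info(json.dumps({
        "agent": agent_name,
        "query": query,
        "success": success,
        "duration_ms": round((time.time() - started_at) * 1000, 1),
    }))
```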
4. Unit tests per agent
Test each agent in isolation before integrating it, for example:
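A minimal sketch, assuming pytest with the pytest-asyncio plugin and the `IntentClassifier` from Pattern 1; the fake LLM output is an assumption:

```python
import pytest

class FakeLLM:
    """Deterministic stand-in for the real LLM."""
    async def generate(self, prompt: str, temperature: float = 0) -> str:
        return '{"agent": "faq", "confidence": 0.9}'

@pytest.mark.asyncio
async def test_classifier_routes_to_faq():
    classifier = IntentClassifier(FakeLLM())
    result = await classifier.classify("What are your opening hours?")
    assert result["agent"] == "faq"
    assert result["confidence"] > 0.5
```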
5. Circuit breaker
Temporarily disable failing agents instead of hammering them; a minimal sketch follows.
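The thresholds and cooldown below are illustrative defaults, not prescriptions:

```python
import time

class CircuitBreaker:
    """Skip an agent after repeated failures, retry after a cooldown."""
    def __init__(self, failure_threshold: int = 3, cooldown: float = 60.0):
        self.failures = 0
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.opened_at = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if time.time() - self.opened_at > self.cooldown:
            # Half-open: allow one trial call after the cooldown
            self.opened_at = None
            self.failures = 0
            return False
        return True

    def record(self, success: bool):
        if success:
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
```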
Going further
- Conversational RAG - Memory and context
- LLM Generation - Optimizing responses
- RAG Evaluation - Measuring quality
Multi-agent orchestration with Ailog
Building a robust multi-agent architecture takes deep expertise. With Ailog, you get a prebuilt agent infrastructure:
- Specialized agents: search, verification, action
- No-code configurable orchestration
- Built-in monitoring with full traces
- Automatic fallbacks and error handling
- Scalability for heavy workloads
Try Ailog for free and deploy multi-agent assistants in a few clicks.
Related articles
Agentic RAG 2025: Building Autonomous AI Agents (Complete Guide)
Complete Agentic RAG guide: architecture, design patterns, autonomous agents with dynamic retrieval, multi-tool orchestration. With LangGraph and CrewAI examples.
Guardrails for RAG: Securing Your AI Assistants
Implement robust guardrails to prevent dangerous, off-topic, or inappropriate answers in your production RAG systems.
Conversational RAG: Memory and Multi-Session Context
Implement RAG with conversational memory: context management, multi-session history, and personalized responses.