RAG Agents: Orchestrating Multi-Agent Systems
Design RAG-based multi-agent systems: orchestration, specialization, collaboration, and error handling for complex assistants.
A simple RAG agent follows a linear pipeline: retrieval, generation, answer. A multi-agent system orchestrates several specialized agents that collaborate on complex tasks. This guide explores architectures and patterns for building such systems.
Why Multi-Agent?
Limits of Monolithic RAG
A classic RAG pipeline hits its limits on certain tasks:
| Task | Problem with simple RAG |
|---|---|
| Multi-source questions | A single retrieval, limited context |
| Complex reasoning | No problem decomposition |
| Multiple actions | Linear pipeline, no loop |
| Fact verification | No double-check |
| Long-running tasks | No planning |
The Multi-Agent Approach
Divide and conquer:
```
    Complex question
            │
            ▼
   ┌─────────────────┐
   │  ORCHESTRATOR   │ ← decomposes and coordinates
   └────────┬────────┘
            │
   ┌────────┼────────┬────────┐
   ▼        ▼        ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent │ │Agent │ │Agent │ │Agent │
│Search│ │Reason│ │Verify│ │Action│
└──────┘ └──────┘ └──────┘ └──────┘
   │        │        │        │
   └────────┼────────┴────────┘
            │
            ▼
      Final answer
```
Orchestration Architectures
Pattern 1: Router
The orchestrator routes each query to the specialized agent best suited to handle it.
```python
import json
from enum import Enum
from typing import Callable, Dict

class AgentType(Enum):
    FAQ = "faq"
    TECHNICAL = "technical"
    SALES = "sales"
    ESCALATION = "escalation"

class RouterOrchestrator:
    def __init__(self, agents: Dict[AgentType, Callable], classifier):
        self.agents = agents
        self.classifier = classifier

    async def process(self, query: str, context: dict = None) -> dict:
        """Route the query to the appropriate agent."""
        # 1. Classify the query
        classification = await self.classifier.classify(query)
        agent_type = AgentType(classification["agent"])

        # 2. Invoke the agent, falling back to escalation if none matches
        agent = self.agents.get(agent_type)
        if not agent:
            agent = self.agents[AgentType.ESCALATION]

        result = await agent(query, context)

        return {
            "response": result["answer"],
            "agent_used": agent_type.value,
            "confidence": classification["confidence"],
            "sources": result.get("sources", [])
        }

class IntentClassifier:
    def __init__(self, llm):
        self.llm = llm

    async def classify(self, query: str) -> dict:
        prompt = f"""
        Classify this user query.

        Categories:
        - faq: general questions, policies, information
        - technical: technical problems, bugs, configuration
        - sales: pricing, purchases, subscriptions
        - escalation: complaints, emergencies, sensitive requests

        Query: {query}

        Answer in JSON: {{"agent": "...", "confidence": 0.0-1.0}}
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_json(result)

    def _parse_json(self, text: str) -> dict:
        # Assumes the LLM returns a bare JSON object
        return json.loads(text)
```
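A hypothetical wiring example; the individual agent callables and the `llm` client are stand-ins, not part of the pattern itself:

```python
# faq_agent, technical_agent, sales_agent, escalation_agent and llm
# are assumed to be defined elsewhere.
orchestrator = RouterOrchestrator(
    agents={
        AgentType.FAQ: faq_agent,
        AgentType.TECHNICAL: technical_agent,
        AgentType.SALES: sales_agent,
        AgentType.ESCALATION: escalation_agent,
    },
    classifier=IntentClassifier(llm),
)

result = await orchestrator.process("My API token stopped working yesterday")
print(result["agent_used"])  # e.g. "technical"
```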
Pattern 2: Sequential Pipeline
The agents run one after another; each one enriches the shared context.
```python
from dataclasses import dataclass
from typing import List

@dataclass
class AgentResult:
    agent_name: str
    output: dict
    success: bool
    error: str = None

class SequentialPipeline:
    def __init__(self, agents: List[tuple], stop_on_failure: bool = True):
        """agents: list of (name, agent_callable) tuples."""
        self.agents = agents
        self.stop_on_failure = stop_on_failure

    async def process(self, query: str, initial_context: dict = None) -> dict:
        """Run the agents in sequence."""
        context = initial_context or {}
        context["original_query"] = query

        results = []
        for agent_name, agent in self.agents:
            try:
                result = await agent(query, context)
                agent_result = AgentResult(
                    agent_name=agent_name,
                    output=result,
                    success=True
                )
                # Enrich the context for the next agent
                context[f"{agent_name}_result"] = result
                context["last_result"] = result
            except Exception as e:
                agent_result = AgentResult(
                    agent_name=agent_name,
                    output={},
                    success=False,
                    error=str(e)
                )

            results.append(agent_result)

            # Stop early if an agent fails (optional)
            if not agent_result.success and self.stop_on_failure:
                break

        return {
            "final_result": context.get("last_result"),
            "pipeline_results": results,
            "context": context
        }

# Example pipeline (agent classes defined elsewhere)
pipeline = SequentialPipeline([
    ("query_analyzer", QueryAnalyzerAgent()),
    ("retriever", RetrievalAgent()),
    ("fact_checker", FactCheckAgent()),
    ("generator", GenerationAgent()),
    ("citation_adder", CitationAgent())
])
```
Pattern 3: Parallel with Fusion
Several agents work in parallel; their results are then merged.
```python
import asyncio
from typing import Callable, List

class ParallelOrchestrator:
    def __init__(self, agents: List[tuple], fusion_agent: Callable):
        self.agents = agents  # (name, agent, weight)
        self.fusion = fusion_agent

    async def process(self, query: str, context: dict = None) -> dict:
        """Run the agents in parallel, then merge their results."""
        # Launch all agents concurrently
        tasks = [
            self._run_agent(name, agent, query, context)
            for name, agent, _ in self.agents
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect the valid results
        valid_results = []
        for i, result in enumerate(results):
            name, _, weight = self.agents[i]
            if not isinstance(result, Exception):
                valid_results.append({
                    "agent": name,
                    "result": result,
                    "weight": weight
                })

        # Merge the results
        fused = await self.fusion(query, valid_results)

        return {
            "response": fused["answer"],
            "contributing_agents": [r["agent"] for r in valid_results],
            "fusion_confidence": fused.get("confidence")
        }

    async def _run_agent(self, name: str, agent: Callable, query: str, context: dict):
        try:
            return await asyncio.wait_for(
                agent(query, context),
                timeout=10.0
            )
        except asyncio.TimeoutError:
            return {"error": "timeout", "agent": name}

class FusionAgent:
    def __init__(self, llm):
        self.llm = llm

    async def __call__(self, query: str, results: List[dict]) -> dict:
        """Merge the answers from several agents."""
        responses_text = "\n\n".join([
            f"Agent {r['agent']} (weight {r['weight']}):\n{r['result'].get('answer', 'N/A')}"
            for r in results
        ])

        prompt = f"""
        Synthesize these answers from different agents into one coherent response.

        Original question: {query}

        Agent answers:
        {responses_text}

        Rules:
        1. Prioritize the agents with the highest weight
        2. If answers contradict each other, present both views
        3. Cite sources where available
        4. Produce one single, coherent answer

        Synthesized answer:
        """
        answer = await self.llm.generate(prompt, temperature=0.3)

        return {
            "answer": answer,
            "confidence": self._calculate_confidence(results)
        }

    def _calculate_confidence(self, results: List[dict]) -> float:
        # Naive placeholder heuristic: confidence grows with the
        # number of agents that contributed a result (capped at 1.0)
        return min(1.0, len(results) / 3)
```
Pattern 4: ReAct (Reasoning + Acting)
The agent thinks, acts, observes, and iterates.
```python
import inspect
from datetime import datetime
from typing import Callable, Dict

class ReActAgent:
    def __init__(self, llm, tools: Dict[str, Callable], max_iterations: int = 5):
        self.llm = llm
        self.tools = tools
        self.max_iterations = max_iterations

    async def process(self, query: str) -> dict:
        """Run the ReAct loop: Thought -> Action -> Observation."""
        history = []
        final_answer = None

        for i in range(self.max_iterations):
            # Generate the next thought/action
            step = await self._generate_step(query, history)

            if step["type"] == "thought":
                history.append({"thought": step["content"]})

            elif step["type"] == "action":
                # Execute the action
                tool_name = step["tool"]
                tool_input = step["input"]

                if tool_name in self.tools:
                    observation = self.tools[tool_name](tool_input)
                    # Support both sync and async tools
                    if inspect.isawaitable(observation):
                        observation = await observation
                else:
                    observation = f"Tool '{tool_name}' not available"

                history.append({
                    "action": f"{tool_name}({tool_input})",
                    "observation": observation
                })

            elif step["type"] == "answer":
                final_answer = step["content"]
                break

        return {
            "answer": final_answer,
            "reasoning_trace": history,
            "iterations": i + 1
        }

    async def _generate_step(self, query: str, history: list) -> dict:
        """Generate the next step (thought, action, or answer)."""
        history_text = self._format_history(history)
        tools_desc = self._format_tools()

        prompt = f"""
        You are an agent that solves problems step by step.

        Question: {query}

        Available tools:
        {tools_desc}

        History:
        {history_text}

        Next step (choose ONE format):
        Thought: [your reasoning]
        OR
        Action: [tool_name]([parameters])
        OR
        Answer: [your final answer]
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_step(result)

# Tools for the ReAct agent
tools = {
    "search_kb": lambda q: rag_search(q),
    "calculate": lambda expr: eval(expr),  # demo only: never eval untrusted input
    "get_current_date": lambda _: datetime.now().isoformat(),
    "check_inventory": lambda product_id: inventory_api.check(product_id)
}
```
Specialized Agents
Search Agent
```python
from typing import Dict, List

class SearchAgent:
    def __init__(self, retrievers: Dict[str, "Retriever"], llm):
        self.retrievers = retrievers
        self.llm = llm

    async def __call__(self, query: str, context: dict) -> dict:
        """Agent specialized in multi-source search."""
        # 1. Determine the relevant sources
        sources = await self._select_sources(query)

        # 2. Search each source
        all_results = []
        for source_name in sources:
            retriever = self.retrievers.get(source_name)
            if retriever:
                results = await retriever.search(query)
                for r in results:
                    r["source"] = source_name
                all_results.extend(results)

        # 3. Rerank the results
        ranked = await self._rerank(query, all_results)

        return {
            "documents": ranked[:10],
            "sources_used": sources,
            "total_found": len(all_results)
        }

    async def _select_sources(self, query: str) -> List[str]:
        """Select the sources relevant to the query."""
        prompt = f"""
        Which sources are relevant for this question?

        Available sources:
        - kb_general: general knowledge base
        - kb_technical: technical documentation
        - kb_support: support ticket history
        - kb_products: product catalog

        Question: {query}

        Relevant sources (JSON list):
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_json(result)
```
Verification Agent
```python
class VerificationAgent:
    def __init__(self, llm, fact_db):
        self.llm = llm
        self.fact_db = fact_db

    async def __call__(self, query: str, context: dict) -> dict:
        """Verify claims against trusted sources."""
        # Fetch the answer to verify
        answer = context.get("last_result", {}).get("answer", "")

        # 1. Extract the claims
        claims = await self._extract_claims(answer)

        # 2. Verify each claim
        verifications = []
        for claim in claims:
            verification = await self._verify_claim(claim)
            verifications.append(verification)

        # 3. Compute the overall score
        verified_count = sum(1 for v in verifications if v["verified"])
        total = len(verifications)

        return {
            "claims": verifications,
            "verification_score": verified_count / total if total > 0 else 1.0,
            "needs_correction": any(not v["verified"] for v in verifications)
        }

    async def _verify_claim(self, claim: str) -> dict:
        """Verify a single claim."""
        # Search for confirming or contradicting facts
        facts = await self.fact_db.search(claim, top_k=3)

        prompt = f"""
        Verify this claim against the following facts.

        Claim: {claim}

        Reference facts:
        {self._format_facts(facts)}

        Is the claim:
        - verified: supported by the facts
        - contradicted: contradicted by the facts
        - unverified: impossible to verify

        Answer in JSON with "status" and "evidence".
        """
        result = await self.llm.generate(prompt, temperature=0)
        parsed = self._parse_json(result)

        return {
            "claim": claim,
            "verified": parsed.get("status") == "verified",
            "status": parsed.get("status"),
            "evidence": parsed.get("evidence")
        }
```
Action Agent
```python
from typing import Callable, Dict, List

class ActionAgent:
    def __init__(self, action_registry: Dict[str, Callable], llm):
        self.actions = action_registry
        self.llm = llm

    async def __call__(self, query: str, context: dict) -> dict:
        """Execute actions derived from the query."""
        # 1. Determine the required actions
        action_plan = await self._plan_actions(query, context)

        # 2. Execute the actions
        results = []
        for action in action_plan:
            if action["name"] in self.actions:
                try:
                    result = await self.actions[action["name"]](**action["params"])
                    results.append({
                        "action": action["name"],
                        "success": True,
                        "result": result
                    })
                except Exception as e:
                    results.append({
                        "action": action["name"],
                        "success": False,
                        "error": str(e)
                    })

        return {
            "actions_executed": results,
            "all_successful": all(r["success"] for r in results)
        }

    async def _plan_actions(self, query: str, context: dict) -> List[dict]:
        """Plan the actions to execute."""
        actions_desc = "\n".join([
            f"- {name}: {func.__doc__ or 'No description'}"
            for name, func in self.actions.items()
        ])

        prompt = f"""
        Plan the actions needed to fulfill this request.

        Request: {query}
        Context: {context}

        Available actions:
        {actions_desc}

        Action plan (JSON array):
        [
            {{"name": "action_name", "params": {{}}}},
            ...
        ]
        """
        result = await self.llm.generate(prompt, temperature=0)
        return self._parse_json(result)

# Action registry (the handlers are assumed to be defined elsewhere)
action_registry = {
    "create_ticket": create_support_ticket,
    "send_email": send_email,
    "update_order": update_order_status,
    "schedule_callback": schedule_callback,
    "apply_discount": apply_discount_code
}
```
Error Handling
Retry with Backoff
```python
import asyncio
from functools import wraps

def with_retry(max_attempts: int = 3, backoff: float = 1.0):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        wait_time = backoff * (2 ** attempt)
                        await asyncio.sleep(wait_time)

            raise last_exception
        return wrapper
    return decorator
```
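Applied to an agent call, the decorator could be used like this (a sketch; `search_agent` stands in for any coroutine that may fail transiently):

```python
@with_retry(max_attempts=3, backoff=1.0)
async def call_search_agent(query: str, context: dict) -> dict:
    # search_agent is a stand-in for any transiently failing coroutine
    return await search_agent(query, context)

# Failed attempts wait 1s, then 2s (backoff * 2**attempt) before retrying;
# after the third failure the last exception is re-raised.
```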
Graceful Fallback
```python
import logging

logger = logging.getLogger(__name__)

class ResilientOrchestrator:
    def __init__(self, primary_agents, fallback_agent):
        self.primary = primary_agents
        self.fallback = fallback_agent

    async def process(self, query: str, context: dict = None) -> dict:
        """Run the primary pipeline, falling back on failure."""
        try:
            # Try the primary pipeline
            result = await self._run_primary(query, context)
            if self._is_valid_result(result):
                return result
        except Exception as e:
            logger.warning(f"Primary pipeline failed: {e}")

        # Fall back to the simple agent
        return await self.fallback(query, context)

    def _is_valid_result(self, result: dict) -> bool:
        """Check whether the result is valid."""
        return (
            result.get("answer")
            and len(result["answer"]) > 10
            and result.get("confidence", 0) > 0.5
        )
```
Monitoring and Observability
```python
import time
import uuid
from dataclasses import dataclass

@dataclass
class AgentTrace:
    agent_name: str
    start_time: float
    end_time: float
    input_query: str
    output: dict
    success: bool
    error: str = None

class TracingOrchestrator:
    def __init__(self, orchestrator, trace_store):
        self.orchestrator = orchestrator
        self.traces = trace_store

    async def process(self, query: str, context: dict = None) -> dict:
        """Run the orchestrator with full tracing."""
        trace_id = str(uuid.uuid4())
        traces = []

        # Wrap each agent to capture traces
        original_agents = self.orchestrator.agents.copy()
        for name, agent in original_agents.items():
            self.orchestrator.agents[name] = self._wrap_agent(name, agent, traces)

        try:
            result = await self.orchestrator.process(query, context)

            # Persist the traces
            await self.traces.save(trace_id, traces)

            result["trace_id"] = trace_id
            result["execution_time_ms"] = sum(
                (t.end_time - t.start_time) * 1000 for t in traces
            )
            return result
        finally:
            # Restore the original agents
            self.orchestrator.agents = original_agents

    def _wrap_agent(self, name: str, agent, traces: list):
        async def wrapped(query, context):
            start = time.time()
            try:
                result = await agent(query, context)
                traces.append(AgentTrace(
                    agent_name=name,
                    start_time=start,
                    end_time=time.time(),
                    input_query=query,
                    output=result,
                    success=True
                ))
                return result
            except Exception as e:
                traces.append(AgentTrace(
                    agent_name=name,
                    start_time=start,
                    end_time=time.time(),
                    input_query=query,
                    output={},
                    success=False,
                    error=str(e)
                ))
                raise
        return wrapped
```
Best Practices
1. Atomic Agents
Each agent should have a single, clearly defined responsibility.
2. Timeouts Everywhere
Set a timeout on every agent call so one slow agent cannot block the whole pipeline; a minimal sketch follows.
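A minimal sketch using `asyncio.wait_for`, the same mechanism the parallel orchestrator above uses:

```python
import asyncio

async def call_with_timeout(agent, query: str, context: dict, timeout: float = 10.0):
    # Bound every agent call so one slow agent cannot stall the pipeline
    try:
        return await asyncio.wait_for(agent(query, context), timeout=timeout)
    except asyncio.TimeoutError:
        return {"error": "timeout"}
```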
3. Structured Logs
Log every step with structured fields so failures can be debugged after the fact.
4. Unit Tests per Agent
Test each agent in isolation before integrating it; see the sketch below.
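A minimal pytest sketch (assumes the `pytest-asyncio` plugin and the `SequentialPipeline` from Pattern 2):

```python
import pytest

@pytest.mark.asyncio
async def test_pipeline_stops_after_failure():
    async def failing_agent(query, context):
        raise RuntimeError("boom")

    async def never_reached(query, context):
        return {"answer": "should not run"}

    pipeline = SequentialPipeline(
        [("broken", failing_agent), ("next", never_reached)],
        stop_on_failure=True,
    )
    result = await pipeline.process("test query")

    # The second agent must not run after the first one fails
    assert len(result["pipeline_results"]) == 1
    assert result["pipeline_results"][0].success is False
```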
5. Circuit Breaker
Temporarily disable agents that keep failing instead of retrying them forever; a sketch follows.
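A minimal circuit-breaker sketch; the threshold and cool-down values are illustrative:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, cooldown_s: float = 60.0):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at = None  # None = circuit closed (agent enabled)

    def is_open(self) -> bool:
        # An open circuit closes again once the cool-down has elapsed
        if self.opened_at and time.time() - self.opened_at > self.cooldown_s:
            self.opened_at = None
            self.failures = 0
        return self.opened_at is not None

    async def call(self, agent, query: str, context: dict):
        if self.is_open():
            raise RuntimeError("circuit open: agent temporarily disabled")
        try:
            result = await agent(query, context)
            self.failures = 0  # any success resets the counter
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()  # trip the breaker
            raise
```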
Further Reading
- Conversational RAG - memory and context
- LLM Generation - optimizing responses
- RAG Evaluation - measuring quality
Multi-Agent Orchestration with Ailog
Building a robust multi-agent architecture takes deep expertise. With Ailog you get a preconfigured agent infrastructure:
- Specialized agents: search, verification, action
- Configurable orchestration without code
- Built-in monitoring with full traces
- Automatic fallbacks and error handling
- Scalability for heavy workloads
Try Ailog for free and deploy multi-agent assistants in a few clicks.
Related Articles
Agentic RAG 2025: Building Autonomous AI Agents (Complete Guide)
Complete Agentic RAG guide: architecture, design patterns, autonomous agents with dynamic retrieval, multi-tool orchestration. With LangGraph and CrewAI examples.
Guardrails for RAG: Securing Your AI Assistants
Implement robust guardrails to prevent dangerous, off-topic, or inappropriate answers in your production RAG systems.
Conversational RAG: Memory and Context Across Sessions
Implement RAG with conversational memory: context management, cross-session history, and response personalization.