AnleitungExperte

RAG-Agenten: Orchestrierung von Multi-Agenten-Systemen

5. März 2026
20 Minuten Lesezeit
Equipe Ailog

Konzipieren Sie RAG-basierte Multi-Agenten-Systeme: Orchestrierung, Spezialisierung, Zusammenarbeit und Fehlerbehandlung für komplexe Assistenten.

Agents RAG : Orchestrieren von Multi-Agenten-Systemen

Ein einfacher RAG-Agent folgt einer linearen Pipeline: retrieval, generation, antwort. Ein Multi-Agenten-System orchestriert mehrere spezialisierte Agents, die zusammenarbeiten, um komplexe Aufgaben zu bearbeiten. Dieser Leitfaden untersucht Architekturen und Patterns zum Aufbau solcher Systeme.

Warum Multi-Agenten?

Grenzen des monolithischen RAG

Ein klassischer RAG stößt bei bestimmten Aufgaben an Grenzen:

TacheProbleme RAG simple
Questions multi-sourcesEin einziges retrieval, begrenzter Kontext
Raisonnement complexeKeine Problemzerlegung
Actions multiplesLineare Pipeline, keine Schleife
Verification des faitsKein Double-Check
Taches longuesKeine Planung

Der Multi-Agenten-Ansatz

Teilen, um besser zu herrschen:

Question complexe
       │
       ▼
┌─────────────────┐
│  ORCHESTRATEUR  │ ← Decompose et coordonne
└────────┬────────┘
         │
    ┌────┼────┬────────────┐
    ▼    ▼    ▼            ▼
┌──────┐┌──────┐┌──────┐┌──────┐
│Agent ││Agent ││Agent ││Agent │
│Search││Reason││Verify││Action│
└──────┘└──────┘└──────┘└──────┘
    │    │    │            │
    └────┴────┴────────────┘
                │
                ▼
         Reponse finale

Orchestrierungsarchitekturen

Pattern 1 : Routeur

Der Orchestrator routet an den spezialisierten Agenten.

DEVELOPERpython
from enum import Enum from typing import Callable, Dict class AgentType(Enum): FAQ = "faq" TECHNICAL = "technical" SALES = "sales" ESCALATION = "escalation" class RouterOrchestrator: def __init__(self, agents: Dict[AgentType, Callable], classifier): self.agents = agents self.classifier = classifier async def process(self, query: str, context: dict = None) -> dict: """ Route la requete vers l'agent approprie """ # 1. Die Anfrage klassifizieren classification = await self.classifier.classify(query) agent_type = AgentType(classification["agent"]) # 2. Agenten aufrufen agent = self.agents.get(agent_type) if not agent: agent = self.agents[AgentType.ESCALATION] result = await agent(query, context) return { "response": result["answer"], "agent_used": agent_type.value, "confidence": classification["confidence"], "sources": result.get("sources", []) } class IntentClassifier: def __init__(self, llm): self.llm = llm async def classify(self, query: str) -> dict: prompt = f""" Classifie cette requete utilisateur. Categories : - faq : Questions generales, policies, informations - technical : Problemes techniques, bugs, configurations - sales : Pricing, achats, abonnements - escalation : Plaintes, urgences, demandes sensibles Requete : {query} Reponds en JSON : {{"agent": "...", "confidence": 0.0-1.0}} """ result = await self.llm.generate(prompt, temperature=0) return self._parse_json(result)

Pattern 2 : Sequenzielle Pipeline

Die Agents werden nacheinander ausgeführt, jeder erweitert den Kontext.

DEVELOPERpython
from dataclasses import dataclass from typing import List @dataclass class AgentResult: agent_name: str output: dict success: bool error: str = None class SequentialPipeline: def __init__(self, agents: List[tuple]): """ agents: List de (nom, agent_callable) """ self.agents = agents async def process(self, query: str, initial_context: dict = None) -> dict: """ Execute les agents en sequence """ context = initial_context or {} context["original_query"] = query results = [] for agent_name, agent in self.agents: try: result = await agent(query, context) agent_result = AgentResult( agent_name=agent_name, output=result, success=True ) # Kontext für den nächsten Agenten anreichern context[f"{agent_name}_result"] = result context["last_result"] = result except Exception as e: agent_result = AgentResult( agent_name=agent_name, output={}, success=False, error=str(e) ) results.append(agent_result) # Abbrechen, wenn ein Agent fehlschlägt (optional) if not agent_result.success and self.stop_on_failure: break return { "final_result": context.get("last_result"), "pipeline_results": results, "context": context } # Exemple de pipeline pipeline = SequentialPipeline([ ("query_analyzer", QueryAnalyzerAgent()), ("retriever", RetrievalAgent()), ("fact_checker", FactCheckAgent()), ("generator", GenerationAgent()), ("citation_adder", CitationAgent()) ])

Pattern 3 : Parallel mit Fusion

Mehrere Agents arbeiten parallel, danach werden die Ergebnisse zusammengeführt.

DEVELOPERpython
import asyncio from typing import List, Callable class ParallelOrchestrator: def __init__( self, agents: List[tuple], fusion_agent: Callable ): self.agents = agents # (name, agent, weight) self.fusion = fusion_agent async def process(self, query: str, context: dict = None) -> dict: """ Execute les agents en parallele puis fusionne """ # Alle Agenten parallel starten tasks = [ self._run_agent(name, agent, query, context) for name, agent, _ in self.agents ] results = await asyncio.gather(*tasks, return_exceptions=True) # Gültige Ergebnisse sammeln valid_results = [] for i, result in enumerate(results): name, _, weight = self.agents[i] if not isinstance(result, Exception): valid_results.append({ "agent": name, "result": result, "weight": weight }) # Ergebnisse fusionieren fused = await self.fusion(query, valid_results) return { "response": fused["answer"], "contributing_agents": [r["agent"] for r in valid_results], "fusion_confidence": fused.get("confidence") } async def _run_agent(self, name: str, agent: Callable, query: str, context: dict): try: return await asyncio.wait_for( agent(query, context), timeout=10.0 ) except asyncio.TimeoutError: return {"error": "timeout", "agent": name} class FusionAgent: def __init__(self, llm): self.llm = llm async def __call__(self, query: str, results: List[dict]) -> dict: """ Fusionne les reponses de plusieurs agents """ responses_text = "\n\n".join([ f"Agent {r['agent']} (poids {r['weight']}):\n{r['result'].get('answer', 'N/A')}" for r in results ]) prompt = f""" Tu dois synthetiser ces reponses de differents agents en une seule reponse coherente. Question originale : {query} Reponses des agents : {responses_text} Regles : 1. Priorise les agents avec le poids le plus eleve 2. En cas de contradiction, indique les deux points de vue 3. Cite les sources si disponibles 4. Genere une reponse unique et coherente Reponse synthetisee : """ answer = await self.llm.generate(prompt, temperature=0.3) return { "answer": answer, "confidence": self._calculate_confidence(results) }

Pattern 4 : ReAct (Reasoning + Acting)

Der Agent denkt nach, handelt, beobachtet und iteriert.

DEVELOPERpython
from typing import Dict, Any class ReActAgent: def __init__(self, llm, tools: Dict[str, Callable], max_iterations: int = 5): self.llm = llm self.tools = tools self.max_iterations = max_iterations async def process(self, query: str) -> dict: """ Execute le pattern ReAct : Thought -> Action -> Observation """ history = [] final_answer = None for i in range(self.max_iterations): # Den nächsten Gedanken/die nächste Aktion erzeugen step = await self._generate_step(query, history) if step["type"] == "thought": history.append({"thought": step["content"]}) elif step["type"] == "action": # Aktion ausführen tool_name = step["tool"] tool_input = step["input"] if tool_name in self.tools: observation = await self.tools[tool_name](tool_input) else: observation = f"Outil '{tool_name}' non disponible" history.append({ "action": f"{tool_name}({tool_input})", "observation": observation }) elif step["type"] == "answer": final_answer = step["content"] break return { "answer": final_answer, "reasoning_trace": history, "iterations": i + 1 } async def _generate_step(self, query: str, history: list) -> dict: """ Genere la prochaine etape (pensee, action ou reponse) """ history_text = self._format_history(history) tools_desc = self._format_tools() prompt = f""" Tu es un agent qui resout des problemes etape par etape. Question : {query} Outils disponibles : {tools_desc} Historique : {history_text} Prochaine etape (choisis UN format) : Pensee : [ton raisonnement] OU Action : [nom_outil]([parametres]) OU Reponse : [ta reponse finale] """ result = await self.llm.generate(prompt, temperature=0) return self._parse_step(result) # Outils pour l'agent ReAct tools = { "search_kb": lambda q: rag_search(q), "calculate": lambda expr: eval(expr), "get_current_date": lambda _: datetime.now().isoformat(), "check_inventory": lambda product_id: inventory_api.check(product_id) }

Spezialisierte Agents

Search Agent

DEVELOPERpython
class SearchAgent: def __init__(self, retrievers: Dict[str, Retriever], llm): self.retrievers = retrievers self.llm = llm async def __call__(self, query: str, context: dict) -> dict: """ Agent specialise dans la recherche multi-sources """ # 1. Relevante Quellen bestimmen sources = await self._select_sources(query) # 2. In jeder Quelle suchen all_results = [] for source_name in sources: retriever = self.retrievers.get(source_name) if retriever: results = await retriever.search(query) for r in results: r["source"] = source_name all_results.extend(results) # 3. Ergebnisse reranken ranked = await self._rerank(query, all_results) return { "documents": ranked[:10], "sources_used": sources, "total_found": len(all_results) } async def _select_sources(self, query: str) -> List[str]: """ Selectionne les sources pertinentes pour la requete """ prompt = f""" Quelles sources sont pertinentes pour cette question ? Sources disponibles : - kb_general : Base de connaissances generale - kb_technical : Documentation technique - kb_support : Historique tickets support - kb_products : Catalogue produits Question : {query} Sources pertinentes (liste JSON) : """ result = await self.llm.generate(prompt, temperature=0) return self._parse_json(result)

Verification Agent

DEVELOPERpython
class VerificationAgent: def __init__(self, llm, fact_db): self.llm = llm self.fact_db = fact_db async def __call__(self, query: str, context: dict) -> dict: """ Verifie les affirmations contre des sources fiables """ # Die zu überprüfende Antwort abrufen answer = context.get("last_result", {}).get("answer", "") # 1. Aussagen extrahieren claims = await self._extract_claims(answer) # 2. Jede Aussage überprüfen verifications = [] for claim in claims: verification = await self._verify_claim(claim) verifications.append(verification) # 3. Gesamtscore bestimmen verified_count = sum(1 for v in verifications if v["verified"]) total = len(verifications) return { "claims": verifications, "verification_score": verified_count / total if total > 0 else 1.0, "needs_correction": any(not v["verified"] for v in verifications) } async def _verify_claim(self, claim: str) -> dict: """ Verifie une affirmation specifique """ # Widersprüchliche oder bestätigende Fakten suchen facts = await self.fact_db.search(claim, top_k=3) prompt = f""" Verifie cette affirmation contre les faits suivants. Affirmation : {claim} Faits de reference : {self._format_facts(facts)} L'affirmation est-elle : - verified : supportee par les faits - contradicted : contredite par les faits - unverified : impossible a verifier Reponds en JSON avec "status" et "evidence". """ result = await self.llm.generate(prompt, temperature=0) parsed = self._parse_json(result) return { "claim": claim, "verified": parsed.get("status") == "verified", "status": parsed.get("status"), "evidence": parsed.get("evidence") }

Action Agent

DEVELOPERpython
class ActionAgent: def __init__(self, action_registry: Dict[str, Callable], llm): self.actions = action_registry self.llm = llm async def __call__(self, query: str, context: dict) -> dict: """ Execute des actions basees sur la requete """ # 1. Notwendige Aktionen bestimmen action_plan = await self._plan_actions(query, context) # 2. Aktionen ausführen results = [] for action in action_plan: if action["name"] in self.actions: try: result = await self.actions[action["name"]](**action["params"]) results.append({ "action": action["name"], "success": True, "result": result }) except Exception as e: results.append({ "action": action["name"], "success": False, "error": str(e) }) return { "actions_executed": results, "all_successful": all(r["success"] for r in results) } async def _plan_actions(self, query: str, context: dict) -> List[dict]: """ Planifie les actions a executer """ actions_desc = "\n".join([ f"- {name}: {func.__doc__ or 'No description'}" for name, func in self.actions.items() ]) prompt = f""" Planifie les actions necessaires pour satisfaire cette demande. Demande : {query} Contexte : {context} Actions disponibles : {actions_desc} Plan d'actions (JSON array) : [ {{"name": "action_name", "params": {{}}}}, ... ] """ result = await self.llm.generate(prompt, temperature=0) return self._parse_json(result) # Aktionsregister action_registry = { "create_ticket": create_support_ticket, "send_email": send_email, "update_order": update_order_status, "schedule_callback": schedule_callback, "apply_discount": apply_discount_code }

Fehlerbehandlung

Retry mit Backoff

DEVELOPERpython
import asyncio from functools import wraps def with_retry(max_attempts: int = 3, backoff: float = 1.0): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): last_exception = None for attempt in range(max_attempts): try: return await func(*args, **kwargs) except Exception as e: last_exception = e if attempt < max_attempts - 1: wait_time = backoff * (2 ** attempt) await asyncio.sleep(wait_time) raise last_exception return wrapper return decorator

Sanfter Fallback

DEVELOPERpython
class ResilientOrchestrator: def __init__(self, primary_agents, fallback_agent): self.primary = primary_agents self.fallback = fallback_agent async def process(self, query: str, context: dict = None) -> dict: """ Execute avec fallback en cas d'echec """ try: # Haupt-Pipeline versuchen result = await self._run_primary(query, context) if self._is_valid_result(result): return result except Exception as e: logger.warning(f"Primary pipeline failed: {e}") # Fallback zum einfachen Agenten return await self.fallback(query, context) def _is_valid_result(self, result: dict) -> bool: """ Verifie si le resultat est valide """ return ( result.get("answer") and len(result["answer"]) > 10 and result.get("confidence", 0) > 0.5 )

Monitoring und Observability

DEVELOPERpython
import time from dataclasses import dataclass @dataclass class AgentTrace: agent_name: str start_time: float end_time: float input_query: str output: dict success: bool error: str = None class TracingOrchestrator: def __init__(self, orchestrator, trace_store): self.orchestrator = orchestrator self.traces = trace_store async def process(self, query: str, context: dict = None) -> dict: """ Execute avec tracing complet """ trace_id = str(uuid.uuid4()) traces = [] # Wrapper zum Erfassen der Traces original_agents = self.orchestrator.agents.copy() for name, agent in original_agents.items(): self.orchestrator.agents[name] = self._wrap_agent(name, agent, traces) try: result = await self.orchestrator.process(query, context) # Traces speichern await self.traces.save(trace_id, traces) result["trace_id"] = trace_id result["execution_time_ms"] = sum( (t.end_time - t.start_time) * 1000 for t in traces ) return result finally: # Ursprüngliche Agenten wiederherstellen self.orchestrator.agents = original_agents def _wrap_agent(self, name: str, agent, traces: list): async def wrapped(query, context): start = time.time() try: result = await agent(query, context) traces.append(AgentTrace( agent_name=name, start_time=start, end_time=time.time(), input_query=query, output=result, success=True )) return result except Exception as e: traces.append(AgentTrace( agent_name=name, start_time=start, end_time=time.time(), input_query=query, output={}, success=False, error=str(e) )) raise return wrapped

Best Practices

1. Atomare Agents

Jeder Agent sollte eine klare, einzelne Verantwortung haben.

2. Timeouts überall

Setzen Sie Timeouts, um Blockierungen zu vermeiden.

3. Strukturierte Logs

Loggen Sie jeden Schritt für Debugging.

4. Unittests pro Agent

Testen Sie jeden Agent unabhängig vor der Integration.

5. Circuit Breaker

Temporäres Deaktivieren fehlerhafter Agents.

Weiterführende Links

FAQ

Ein einfacher RAG folgt einer linearen Pipeline: requete, retrieval, generation, reponse. Ein Multi-Agenten-System orchestriert mehrere spezialisierte Agents, die zusammenarbeiten. Beispielsweise findet ein Search-Agent Dokumente, ein Verification-Agent validiert Fakten und ein Generation-Agent formuliert die Antwort. Diese Architektur ermöglicht das Bearbeiten komplexer Aufgaben, die Reasoning und mehrere Aktionen erfordern.
Das ReAct-Pattern (Reasoning + Acting) eignet sich für Aufgaben, die mehrere Iterationen und dynamische Entscheidungen benötigen: Der Agent denkt nach, handelt, beobachtet das Ergebnis und entscheidet über das weitere Vorgehen. Verwenden Sie es für komplexe Fragen, explorative Recherchen oder Aufgaben mit Unsicherheit. Die sequentielle Pipeline passt besser zu vordefinierten, vorhersehbaren Workflows, bei denen jeder Schritt im Voraus bekannt ist.
Implementieren Sie mehrere Resilienzeebenen: Retry mit exponentiellem Backoff für transiente Fehler, Fallback zu Ersatz-Agents bei Ausfall, Circuit Breaker, um fehlerhafte Agents temporär zu deaktivieren, und strikte Timeouts, um Blockaden zu vermeiden. Planen Sie immer einen ultimativen Fallback zu einem einfachen Agenten, der zumindest eine degradierte Antwort liefern kann.
Nicht zwangsläufig. Mit dem Parallel-Pattern arbeiten mehrere Agents gleichzeitig und die Gesamtlaufzeit entspricht der des langsamsten Agenten. Die sequentielle Pipeline erhöht die Latenz, kann aber mit Caching und leichten Agents optimiert werden. Der Kompromiss zwischen Latenz und Qualität hängt vom Use-Case ab: Priorisieren Sie Geschwindigkeit für den Kundensupport, Qualität für die Analyse komplexer Dokumente.
Tracing ist essenziell: Jeder Agent sollte Eingaben, Ausgaben, Ausführungsdauer und Fehler mit einer eindeutigen trace_id loggen, um den kompletten Weg einer Anfrage nachzuvollziehen. Implementieren Sie Monitoring-Dashboards, die Erfolgsraten pro Agent, Latenzen und Fehlerpattern anzeigen. Traces ermöglichen das Reproduzieren problematischer Anfragen zum Fixen von Bugs. ---

Orchestration multi-agents avec Ailog

Der Aufbau einer robusten Multi-Agenten-Architektur erfordert tiefgehende Expertise. Mit Ailog profitieren Sie von einer vorkonfigurierten Agenten-Infrastruktur:

  • Agents specialises : recherche, verification, action
  • Orchestration configurable sans code
  • Monitoring integre avec traces completes
  • Fallbacks automatiques et gestion des erreurs
  • Scalabilite pour les charges importantes

Testez Ailog gratuitement et deployez des assistants multi-agents en quelques clics.

Tags

RAGagentsorchestrationmulti-agentsLLMarchitecture

Verwandte Artikel

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !