Sicherheit und Compliance für RAG: DSGVO, AI Act und Best Practices
Sichern Sie Ihr RAG-System: DSGVO-Konformität, europäischer AI Act, Datenschutz und Audit. Umfassender Leitfaden für Unternehmen.
- Author
- Equipe Ailog
- Published
- Reading time
- 18 Minuten Lesezeit
- Level
- advanced
Sicherheit und Compliance für RAG : DSGVO, AI Act und Best Practices
Die Bereitstellung eines RAG-Systems im Unternehmen bedeutet oft den Umgang mit sensiblen Daten: interne Dokumente, Kundendaten, geistiges Eigentum. Dieser Leitfaden behandelt die wesentlichen Sicherheits- und Compliance-Aspekte für eine sorgenfreie Implementierung.
Der europäische Rechtsrahmen
DSGVO : Grundlagen für RAG
Die Datenschutz-Grundverordnung (DSGVO) gilt, sobald ein RAG-System personenbezogene Daten aus Europa verarbeitet.
| Grundsatz DSGVO | Anwendung RAG | |-----------------|----------------| | Rechtmäßigkeit der Verarbeitung | Rechtsgrundlage zum Indexieren und Verwenden der Daten | | Datenminimierung | Nur die notwendigen Daten indexieren | | Richtigkeit | Wissensbasis aktuell halten | | Speicherbegrenzung | Veraltete Daten löschen | | Integrität und Vertraulichkeit | Verschlüsselung und Zugriffskontrolle | | Rechenschaftspflicht | Verarbeitung dokumentieren |
AI Act : Was sich für RAG-Systeme ändert
Der europäische AI Act (stufenweise in Kraft 2024–2027) klassifiziert KI-Systeme nach Risikoniveau:
Begrenztes Risiko (Mehrheit der RAG) : • Chatbots im Kundensupport • Interne Assistenten • Dokumentensuche
Pflichten : • Transparenz: angeben, dass der Nutzer mit einer KI interagiert • Aufbewahrung von Logs für Audits
Hohes Risiko (spezifische Fälle) : • RAG für HR-Entscheidungen (Rekrutierung, Bewertung) • RAG für Kredit-/Versicherungsentscheidungen • RAG im Gesundheitsbereich mit medizinischer Auswirkung
Zusätzliche Pflichten : • Konformitätsbewertung • Risikomanagementsystem • Dokumentierte Trainingsdaten • Menschliche Aufsicht
``python Classification du risque selon l'AI Act def classify_rag_risk(use_case: dict) -> str: high_risk_domains = [ "hr_recruitment", "hr_evaluation", "credit_scoring", "insurance_pricing", "medical_diagnosis", "legal_decision" ]
if use_case.get("domain") in high_risk_domains: return "high_risk"
if use_case.get("automated_decision") and use_case.get("significant_impact"): return "high_risk"
return "limited_risk" `
Sicherheitsarchitektur
Trennung der Umgebungen
` ┌─────────────────────────────────────────────────────────────┐ │ PRODUCTION │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ API GW │──│ RAG API │──│ Vector DB │ │ │ │ (WAF) │ │ (Isolated) │ │ (Encrypted) │ │ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Network Isolation (VPC) │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐ │ STAGING │ │ (Donnees anonymisees uniquement) │ └─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐ │ DEVELOPMENT │ │ (Donnees synthetiques) │ └─────────────────────────────────────────────────────────────┘ `
Ende-zu-Ende-Verschlüsselung
`python from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64
class SecureRAGStorage: def __init__(self, master_key: bytes): self.cipher = Fernet(master_key)
def encrypt_document(self, content: str, metadata: dict) -> dict: """Verschlüsselt den Inhalt vor der Speicherung""" encrypted_content = self.cipher.encrypt(content.encode())
Sensible Metadaten verschlüsseln sensitive_fields = ["author", "email", "department"] encrypted_metadata = metadata.copy()
for field in sensitive_fields: if field in encrypted_metadata: encrypted_metadata[field] = self.cipher.encrypt( str(encrypted_metadata[field]).encode() ).decode()
return { "content": encrypted_content.decode(), "metadata": encrypted_metadata, "encrypted": True }
def decrypt_document(self, encrypted_doc: dict) -> dict: """Entschlüsselt das Dokument zur Verwendung""" if not encrypted_doc.get("encrypted"): return encrypted_doc
content = self.cipher.decrypt( encrypted_doc["content"].encode() ).decode()
return { "content": content, "metadata": self._decrypt_metadata(encrypted_doc["metadata"]) }
def encrypt_embedding(self, embedding: list[float]) -> bytes: """Verschlüsselt die embeddings (optional, beeinträchtigt die Performance)""" import numpy as np embedding_bytes = np.array(embedding).tobytes() return self.cipher.encrypt(embedding_bytes) `
Feingranulare Zugriffskontrolle (RBAC)
`python from enum import Enum from functools import wraps
class Permission(Enum): READ_PUBLIC = "read:public" READ_INTERNAL = "read:internal" READ_CONFIDENTIAL = "read:confidential" READ_RESTRICTED = "read:restricted" ADMIN = "admin"
class DocumentClassification(Enum): PUBLIC = "public" INTERNAL = "internal" CONFIDENTIAL = "confidential" RESTRICTED = "restricted"
class RBACController: def __init__(self, user_service): self.user_service = user_service
self.permission_hierarchy = { Permission.READ_PUBLIC: [DocumentClassification.PUBLIC], Permission.READ_INTERNAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL ], Permission.READ_CONFIDENTIAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL ], Permission.READ_RESTRICTED: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL, DocumentClassification.RESTRICTED ] }
async def filter_accessible_documents( self, documents: list[dict], user_id: str ) -> list[dict]: """Filtert die Dokumente nach Benutzerberechtigungen""" user = await self.user_service.get_user(user_id) user_permissions = set(user.get("permissions", []))
accessible = [] for doc in documents: doc_class = DocumentClassification( doc.get("metadata", {}).get("classification", "internal") )
if self._can_access(user_permissions, doc_class): accessible.append(doc)
return accessible
def _can_access( self, user_permissions: set, doc_classification: DocumentClassification ) -> bool: """Prüft, ob der Benutzer auf das Dokument zugreifen kann""" for perm in user_permissions: try: allowed = self.permission_hierarchy.get(Permission(perm), []) if doc_classification in allowed: return True except ValueError: continue return False `
Schutz personenbezogener Daten
Automatische Erkennung und Anonymisierung
`python import re from presidio_analyzer import AnalyzerEngine from presidio_anonymizer import AnonymizerEngine
class PIIProtector: def __init__(self): self.analyzer = AnalyzerEngine() self.anonymizer = AnonymizerEngine()
Patterns personnalises pour la France self.french_patterns = { "FRENCH_SSN": r"\b[12][0-9]{2}[0-1][0-9][0-9]{2}[0-9]{3}[0-9]{3}[0-9]{2}\b", "FRENCH_PHONE": r"\b(?:(?:\+33|0033|0)1-9{4})\b", "IBAN_FR": r"\bFR\d{2}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{3}\b" }
def detect_pii(self, text: str, language: str = "fr") -> list[dict]: """Erkennt personenbezogene Daten im Text""" Presidio-Erkennung (email, nom, adresse, etc.) results = self.analyzer.analyze( text=text, language=language, entities=[ "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "LOCATION", "CREDIT_CARD", "IBAN_CODE" ] )
Erkennung französischer Muster french_results = self._detect_french_patterns(text)
all_results = list(results) + french_results return self._format_results(all_results)
def anonymize( self, text: str, mode: str = "replace" replace, hash, mask ) -> str: """Anonymisiert personenbezogene Daten""" results = self.analyzer.analyze(text=text, language="fr")
if mode == "replace": anonymized = self.anonymizer.anonymize( text=text, analyzer_results=results ) elif mode == "hash": anonymized = self._hash_pii(text, results) elif mode == "mask": anonymized = self._mask_pii(text, results)
return anonymized.text
def _hash_pii(self, text: str, results: list) -> str: """Ersetzt PII durch ihren Hash (mit Schlüssel reversibel)""" import hashlib
result_text = text for result in sorted(results, key=lambda x: x.start, reverse=True): original = text[result.start:result.end] hashed = hashlib.sha256(original.encode()).hexdigest()[:12] result_text = result_text[:result.start] + f"[{result.entity_type}:{hashed}]" + result_text[result.end:]
return result_text
def check_before_indexing(self, document: dict) -> dict: """Prüft und verarbeitet ein Dokument vor der Indexierung""" content = document.get("content", "")
pii_detected = self.detect_pii(content)
if pii_detected: return { "safe_to_index": False, "pii_found": pii_detected, "action_required": "review_or_anonymize", "anonymized_content": self.anonymize(content) }
return { "safe_to_index": True, "pii_found": [], "original_content": content } `
Einwilligung und Betroffenenrechte
`python class ConsentManager: def __init__(self, db): self.db = db
async def record_consent( self, user_id: str, purpose: str, data_categories: list[str], expiry_date: datetime = None ): """Speichert die Benutzerzustimmung""" consent = { "user_id": user_id, "purpose": purpose, "data_categories": data_categories, "granted_at": datetime.now(), "expires_at": expiry_date, "status": "active" }
await self.db.insert("consents", consent)
async def check_consent( self, user_id: str, purpose: str, data_category: str ) -> bool: """Überprüft, ob die Zustimmung gültig ist""" consent = await self.db.find_one("consents", { "user_id": user_id, "purpose": purpose, "data_categories": {"$in": [data_category]}, "status": "active", "$or": [ {"expires_at": None}, {"expires_at": {"$gt": datetime.now()}} ] })
return consent is not None
async def handle_deletion_request(self, user_id: str) -> dict: """Verarbeitet eine Löschanfrage (Recht auf Vergessenwerden)""" Alle Daten identifizieren user_data = await self._find_all_user_data(user_id) Aus den vector-Indizes löschen await self._delete_from_vector_db(user_data["document_ids"]) Aus Datenbanken löschen await self._delete_from_databases(user_id) Löschung protokollieren await self._log_deletion(user_id, user_data)
return { "status": "completed", "deleted_documents": len(user_data["document_ids"]), "deleted_conversations": user_data["conversation_count"], "completion_date": datetime.now().isoformat() }
async def handle_export_request(self, user_id: str) -> dict: """Verarbeitet eine Exportanfrage (Recht auf Datenübertragbarkeit)""" user_data = await self._find_all_user_data(user_id)
export = { "user_profile": user_data["profile"], "conversations": user_data["conversations"], "indexed_documents": user_data["documents"], "consents": user_data["consents"], "export_date": datetime.now().isoformat() }
return export `
Audit und Nachvollziehbarkeit
Sichere Protokollierung
`python import logging import json from datetime import datetime
class AuditLogger: def __init__(self, log_destination: str): self.logger = logging.getLogger("rag_audit") self.logger.setLevel(logging.INFO)
Handler securise (fichier chiffre ou SIEM) handler = logging.FileHandler(log_destination) handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler)
def log_query( self, user_id: str, query: str, documents_accessed: list[str], response_generated: bool ): """Protokolliert jede RAG-Anfrage""" SENSIBLEN Inhalt der Anfrage NICHT protokollieren log_entry = { "event_type": "rag_query", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "query_hash": self._hash_content(query), "query_length": len(query), "documents_accessed": documents_accessed, "document_count": len(documents_accessed), "response_generated": response_generated }
self.logger.info(json.dumps(log_entry))
def log_document_access( self, user_id: str, document_id: str, access_type: str, classification: str ): """Protokolliert den Zugriff auf Dokumente""" log_entry = { "event_type": "document_access", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "document_id": document_id, "access_type": access_type, "classification": classification }
self.logger.info(json.dumps(log_entry))
def log_security_event( self, event_type: str, severity: str, details: dict ): """Protokolliert Sicherheitsereignisse""" log_entry = { "event_type": f"security_{event_type}", "timestamp": datetime.now().isoformat(), "severity": severity, "details": details }
if severity in ["high", "critical"]: self.logger.warning(json.dumps(log_entry)) self._alert_security_team(log_entry) else: self.logger.info(json.dumps(log_entry))
def _hash_content(self, content: str) -> str: import hashlib return hashlib.sha256(content.encode()).hexdigest() `
Aufbewahrung und Löschung
`python class DataRetentionManager: def __init__(self, db, vector_db, config: dict): self.db = db self.vector_db = vector_db self.config = config
async def apply_retention_policy(self): """Wendet die Aufbewahrungsrichtlinie an"""
Conversations conv_retention = self.config.get("conversation_retention_days", 365) await self._purge_old_conversations(conv_retention)
Logs d'audit (obligation legale plus longue) audit_retention = self.config.get("audit_retention_days", 2190) 6 ans await self._archive_old_audits(audit_retention)
Documents expires await self._handle_expired_documents()
Metadonnees orphelines await self._cleanup_orphaned_data()
async def _purge_old_conversations(self, days: int): """Löscht alte Konversationen""" cutoff_date = datetime.now() - timedelta(days=days)
Anonymisieren statt Löschen, falls Analysen benötigt werden await self.db.update_many( "conversations", {"created_at": {"$lt": cutoff_date}}, { "$set": { "user_id": "anonymized", "messages": [], "purged_at": datetime.now() } } )
async def _handle_expired_documents(self): """Verwaltet Dokumente mit Ablaufdatum""" expired = await self.db.find( "documents", {"expires_at": {"$lt": datetime.now()}} )
for doc in expired: Aus dem vector-Indizes löschen await self.vector_db.delete(doc["id"])
Als abgelaufen markieren await self.db.update( "documents", {"id": doc["id"]}, {"$set": {"status": "expired", "content": None}} ) `
Operative Sicherheit
Schutz gegen Injections
`python class PromptSecurityGuard: def __init__(self): self.injection_patterns = [ r"ignore\s+(previous|all|above)\s+instructions", r"disregard\s+(your|the)\s+(rules|instructions)", r"you\s+are\s+now\s+", r"pretend\s+(you|to)\s+", r"act\s+as\s+if", r"system\s:\s", r"<\|.\|>", r"\[INST\]", r"###\s(instruction|system)", ]
def check_query(self, query: str) -> dict: """Prüft die Sicherheit einer Anfrage""" query_lower = query.lower()
Erkennung von Injection-Patterns for pattern in self.injection_patterns: if re.search(pattern, query_lower, re.IGNORECASE): return { "safe": False, "reason": "potential_injection", "pattern_matched": pattern }
Detection tentatives d'exfiltration if self._detect_exfiltration_attempt(query): return { "safe": False, "reason": "potential_exfiltration" }
Detection contenu malveillant if self._detect_malicious_content(query): return { "safe": False, "reason": "malicious_content" }
return {"safe": True}
def sanitize_context(self, context: str) -> str: """Bereinigt den Kontext vor dem Einfügen in den Prompt""" Entfernt Injection-Versuche in den Dokumenten sanitized = context
for pattern in self.injection_patterns: sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE)
return sanitized
def _detect_exfiltration_attempt(self, query: str) -> bool: """Erkennt Versuche der Datenexfiltration""" exfil_patterns = [ r"list\s+all\s+(users|passwords|keys|secrets)", r"show\s+(me\s+)?(the\s+)?system\s+prompt", r"what\s+(are|is)\s+(your|the)\s+(instructions|rules)", r"dump\s+(all|the)\s+data", r"export\s+.*\s+to\s+(email|url|external)" ]
for pattern in exfil_patterns: if re.search(pattern, query.lower()): return True
return False `
Rate Limiting und DDoS-Schutz
`python from redis import Redis import time
class RateLimiter: def __init__(self, redis_client: Redis): self.redis = redis_client
async def check_rate_limit( self, identifier: str, limit_type: str = "query" ) -> dict: """Überprüft die Ratenbegrenzung"""
limits = { "query": {"requests": 60, "window": 60}, 60 req/min "heavy_query": {"requests": 10, "window": 60}, 10 req/min "indexing": {"requests": 100, "window": 3600}, 100/heure "export": {"requests": 5, "window": 86400} 5/jour }
config = limits.get(limit_type, limits["query"])
key = f"ratelimit:{limit_type}:{identifier}" current = await self.redis.incr(key)
if current == 1: await self.redis.expire(key, config["window"])
remaining = max(0, config["requests"] - current)
if current > config["requests"]: return { "allowed": False, "remaining": 0, "reset_in": await self.redis.ttl(key) }
return { "allowed": True, "remaining": remaining, "limit": config["requests"] } `
Checklist conformite
Avant mise en production
`markdown RGPD • [ ] Registre des traitements mis a jour • [ ] Analyse d'impact (DPIA) si donnees sensibles • [ ] Information des personnes concernees • [ ] Mecanismes d'exercice des droits implementes • [ ] Contrats sous-traitants (DPA) signes • [ ] Mesures de securite documentees
AI Act (si applicable) • [ ] Classification du risque effectuee • [ ] Systeme de gestion des risques (si haut risque) • [ ] Documentation technique complete • [ ] Mecanisme de supervision humaine • [ ] Tests de robustesse et biais
Securite • [ ] Chiffrement au repos et en transit • [ ] Controle d'acces RBAC configure • [ ] Audit logging actif • [ ] Tests de penetration effectues • [ ] Plan de reponse aux incidents • [ ] Sauvegardes testees
Operationnel • [ ] Politique de retention definie • [ ] Processus de purge automatise • [ ] Monitoring securite actif • [ ] Formation equipes effectuee ``
Weiterführende Informationen • RGPD et Chatbots - Fokus Kundenchats • AI Act et RAG - Regulatorische Auswirkungen • Donnees sensibles - Verarbeitung kritischer Daten
---
Vereinfachte Compliance mit Ailog
Die DSGVO- und AI Act-Compliance kann komplex wirken. Mit Ailog profitieren Sie von einer bereits konformen Infrastruktur: • Hebergement 100% France chez OVH, donnees jamais transferees hors UE • Chiffrement bout en bout AES-256 • RBAC natif avec gestion des permissions par document • Anonymisation automatique des donnees personnelles • Logs d'audit exportables pour vos DPO • DPA signe et documentation RGPD fournie
Testez Ailog gratuitement et deployez un RAG conforme des aujourd'hui.
---
FAQ
Ist ein RAG-System von der DSGVO betroffen?
Ja — sobald es personenbezogene Daten (Namen, E-Mails, Konversationen) verarbeitet. Ein RAG gilt als automatisierte Verarbeitung, die Einwilligung, Information der Betroffenen und Sicherheitsmaßnahmen erfordert. Auch vector-Daten sind betroffen, da sie die Rekonstruktion der Originaltexte ermöglichen.
Gilt der AI Act für RAG-Chatbots?
Das hängt vom Anwendungsfall ab. Ein Chatbot im Kundendienst ist in der Regel "limited risk" (Transparenzpflichten). Ein RAG, das für HR- oder Kreditentscheidungen eingesetzt wird, wäre "high risk" mit strengen Anforderungen an Dokumentation und menschliche Aufsicht.
Wie anonymisiert man Daten in einem RAG?
Drei Ansätze: Entfernen von Identifikatoren vor der Indexierung, Pseudonymisierung mit sicherem Mapping oder dynamisches Filtern bei der Generierung. Anonymisierung sollte irreversibel sein für nicht benötigte Daten, reversibel (Pseudonymisierung) falls Nachvollziehbarkeit erforderlich ist.
Kann man ein RAG außerhalb der EU hosten?
Technisch ja, aber das erschwert die DSGVO-Compliance. Übermittlungen außerhalb der EU benötigen Garantien (Standardvertragsklauseln, Angemessenheitsentscheidungen). Für sensible Daten wird souveränes Hosting in Frankreich dringend empfohlen.
Welche Logs sollte man für Audits aufbewahren?
Mindestens: Nutzerkennzeichen, Timestamp, Anfrage, genutzte Quellen, generierte Antwort. Für AI Act: automatisierte Entscheidungen, menschliche Eingriffe, Vorfälle. Aufbewahrungsdauer: je nach Richtlinie meist 1 bis 3 Jahre.