Sicherheit und Compliance für RAG: DSGVO, AI Act und Best Practices
Sichern Sie Ihr RAG-System: DSGVO-Konformität, europäischer AI Act, Datenschutz und Audit. Umfassender Leitfaden für Unternehmen.
Sicherheit und Compliance für RAG : DSGVO, AI Act und Best Practices
Die Bereitstellung eines RAG-Systems im Unternehmen bedeutet oft den Umgang mit sensiblen Daten: interne Dokumente, Kundendaten, geistiges Eigentum. Dieser Leitfaden behandelt die wesentlichen Sicherheits- und Compliance-Aspekte für eine sorgenfreie Implementierung.
Der europäische Rechtsrahmen
DSGVO : Grundlagen für RAG
Die Datenschutz-Grundverordnung (DSGVO) gilt, sobald ein RAG-System personenbezogene Daten aus Europa verarbeitet.
| Grundsatz DSGVO | Anwendung RAG |
|---|---|
| Rechtmäßigkeit der Verarbeitung | Rechtsgrundlage zum Indexieren und Verwenden der Daten |
| Datenminimierung | Nur die notwendigen Daten indexieren |
| Richtigkeit | Wissensbasis aktuell halten |
| Speicherbegrenzung | Veraltete Daten löschen |
| Integrität und Vertraulichkeit | Verschlüsselung und Zugriffskontrolle |
| Rechenschaftspflicht | Verarbeitung dokumentieren |
AI Act : Was sich für RAG-Systeme ändert
Der europäische AI Act (stufenweise in Kraft 2024–2027) klassifiziert KI-Systeme nach Risikoniveau:
Begrenztes Risiko (Mehrheit der RAG) :
- Chatbots im Kundensupport
- Interne Assistenten
- Dokumentensuche
Pflichten :
- Transparenz: angeben, dass der Nutzer mit einer KI interagiert
- Aufbewahrung von Logs für Audits
Hohes Risiko (spezifische Fälle) :
- RAG für HR-Entscheidungen (Rekrutierung, Bewertung)
- RAG für Kredit-/Versicherungsentscheidungen
- RAG im Gesundheitsbereich mit medizinischer Auswirkung
Zusätzliche Pflichten :
- Konformitätsbewertung
- Risikomanagementsystem
- Dokumentierte Trainingsdaten
- Menschliche Aufsicht
DEVELOPERpython# Classification du risque selon l'AI Act def classify_rag_risk(use_case: dict) -> str: high_risk_domains = [ "hr_recruitment", "hr_evaluation", "credit_scoring", "insurance_pricing", "medical_diagnosis", "legal_decision" ] if use_case.get("domain") in high_risk_domains: return "high_risk" if use_case.get("automated_decision") and use_case.get("significant_impact"): return "high_risk" return "limited_risk"
Sicherheitsarchitektur
Trennung der Umgebungen
┌─────────────────────────────────────────────────────────────┐
│ PRODUCTION │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ API GW │──│ RAG API │──│ Vector DB │ │
│ │ (WAF) │ │ (Isolated) │ │ (Encrypted) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Network Isolation (VPC) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ STAGING │
│ (Donnees anonymisees uniquement) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ DEVELOPMENT │
│ (Donnees synthetiques) │
└─────────────────────────────────────────────────────────────┘
Ende-zu-Ende-Verschlüsselung
DEVELOPERpythonfrom cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64 class SecureRAGStorage: def __init__(self, master_key: bytes): self.cipher = Fernet(master_key) def encrypt_document(self, content: str, metadata: dict) -> dict: """Verschlüsselt den Inhalt vor der Speicherung""" encrypted_content = self.cipher.encrypt(content.encode()) # Sensible Metadaten verschlüsseln sensitive_fields = ["author", "email", "department"] encrypted_metadata = metadata.copy() for field in sensitive_fields: if field in encrypted_metadata: encrypted_metadata[field] = self.cipher.encrypt( str(encrypted_metadata[field]).encode() ).decode() return { "content": encrypted_content.decode(), "metadata": encrypted_metadata, "encrypted": True } def decrypt_document(self, encrypted_doc: dict) -> dict: """Entschlüsselt das Dokument zur Verwendung""" if not encrypted_doc.get("encrypted"): return encrypted_doc content = self.cipher.decrypt( encrypted_doc["content"].encode() ).decode() return { "content": content, "metadata": self._decrypt_metadata(encrypted_doc["metadata"]) } def encrypt_embedding(self, embedding: list[float]) -> bytes: """Verschlüsselt die embeddings (optional, beeinträchtigt die Performance)""" import numpy as np embedding_bytes = np.array(embedding).tobytes() return self.cipher.encrypt(embedding_bytes)
Feingranulare Zugriffskontrolle (RBAC)
DEVELOPERpythonfrom enum import Enum from functools import wraps class Permission(Enum): READ_PUBLIC = "read:public" READ_INTERNAL = "read:internal" READ_CONFIDENTIAL = "read:confidential" READ_RESTRICTED = "read:restricted" ADMIN = "admin" class DocumentClassification(Enum): PUBLIC = "public" INTERNAL = "internal" CONFIDENTIAL = "confidential" RESTRICTED = "restricted" class RBACController: def __init__(self, user_service): self.user_service = user_service self.permission_hierarchy = { Permission.READ_PUBLIC: [DocumentClassification.PUBLIC], Permission.READ_INTERNAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL ], Permission.READ_CONFIDENTIAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL ], Permission.READ_RESTRICTED: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL, DocumentClassification.RESTRICTED ] } async def filter_accessible_documents( self, documents: list[dict], user_id: str ) -> list[dict]: """Filtert die Dokumente nach Benutzerberechtigungen""" user = await self.user_service.get_user(user_id) user_permissions = set(user.get("permissions", [])) accessible = [] for doc in documents: doc_class = DocumentClassification( doc.get("metadata", {}).get("classification", "internal") ) if self._can_access(user_permissions, doc_class): accessible.append(doc) return accessible def _can_access( self, user_permissions: set, doc_classification: DocumentClassification ) -> bool: """Prüft, ob der Benutzer auf das Dokument zugreifen kann""" for perm in user_permissions: try: allowed = self.permission_hierarchy.get(Permission(perm), []) if doc_classification in allowed: return True except ValueError: continue return False
Schutz personenbezogener Daten
Automatische Erkennung und Anonymisierung
DEVELOPERpythonimport re from presidio_analyzer import AnalyzerEngine from presidio_anonymizer import AnonymizerEngine class PIIProtector: def __init__(self): self.analyzer = AnalyzerEngine() self.anonymizer = AnonymizerEngine() # Patterns personnalises pour la France self.french_patterns = { "FRENCH_SSN": r"\b[12][0-9]{2}[0-1][0-9][0-9]{2}[0-9]{3}[0-9]{3}[0-9]{2}\b", "FRENCH_PHONE": r"\b(?:(?:\+33|0033|0)[1-9](?:[.\-\s]?\d{2}){4})\b", "IBAN_FR": r"\bFR\d{2}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{3}\b" } def detect_pii(self, text: str, language: str = "fr") -> list[dict]: """Erkennt personenbezogene Daten im Text""" # Presidio-Erkennung (email, nom, adresse, etc.) results = self.analyzer.analyze( text=text, language=language, entities=[ "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "LOCATION", "CREDIT_CARD", "IBAN_CODE" ] ) # Erkennung französischer Muster french_results = self._detect_french_patterns(text) all_results = list(results) + french_results return self._format_results(all_results) def anonymize( self, text: str, mode: str = "replace" # replace, hash, mask ) -> str: """Anonymisiert personenbezogene Daten""" results = self.analyzer.analyze(text=text, language="fr") if mode == "replace": anonymized = self.anonymizer.anonymize( text=text, analyzer_results=results ) elif mode == "hash": anonymized = self._hash_pii(text, results) elif mode == "mask": anonymized = self._mask_pii(text, results) return anonymized.text def _hash_pii(self, text: str, results: list) -> str: """Ersetzt PII durch ihren Hash (mit Schlüssel reversibel)""" import hashlib result_text = text for result in sorted(results, key=lambda x: x.start, reverse=True): original = text[result.start:result.end] hashed = hashlib.sha256(original.encode()).hexdigest()[:12] result_text = result_text[:result.start] + f"[{result.entity_type}:{hashed}]" + result_text[result.end:] return result_text def check_before_indexing(self, document: dict) -> dict: """Prüft und verarbeitet ein Dokument vor der Indexierung""" content = document.get("content", "") pii_detected = self.detect_pii(content) if pii_detected: return { "safe_to_index": False, "pii_found": pii_detected, "action_required": "review_or_anonymize", "anonymized_content": self.anonymize(content) } return { "safe_to_index": True, "pii_found": [], "original_content": content }
Einwilligung und Betroffenenrechte
DEVELOPERpythonclass ConsentManager: def __init__(self, db): self.db = db async def record_consent( self, user_id: str, purpose: str, data_categories: list[str], expiry_date: datetime = None ): """Speichert die Benutzerzustimmung""" consent = { "user_id": user_id, "purpose": purpose, "data_categories": data_categories, "granted_at": datetime.now(), "expires_at": expiry_date, "status": "active" } await self.db.insert("consents", consent) async def check_consent( self, user_id: str, purpose: str, data_category: str ) -> bool: """Überprüft, ob die Zustimmung gültig ist""" consent = await self.db.find_one("consents", { "user_id": user_id, "purpose": purpose, "data_categories": {"$in": [data_category]}, "status": "active", "$or": [ {"expires_at": None}, {"expires_at": {"$gt": datetime.now()}} ] }) return consent is not None async def handle_deletion_request(self, user_id: str) -> dict: """Verarbeitet eine Löschanfrage (Recht auf Vergessenwerden)""" # 1. Alle Daten identifizieren user_data = await self._find_all_user_data(user_id) # 2. Aus den vector-Indizes löschen await self._delete_from_vector_db(user_data["document_ids"]) # 3. Aus Datenbanken löschen await self._delete_from_databases(user_id) # 4. Löschung protokollieren await self._log_deletion(user_id, user_data) return { "status": "completed", "deleted_documents": len(user_data["document_ids"]), "deleted_conversations": user_data["conversation_count"], "completion_date": datetime.now().isoformat() } async def handle_export_request(self, user_id: str) -> dict: """Verarbeitet eine Exportanfrage (Recht auf Datenübertragbarkeit)""" user_data = await self._find_all_user_data(user_id) export = { "user_profile": user_data["profile"], "conversations": user_data["conversations"], "indexed_documents": user_data["documents"], "consents": user_data["consents"], "export_date": datetime.now().isoformat() } return export
Audit und Nachvollziehbarkeit
Sichere Protokollierung
DEVELOPERpythonimport logging import json from datetime import datetime class AuditLogger: def __init__(self, log_destination: str): self.logger = logging.getLogger("rag_audit") self.logger.setLevel(logging.INFO) # Handler securise (fichier chiffre ou SIEM) handler = logging.FileHandler(log_destination) handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler) def log_query( self, user_id: str, query: str, documents_accessed: list[str], response_generated: bool ): """Protokolliert jede RAG-Anfrage""" # SENSIBLEN Inhalt der Anfrage NICHT protokollieren log_entry = { "event_type": "rag_query", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "query_hash": self._hash_content(query), "query_length": len(query), "documents_accessed": documents_accessed, "document_count": len(documents_accessed), "response_generated": response_generated } self.logger.info(json.dumps(log_entry)) def log_document_access( self, user_id: str, document_id: str, access_type: str, classification: str ): """Protokolliert den Zugriff auf Dokumente""" log_entry = { "event_type": "document_access", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "document_id": document_id, "access_type": access_type, "classification": classification } self.logger.info(json.dumps(log_entry)) def log_security_event( self, event_type: str, severity: str, details: dict ): """Protokolliert Sicherheitsereignisse""" log_entry = { "event_type": f"security_{event_type}", "timestamp": datetime.now().isoformat(), "severity": severity, "details": details } if severity in ["high", "critical"]: self.logger.warning(json.dumps(log_entry)) self._alert_security_team(log_entry) else: self.logger.info(json.dumps(log_entry)) def _hash_content(self, content: str) -> str: import hashlib return hashlib.sha256(content.encode()).hexdigest()
Aufbewahrung und Löschung
DEVELOPERpythonclass DataRetentionManager: def __init__(self, db, vector_db, config: dict): self.db = db self.vector_db = vector_db self.config = config async def apply_retention_policy(self): """Wendet die Aufbewahrungsrichtlinie an""" # Conversations conv_retention = self.config.get("conversation_retention_days", 365) await self._purge_old_conversations(conv_retention) # Logs d'audit (obligation legale plus longue) audit_retention = self.config.get("audit_retention_days", 2190) # 6 ans await self._archive_old_audits(audit_retention) # Documents expires await self._handle_expired_documents() # Metadonnees orphelines await self._cleanup_orphaned_data() async def _purge_old_conversations(self, days: int): """Löscht alte Konversationen""" cutoff_date = datetime.now() - timedelta(days=days) # Anonymisieren statt Löschen, falls Analysen benötigt werden await self.db.update_many( "conversations", {"created_at": {"$lt": cutoff_date}}, { "$set": { "user_id": "anonymized", "messages": [], "purged_at": datetime.now() } } ) async def _handle_expired_documents(self): """Verwaltet Dokumente mit Ablaufdatum""" expired = await self.db.find( "documents", {"expires_at": {"$lt": datetime.now()}} ) for doc in expired: # Aus dem vector-Indizes löschen await self.vector_db.delete(doc["id"]) # Als abgelaufen markieren await self.db.update( "documents", {"id": doc["id"]}, {"$set": {"status": "expired", "content": None}} )
Operative Sicherheit
Schutz gegen Injections
DEVELOPERpythonclass PromptSecurityGuard: def __init__(self): self.injection_patterns = [ r"ignore\s+(previous|all|above)\s+instructions", r"disregard\s+(your|the)\s+(rules|instructions)", r"you\s+are\s+now\s+", r"pretend\s+(you|to)\s+", r"act\s+as\s+if", r"system\s*:\s*", r"<\|.*\|>", r"\[INST\]", r"###\s*(instruction|system)", ] def check_query(self, query: str) -> dict: """Prüft die Sicherheit einer Anfrage""" query_lower = query.lower() # Erkennung von Injection-Patterns for pattern in self.injection_patterns: if re.search(pattern, query_lower, re.IGNORECASE): return { "safe": False, "reason": "potential_injection", "pattern_matched": pattern } # Detection tentatives d'exfiltration if self._detect_exfiltration_attempt(query): return { "safe": False, "reason": "potential_exfiltration" } # Detection contenu malveillant if self._detect_malicious_content(query): return { "safe": False, "reason": "malicious_content" } return {"safe": True} def sanitize_context(self, context: str) -> str: """Bereinigt den Kontext vor dem Einfügen in den Prompt""" # Entfernt Injection-Versuche in den Dokumenten sanitized = context for pattern in self.injection_patterns: sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE) return sanitized def _detect_exfiltration_attempt(self, query: str) -> bool: """Erkennt Versuche der Datenexfiltration""" exfil_patterns = [ r"list\s+all\s+(users|passwords|keys|secrets)", r"show\s+(me\s+)?(the\s+)?system\s+prompt", r"what\s+(are|is)\s+(your|the)\s+(instructions|rules)", r"dump\s+(all|the)\s+data", r"export\s+.*\s+to\s+(email|url|external)" ] for pattern in exfil_patterns: if re.search(pattern, query.lower()): return True return False
Rate Limiting und DDoS-Schutz
DEVELOPERpythonfrom redis import Redis import time class RateLimiter: def __init__(self, redis_client: Redis): self.redis = redis_client async def check_rate_limit( self, identifier: str, limit_type: str = "query" ) -> dict: """Überprüft die Ratenbegrenzung""" limits = { "query": {"requests": 60, "window": 60}, # 60 req/min "heavy_query": {"requests": 10, "window": 60}, # 10 req/min "indexing": {"requests": 100, "window": 3600}, # 100/heure "export": {"requests": 5, "window": 86400} # 5/jour } config = limits.get(limit_type, limits["query"]) key = f"ratelimit:{limit_type}:{identifier}" current = await self.redis.incr(key) if current == 1: await self.redis.expire(key, config["window"]) remaining = max(0, config["requests"] - current) if current > config["requests"]: return { "allowed": False, "remaining": 0, "reset_in": await self.redis.ttl(key) } return { "allowed": True, "remaining": remaining, "limit": config["requests"] }
Checklist conformite
Avant mise en production
DEVELOPERmarkdown## RGPD - [ ] Registre des traitements mis a jour - [ ] Analyse d'impact (DPIA) si donnees sensibles - [ ] Information des personnes concernees - [ ] Mecanismes d'exercice des droits implementes - [ ] Contrats sous-traitants (DPA) signes - [ ] Mesures de securite documentees ## AI Act (si applicable) - [ ] Classification du risque effectuee - [ ] Systeme de gestion des risques (si haut risque) - [ ] Documentation technique complete - [ ] Mecanisme de supervision humaine - [ ] Tests de robustesse et biais ## Securite - [ ] Chiffrement au repos et en transit - [ ] Controle d'acces RBAC configure - [ ] Audit logging actif - [ ] Tests de penetration effectues - [ ] Plan de reponse aux incidents - [ ] Sauvegardes testees ## Operationnel - [ ] Politique de retention definie - [ ] Processus de purge automatise - [ ] Monitoring securite actif - [ ] Formation equipes effectuee
Weiterführende Informationen
- RGPD et Chatbots - Fokus Kundenchats
- AI Act et RAG - Regulatorische Auswirkungen
- Donnees sensibles - Verarbeitung kritischer Daten
Vereinfachte Compliance mit Ailog
Die DSGVO- und AI Act-Compliance kann komplex wirken. Mit Ailog profitieren Sie von einer bereits konformen Infrastruktur:
- Hebergement 100% France chez OVH, donnees jamais transferees hors UE
- Chiffrement bout en bout AES-256
- RBAC natif avec gestion des permissions par document
- Anonymisation automatique des donnees personnelles
- Logs d'audit exportables pour vos DPO
- DPA signe et documentation RGPD fournie
Testez Ailog gratuitement et deployez un RAG conforme des aujourd'hui.
FAQ
Tags
Verwandte Artikel
Magento: Intelligenter Katalog-Assistent
Einen AI-Assistenten auf Magento bereitstellen, um in komplexen Katalogen zu navigieren, Produkte zu empfehlen und das B2B- und B2C-Erlebnis zu verbessern.
Upsell und Cross-sell : Personalisierte KI-Empfehlungen
Den durchschnittlichen Warenkorbwert mit intelligenten KI-Empfehlungen erhöhen: Upsell, Cross-sell und personalisierte Bundles basierend auf RAG.
Dynamische E-Commerce-FAQ: Kontextbezogene Antworten generieren
Erstellen Sie eine intelligente FAQ für Ihren Onlineshop: personalisierte Antworten je nach Produkt, Kunde und Kaufkontext.