Sicherheit und Compliance für RAG: DSGVO, AI Act und Best Practices

Sichern Sie Ihr RAG-System: DSGVO-Konformität, europäischer AI Act, Datenschutz und Audit. Umfassender Leitfaden für Unternehmen.

Author
Equipe Ailog
Published
Reading time
18 Minuten Lesezeit
Level
advanced

Sicherheit und Compliance für RAG : DSGVO, AI Act und Best Practices

Die Bereitstellung eines RAG-Systems im Unternehmen bedeutet oft den Umgang mit sensiblen Daten: interne Dokumente, Kundendaten, geistiges Eigentum. Dieser Leitfaden behandelt die wesentlichen Sicherheits- und Compliance-Aspekte für eine sorgenfreie Implementierung.

Der europäische Rechtsrahmen

DSGVO : Grundlagen für RAG

Die Datenschutz-Grundverordnung (DSGVO) gilt, sobald ein RAG-System personenbezogene Daten aus Europa verarbeitet.

| Grundsatz DSGVO | Anwendung RAG | |-----------------|----------------| | Rechtmäßigkeit der Verarbeitung | Rechtsgrundlage zum Indexieren und Verwenden der Daten | | Datenminimierung | Nur die notwendigen Daten indexieren | | Richtigkeit | Wissensbasis aktuell halten | | Speicherbegrenzung | Veraltete Daten löschen | | Integrität und Vertraulichkeit | Verschlüsselung und Zugriffskontrolle | | Rechenschaftspflicht | Verarbeitung dokumentieren |

AI Act : Was sich für RAG-Systeme ändert

Der europäische AI Act (stufenweise in Kraft 2024–2027) klassifiziert KI-Systeme nach Risikoniveau:

Begrenztes Risiko (Mehrheit der RAG) : • Chatbots im Kundensupport • Interne Assistenten • Dokumentensuche

Pflichten : • Transparenz: angeben, dass der Nutzer mit einer KI interagiert • Aufbewahrung von Logs für Audits

Hohes Risiko (spezifische Fälle) : • RAG für HR-Entscheidungen (Rekrutierung, Bewertung) • RAG für Kredit-/Versicherungsentscheidungen • RAG im Gesundheitsbereich mit medizinischer Auswirkung

Zusätzliche Pflichten : • Konformitätsbewertung • Risikomanagementsystem • Dokumentierte Trainingsdaten • Menschliche Aufsicht

``python Classification du risque selon l'AI Act def classify_rag_risk(use_case: dict) -> str: high_risk_domains = [ "hr_recruitment", "hr_evaluation", "credit_scoring", "insurance_pricing", "medical_diagnosis", "legal_decision" ]

if use_case.get("domain") in high_risk_domains: return "high_risk"

if use_case.get("automated_decision") and use_case.get("significant_impact"): return "high_risk"

return "limited_risk" `

Sicherheitsarchitektur

Trennung der Umgebungen

` ┌─────────────────────────────────────────────────────────────┐ │ PRODUCTION │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ API GW │──│ RAG API │──│ Vector DB │ │ │ │ (WAF) │ │ (Isolated) │ │ (Encrypted) │ │ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Network Isolation (VPC) │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐ │ STAGING │ │ (Donnees anonymisees uniquement) │ └─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐ │ DEVELOPMENT │ │ (Donnees synthetiques) │ └─────────────────────────────────────────────────────────────┘ `

Ende-zu-Ende-Verschlüsselung

`python from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64

class SecureRAGStorage: def __init__(self, master_key: bytes): self.cipher = Fernet(master_key)

def encrypt_document(self, content: str, metadata: dict) -> dict: """Verschlüsselt den Inhalt vor der Speicherung""" encrypted_content = self.cipher.encrypt(content.encode())

Sensible Metadaten verschlüsseln sensitive_fields = ["author", "email", "department"] encrypted_metadata = metadata.copy()

for field in sensitive_fields: if field in encrypted_metadata: encrypted_metadata[field] = self.cipher.encrypt( str(encrypted_metadata[field]).encode() ).decode()

return { "content": encrypted_content.decode(), "metadata": encrypted_metadata, "encrypted": True }

def decrypt_document(self, encrypted_doc: dict) -> dict: """Entschlüsselt das Dokument zur Verwendung""" if not encrypted_doc.get("encrypted"): return encrypted_doc

content = self.cipher.decrypt( encrypted_doc["content"].encode() ).decode()

return { "content": content, "metadata": self._decrypt_metadata(encrypted_doc["metadata"]) }

def encrypt_embedding(self, embedding: list[float]) -> bytes: """Verschlüsselt die embeddings (optional, beeinträchtigt die Performance)""" import numpy as np embedding_bytes = np.array(embedding).tobytes() return self.cipher.encrypt(embedding_bytes) `

Feingranulare Zugriffskontrolle (RBAC)

`python from enum import Enum from functools import wraps

class Permission(Enum): READ_PUBLIC = "read:public" READ_INTERNAL = "read:internal" READ_CONFIDENTIAL = "read:confidential" READ_RESTRICTED = "read:restricted" ADMIN = "admin"

class DocumentClassification(Enum): PUBLIC = "public" INTERNAL = "internal" CONFIDENTIAL = "confidential" RESTRICTED = "restricted"

class RBACController: def __init__(self, user_service): self.user_service = user_service

self.permission_hierarchy = { Permission.READ_PUBLIC: [DocumentClassification.PUBLIC], Permission.READ_INTERNAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL ], Permission.READ_CONFIDENTIAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL ], Permission.READ_RESTRICTED: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL, DocumentClassification.RESTRICTED ] }

async def filter_accessible_documents( self, documents: list[dict], user_id: str ) -> list[dict]: """Filtert die Dokumente nach Benutzerberechtigungen""" user = await self.user_service.get_user(user_id) user_permissions = set(user.get("permissions", []))

accessible = [] for doc in documents: doc_class = DocumentClassification( doc.get("metadata", {}).get("classification", "internal") )

if self._can_access(user_permissions, doc_class): accessible.append(doc)

return accessible

def _can_access( self, user_permissions: set, doc_classification: DocumentClassification ) -> bool: """Prüft, ob der Benutzer auf das Dokument zugreifen kann""" for perm in user_permissions: try: allowed = self.permission_hierarchy.get(Permission(perm), []) if doc_classification in allowed: return True except ValueError: continue return False `

Schutz personenbezogener Daten

Automatische Erkennung und Anonymisierung

`python import re from presidio_analyzer import AnalyzerEngine from presidio_anonymizer import AnonymizerEngine

class PIIProtector: def __init__(self): self.analyzer = AnalyzerEngine() self.anonymizer = AnonymizerEngine()

Patterns personnalises pour la France self.french_patterns = { "FRENCH_SSN": r"\b[12][0-9]{2}[0-1][0-9][0-9]{2}[0-9]{3}[0-9]{3}[0-9]{2}\b", "FRENCH_PHONE": r"\b(?:(?:\+33|0033|0)1-9{4})\b", "IBAN_FR": r"\bFR\d{2}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{3}\b" }

def detect_pii(self, text: str, language: str = "fr") -> list[dict]: """Erkennt personenbezogene Daten im Text""" Presidio-Erkennung (email, nom, adresse, etc.) results = self.analyzer.analyze( text=text, language=language, entities=[ "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "LOCATION", "CREDIT_CARD", "IBAN_CODE" ] )

Erkennung französischer Muster french_results = self._detect_french_patterns(text)

all_results = list(results) + french_results return self._format_results(all_results)

def anonymize( self, text: str, mode: str = "replace" replace, hash, mask ) -> str: """Anonymisiert personenbezogene Daten""" results = self.analyzer.analyze(text=text, language="fr")

if mode == "replace": anonymized = self.anonymizer.anonymize( text=text, analyzer_results=results ) elif mode == "hash": anonymized = self._hash_pii(text, results) elif mode == "mask": anonymized = self._mask_pii(text, results)

return anonymized.text

def _hash_pii(self, text: str, results: list) -> str: """Ersetzt PII durch ihren Hash (mit Schlüssel reversibel)""" import hashlib

result_text = text for result in sorted(results, key=lambda x: x.start, reverse=True): original = text[result.start:result.end] hashed = hashlib.sha256(original.encode()).hexdigest()[:12] result_text = result_text[:result.start] + f"[{result.entity_type}:{hashed}]" + result_text[result.end:]

return result_text

def check_before_indexing(self, document: dict) -> dict: """Prüft und verarbeitet ein Dokument vor der Indexierung""" content = document.get("content", "")

pii_detected = self.detect_pii(content)

if pii_detected: return { "safe_to_index": False, "pii_found": pii_detected, "action_required": "review_or_anonymize", "anonymized_content": self.anonymize(content) }

return { "safe_to_index": True, "pii_found": [], "original_content": content } `

Einwilligung und Betroffenenrechte

`python class ConsentManager: def __init__(self, db): self.db = db

async def record_consent( self, user_id: str, purpose: str, data_categories: list[str], expiry_date: datetime = None ): """Speichert die Benutzerzustimmung""" consent = { "user_id": user_id, "purpose": purpose, "data_categories": data_categories, "granted_at": datetime.now(), "expires_at": expiry_date, "status": "active" }

await self.db.insert("consents", consent)

async def check_consent( self, user_id: str, purpose: str, data_category: str ) -> bool: """Überprüft, ob die Zustimmung gültig ist""" consent = await self.db.find_one("consents", { "user_id": user_id, "purpose": purpose, "data_categories": {"$in": [data_category]}, "status": "active", "$or": [ {"expires_at": None}, {"expires_at": {"$gt": datetime.now()}} ] })

return consent is not None

async def handle_deletion_request(self, user_id: str) -> dict: """Verarbeitet eine Löschanfrage (Recht auf Vergessenwerden)""" Alle Daten identifizieren user_data = await self._find_all_user_data(user_id) Aus den vector-Indizes löschen await self._delete_from_vector_db(user_data["document_ids"]) Aus Datenbanken löschen await self._delete_from_databases(user_id) Löschung protokollieren await self._log_deletion(user_id, user_data)

return { "status": "completed", "deleted_documents": len(user_data["document_ids"]), "deleted_conversations": user_data["conversation_count"], "completion_date": datetime.now().isoformat() }

async def handle_export_request(self, user_id: str) -> dict: """Verarbeitet eine Exportanfrage (Recht auf Datenübertragbarkeit)""" user_data = await self._find_all_user_data(user_id)

export = { "user_profile": user_data["profile"], "conversations": user_data["conversations"], "indexed_documents": user_data["documents"], "consents": user_data["consents"], "export_date": datetime.now().isoformat() }

return export `

Audit und Nachvollziehbarkeit

Sichere Protokollierung

`python import logging import json from datetime import datetime

class AuditLogger: def __init__(self, log_destination: str): self.logger = logging.getLogger("rag_audit") self.logger.setLevel(logging.INFO)

Handler securise (fichier chiffre ou SIEM) handler = logging.FileHandler(log_destination) handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler)

def log_query( self, user_id: str, query: str, documents_accessed: list[str], response_generated: bool ): """Protokolliert jede RAG-Anfrage""" SENSIBLEN Inhalt der Anfrage NICHT protokollieren log_entry = { "event_type": "rag_query", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "query_hash": self._hash_content(query), "query_length": len(query), "documents_accessed": documents_accessed, "document_count": len(documents_accessed), "response_generated": response_generated }

self.logger.info(json.dumps(log_entry))

def log_document_access( self, user_id: str, document_id: str, access_type: str, classification: str ): """Protokolliert den Zugriff auf Dokumente""" log_entry = { "event_type": "document_access", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "document_id": document_id, "access_type": access_type, "classification": classification }

self.logger.info(json.dumps(log_entry))

def log_security_event( self, event_type: str, severity: str, details: dict ): """Protokolliert Sicherheitsereignisse""" log_entry = { "event_type": f"security_{event_type}", "timestamp": datetime.now().isoformat(), "severity": severity, "details": details }

if severity in ["high", "critical"]: self.logger.warning(json.dumps(log_entry)) self._alert_security_team(log_entry) else: self.logger.info(json.dumps(log_entry))

def _hash_content(self, content: str) -> str: import hashlib return hashlib.sha256(content.encode()).hexdigest() `

Aufbewahrung und Löschung

`python class DataRetentionManager: def __init__(self, db, vector_db, config: dict): self.db = db self.vector_db = vector_db self.config = config

async def apply_retention_policy(self): """Wendet die Aufbewahrungsrichtlinie an"""

Conversations conv_retention = self.config.get("conversation_retention_days", 365) await self._purge_old_conversations(conv_retention)

Logs d'audit (obligation legale plus longue) audit_retention = self.config.get("audit_retention_days", 2190) 6 ans await self._archive_old_audits(audit_retention)

Documents expires await self._handle_expired_documents()

Metadonnees orphelines await self._cleanup_orphaned_data()

async def _purge_old_conversations(self, days: int): """Löscht alte Konversationen""" cutoff_date = datetime.now() - timedelta(days=days)

Anonymisieren statt Löschen, falls Analysen benötigt werden await self.db.update_many( "conversations", {"created_at": {"$lt": cutoff_date}}, { "$set": { "user_id": "anonymized", "messages": [], "purged_at": datetime.now() } } )

async def _handle_expired_documents(self): """Verwaltet Dokumente mit Ablaufdatum""" expired = await self.db.find( "documents", {"expires_at": {"$lt": datetime.now()}} )

for doc in expired: Aus dem vector-Indizes löschen await self.vector_db.delete(doc["id"])

Als abgelaufen markieren await self.db.update( "documents", {"id": doc["id"]}, {"$set": {"status": "expired", "content": None}} ) `

Operative Sicherheit

Schutz gegen Injections

`python class PromptSecurityGuard: def __init__(self): self.injection_patterns = [ r"ignore\s+(previous|all|above)\s+instructions", r"disregard\s+(your|the)\s+(rules|instructions)", r"you\s+are\s+now\s+", r"pretend\s+(you|to)\s+", r"act\s+as\s+if", r"system\s:\s", r"<\|.\|>", r"\[INST\]", r"###\s(instruction|system)", ]

def check_query(self, query: str) -> dict: """Prüft die Sicherheit einer Anfrage""" query_lower = query.lower()

Erkennung von Injection-Patterns for pattern in self.injection_patterns: if re.search(pattern, query_lower, re.IGNORECASE): return { "safe": False, "reason": "potential_injection", "pattern_matched": pattern }

Detection tentatives d'exfiltration if self._detect_exfiltration_attempt(query): return { "safe": False, "reason": "potential_exfiltration" }

Detection contenu malveillant if self._detect_malicious_content(query): return { "safe": False, "reason": "malicious_content" }

return {"safe": True}

def sanitize_context(self, context: str) -> str: """Bereinigt den Kontext vor dem Einfügen in den Prompt""" Entfernt Injection-Versuche in den Dokumenten sanitized = context

for pattern in self.injection_patterns: sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE)

return sanitized

def _detect_exfiltration_attempt(self, query: str) -> bool: """Erkennt Versuche der Datenexfiltration""" exfil_patterns = [ r"list\s+all\s+(users|passwords|keys|secrets)", r"show\s+(me\s+)?(the\s+)?system\s+prompt", r"what\s+(are|is)\s+(your|the)\s+(instructions|rules)", r"dump\s+(all|the)\s+data", r"export\s+.*\s+to\s+(email|url|external)" ]

for pattern in exfil_patterns: if re.search(pattern, query.lower()): return True

return False `

Rate Limiting und DDoS-Schutz

`python from redis import Redis import time

class RateLimiter: def __init__(self, redis_client: Redis): self.redis = redis_client

async def check_rate_limit( self, identifier: str, limit_type: str = "query" ) -> dict: """Überprüft die Ratenbegrenzung"""

limits = { "query": {"requests": 60, "window": 60}, 60 req/min "heavy_query": {"requests": 10, "window": 60}, 10 req/min "indexing": {"requests": 100, "window": 3600}, 100/heure "export": {"requests": 5, "window": 86400} 5/jour }

config = limits.get(limit_type, limits["query"])

key = f"ratelimit:{limit_type}:{identifier}" current = await self.redis.incr(key)

if current == 1: await self.redis.expire(key, config["window"])

remaining = max(0, config["requests"] - current)

if current > config["requests"]: return { "allowed": False, "remaining": 0, "reset_in": await self.redis.ttl(key) }

return { "allowed": True, "remaining": remaining, "limit": config["requests"] } `

Checklist conformite

Avant mise en production

`markdown RGPD • [ ] Registre des traitements mis a jour • [ ] Analyse d'impact (DPIA) si donnees sensibles • [ ] Information des personnes concernees • [ ] Mecanismes d'exercice des droits implementes • [ ] Contrats sous-traitants (DPA) signes • [ ] Mesures de securite documentees

AI Act (si applicable) • [ ] Classification du risque effectuee • [ ] Systeme de gestion des risques (si haut risque) • [ ] Documentation technique complete • [ ] Mecanisme de supervision humaine • [ ] Tests de robustesse et biais

Securite • [ ] Chiffrement au repos et en transit • [ ] Controle d'acces RBAC configure • [ ] Audit logging actif • [ ] Tests de penetration effectues • [ ] Plan de reponse aux incidents • [ ] Sauvegardes testees

Operationnel • [ ] Politique de retention definie • [ ] Processus de purge automatise • [ ] Monitoring securite actif • [ ] Formation equipes effectuee ``

Weiterführende Informationen • RGPD et Chatbots - Fokus Kundenchats • AI Act et RAG - Regulatorische Auswirkungen • Donnees sensibles - Verarbeitung kritischer Daten

---

Vereinfachte Compliance mit Ailog

Die DSGVO- und AI Act-Compliance kann komplex wirken. Mit Ailog profitieren Sie von einer bereits konformen Infrastruktur: • Hebergement 100% France chez OVH, donnees jamais transferees hors UE • Chiffrement bout en bout AES-256 • RBAC natif avec gestion des permissions par document • Anonymisation automatique des donnees personnelles • Logs d'audit exportables pour vos DPO • DPA signe et documentation RGPD fournie

Testez Ailog gratuitement et deployez un RAG conforme des aujourd'hui.

---

FAQ

Ist ein RAG-System von der DSGVO betroffen?

Ja — sobald es personenbezogene Daten (Namen, E-Mails, Konversationen) verarbeitet. Ein RAG gilt als automatisierte Verarbeitung, die Einwilligung, Information der Betroffenen und Sicherheitsmaßnahmen erfordert. Auch vector-Daten sind betroffen, da sie die Rekonstruktion der Originaltexte ermöglichen.

Gilt der AI Act für RAG-Chatbots?

Das hängt vom Anwendungsfall ab. Ein Chatbot im Kundendienst ist in der Regel "limited risk" (Transparenzpflichten). Ein RAG, das für HR- oder Kreditentscheidungen eingesetzt wird, wäre "high risk" mit strengen Anforderungen an Dokumentation und menschliche Aufsicht.

Wie anonymisiert man Daten in einem RAG?

Drei Ansätze: Entfernen von Identifikatoren vor der Indexierung, Pseudonymisierung mit sicherem Mapping oder dynamisches Filtern bei der Generierung. Anonymisierung sollte irreversibel sein für nicht benötigte Daten, reversibel (Pseudonymisierung) falls Nachvollziehbarkeit erforderlich ist.

Kann man ein RAG außerhalb der EU hosten?

Technisch ja, aber das erschwert die DSGVO-Compliance. Übermittlungen außerhalb der EU benötigen Garantien (Standardvertragsklauseln, Angemessenheitsentscheidungen). Für sensible Daten wird souveränes Hosting in Frankreich dringend empfohlen.

Welche Logs sollte man für Audits aufbewahren?

Mindestens: Nutzerkennzeichen, Timestamp, Anfrage, genutzte Quellen, generierte Antwort. Für AI Act: automatisierte Entscheidungen, menschliche Eingriffe, Vorfälle. Aufbewahrungsdauer: je nach Richtlinie meist 1 bis 3 Jahre.

Tags

  • RAG
  • securite
  • RGPD
  • AI Act
  • conformite
  • donnees
GuideAvancé

Sicherheit und Compliance für RAG: DSGVO, AI Act und Best Practices

20 février 2026
18 Minuten Lesezeit
Equipe Ailog

Sichern Sie Ihr RAG-System: DSGVO-Konformität, europäischer AI Act, Datenschutz und Audit. Umfassender Leitfaden für Unternehmen.

Sicherheit und Compliance für RAG : DSGVO, AI Act und Best Practices

Die Bereitstellung eines RAG-Systems im Unternehmen bedeutet oft den Umgang mit sensiblen Daten: interne Dokumente, Kundendaten, geistiges Eigentum. Dieser Leitfaden behandelt die wesentlichen Sicherheits- und Compliance-Aspekte für eine sorgenfreie Implementierung.

Der europäische Rechtsrahmen

DSGVO : Grundlagen für RAG

Die Datenschutz-Grundverordnung (DSGVO) gilt, sobald ein RAG-System personenbezogene Daten aus Europa verarbeitet.

Grundsatz DSGVOAnwendung RAG
Rechtmäßigkeit der VerarbeitungRechtsgrundlage zum Indexieren und Verwenden der Daten
DatenminimierungNur die notwendigen Daten indexieren
RichtigkeitWissensbasis aktuell halten
SpeicherbegrenzungVeraltete Daten löschen
Integrität und VertraulichkeitVerschlüsselung und Zugriffskontrolle
RechenschaftspflichtVerarbeitung dokumentieren

AI Act : Was sich für RAG-Systeme ändert

Der europäische AI Act (stufenweise in Kraft 2024–2027) klassifiziert KI-Systeme nach Risikoniveau:

Begrenztes Risiko (Mehrheit der RAG) :

  • Chatbots im Kundensupport
  • Interne Assistenten
  • Dokumentensuche

Pflichten :

  • Transparenz: angeben, dass der Nutzer mit einer KI interagiert
  • Aufbewahrung von Logs für Audits

Hohes Risiko (spezifische Fälle) :

  • RAG für HR-Entscheidungen (Rekrutierung, Bewertung)
  • RAG für Kredit-/Versicherungsentscheidungen
  • RAG im Gesundheitsbereich mit medizinischer Auswirkung

Zusätzliche Pflichten :

  • Konformitätsbewertung
  • Risikomanagementsystem
  • Dokumentierte Trainingsdaten
  • Menschliche Aufsicht
DEVELOPERpython
# Classification du risque selon l'AI Act def classify_rag_risk(use_case: dict) -> str: high_risk_domains = [ "hr_recruitment", "hr_evaluation", "credit_scoring", "insurance_pricing", "medical_diagnosis", "legal_decision" ] if use_case.get("domain") in high_risk_domains: return "high_risk" if use_case.get("automated_decision") and use_case.get("significant_impact"): return "high_risk" return "limited_risk"

Sicherheitsarchitektur

Trennung der Umgebungen

┌─────────────────────────────────────────────────────────────┐
│                     PRODUCTION                               │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   API GW    │──│   RAG API   │──│   Vector DB         │  │
│  │   (WAF)     │  │  (Isolated) │  │   (Encrypted)       │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
│         │                │                   │               │
│         │                │                   │               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Network Isolation (VPC)                 │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                      STAGING                                 │
│           (Donnees anonymisees uniquement)                  │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                    DEVELOPMENT                               │
│             (Donnees synthetiques)                          │
└─────────────────────────────────────────────────────────────┘

Ende-zu-Ende-Verschlüsselung

DEVELOPERpython
from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64 class SecureRAGStorage: def __init__(self, master_key: bytes): self.cipher = Fernet(master_key) def encrypt_document(self, content: str, metadata: dict) -> dict: """Verschlüsselt den Inhalt vor der Speicherung""" encrypted_content = self.cipher.encrypt(content.encode()) # Sensible Metadaten verschlüsseln sensitive_fields = ["author", "email", "department"] encrypted_metadata = metadata.copy() for field in sensitive_fields: if field in encrypted_metadata: encrypted_metadata[field] = self.cipher.encrypt( str(encrypted_metadata[field]).encode() ).decode() return { "content": encrypted_content.decode(), "metadata": encrypted_metadata, "encrypted": True } def decrypt_document(self, encrypted_doc: dict) -> dict: """Entschlüsselt das Dokument zur Verwendung""" if not encrypted_doc.get("encrypted"): return encrypted_doc content = self.cipher.decrypt( encrypted_doc["content"].encode() ).decode() return { "content": content, "metadata": self._decrypt_metadata(encrypted_doc["metadata"]) } def encrypt_embedding(self, embedding: list[float]) -> bytes: """Verschlüsselt die embeddings (optional, beeinträchtigt die Performance)""" import numpy as np embedding_bytes = np.array(embedding).tobytes() return self.cipher.encrypt(embedding_bytes)

Feingranulare Zugriffskontrolle (RBAC)

DEVELOPERpython
from enum import Enum from functools import wraps class Permission(Enum): READ_PUBLIC = "read:public" READ_INTERNAL = "read:internal" READ_CONFIDENTIAL = "read:confidential" READ_RESTRICTED = "read:restricted" ADMIN = "admin" class DocumentClassification(Enum): PUBLIC = "public" INTERNAL = "internal" CONFIDENTIAL = "confidential" RESTRICTED = "restricted" class RBACController: def __init__(self, user_service): self.user_service = user_service self.permission_hierarchy = { Permission.READ_PUBLIC: [DocumentClassification.PUBLIC], Permission.READ_INTERNAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL ], Permission.READ_CONFIDENTIAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL ], Permission.READ_RESTRICTED: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL, DocumentClassification.RESTRICTED ] } async def filter_accessible_documents( self, documents: list[dict], user_id: str ) -> list[dict]: """Filtert die Dokumente nach Benutzerberechtigungen""" user = await self.user_service.get_user(user_id) user_permissions = set(user.get("permissions", [])) accessible = [] for doc in documents: doc_class = DocumentClassification( doc.get("metadata", {}).get("classification", "internal") ) if self._can_access(user_permissions, doc_class): accessible.append(doc) return accessible def _can_access( self, user_permissions: set, doc_classification: DocumentClassification ) -> bool: """Prüft, ob der Benutzer auf das Dokument zugreifen kann""" for perm in user_permissions: try: allowed = self.permission_hierarchy.get(Permission(perm), []) if doc_classification in allowed: return True except ValueError: continue return False

Schutz personenbezogener Daten

Automatische Erkennung und Anonymisierung

DEVELOPERpython
import re from presidio_analyzer import AnalyzerEngine from presidio_anonymizer import AnonymizerEngine class PIIProtector: def __init__(self): self.analyzer = AnalyzerEngine() self.anonymizer = AnonymizerEngine() # Patterns personnalises pour la France self.french_patterns = { "FRENCH_SSN": r"\b[12][0-9]{2}[0-1][0-9][0-9]{2}[0-9]{3}[0-9]{3}[0-9]{2}\b", "FRENCH_PHONE": r"\b(?:(?:\+33|0033|0)[1-9](?:[.\-\s]?\d{2}){4})\b", "IBAN_FR": r"\bFR\d{2}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{3}\b" } def detect_pii(self, text: str, language: str = "fr") -> list[dict]: """Erkennt personenbezogene Daten im Text""" # Presidio-Erkennung (email, nom, adresse, etc.) results = self.analyzer.analyze( text=text, language=language, entities=[ "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "LOCATION", "CREDIT_CARD", "IBAN_CODE" ] ) # Erkennung französischer Muster french_results = self._detect_french_patterns(text) all_results = list(results) + french_results return self._format_results(all_results) def anonymize( self, text: str, mode: str = "replace" # replace, hash, mask ) -> str: """Anonymisiert personenbezogene Daten""" results = self.analyzer.analyze(text=text, language="fr") if mode == "replace": anonymized = self.anonymizer.anonymize( text=text, analyzer_results=results ) elif mode == "hash": anonymized = self._hash_pii(text, results) elif mode == "mask": anonymized = self._mask_pii(text, results) return anonymized.text def _hash_pii(self, text: str, results: list) -> str: """Ersetzt PII durch ihren Hash (mit Schlüssel reversibel)""" import hashlib result_text = text for result in sorted(results, key=lambda x: x.start, reverse=True): original = text[result.start:result.end] hashed = hashlib.sha256(original.encode()).hexdigest()[:12] result_text = result_text[:result.start] + f"[{result.entity_type}:{hashed}]" + result_text[result.end:] return result_text def check_before_indexing(self, document: dict) -> dict: """Prüft und verarbeitet ein Dokument vor der Indexierung""" content = document.get("content", "") pii_detected = self.detect_pii(content) if pii_detected: return { "safe_to_index": False, "pii_found": pii_detected, "action_required": "review_or_anonymize", "anonymized_content": self.anonymize(content) } return { "safe_to_index": True, "pii_found": [], "original_content": content }

Einwilligung und Betroffenenrechte

DEVELOPERpython
class ConsentManager: def __init__(self, db): self.db = db async def record_consent( self, user_id: str, purpose: str, data_categories: list[str], expiry_date: datetime = None ): """Speichert die Benutzerzustimmung""" consent = { "user_id": user_id, "purpose": purpose, "data_categories": data_categories, "granted_at": datetime.now(), "expires_at": expiry_date, "status": "active" } await self.db.insert("consents", consent) async def check_consent( self, user_id: str, purpose: str, data_category: str ) -> bool: """Überprüft, ob die Zustimmung gültig ist""" consent = await self.db.find_one("consents", { "user_id": user_id, "purpose": purpose, "data_categories": {"$in": [data_category]}, "status": "active", "$or": [ {"expires_at": None}, {"expires_at": {"$gt": datetime.now()}} ] }) return consent is not None async def handle_deletion_request(self, user_id: str) -> dict: """Verarbeitet eine Löschanfrage (Recht auf Vergessenwerden)""" # 1. Alle Daten identifizieren user_data = await self._find_all_user_data(user_id) # 2. Aus den vector-Indizes löschen await self._delete_from_vector_db(user_data["document_ids"]) # 3. Aus Datenbanken löschen await self._delete_from_databases(user_id) # 4. Löschung protokollieren await self._log_deletion(user_id, user_data) return { "status": "completed", "deleted_documents": len(user_data["document_ids"]), "deleted_conversations": user_data["conversation_count"], "completion_date": datetime.now().isoformat() } async def handle_export_request(self, user_id: str) -> dict: """Verarbeitet eine Exportanfrage (Recht auf Datenübertragbarkeit)""" user_data = await self._find_all_user_data(user_id) export = { "user_profile": user_data["profile"], "conversations": user_data["conversations"], "indexed_documents": user_data["documents"], "consents": user_data["consents"], "export_date": datetime.now().isoformat() } return export

Audit und Nachvollziehbarkeit

Sichere Protokollierung

DEVELOPERpython
import logging import json from datetime import datetime class AuditLogger: def __init__(self, log_destination: str): self.logger = logging.getLogger("rag_audit") self.logger.setLevel(logging.INFO) # Handler securise (fichier chiffre ou SIEM) handler = logging.FileHandler(log_destination) handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler) def log_query( self, user_id: str, query: str, documents_accessed: list[str], response_generated: bool ): """Protokolliert jede RAG-Anfrage""" # SENSIBLEN Inhalt der Anfrage NICHT protokollieren log_entry = { "event_type": "rag_query", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "query_hash": self._hash_content(query), "query_length": len(query), "documents_accessed": documents_accessed, "document_count": len(documents_accessed), "response_generated": response_generated } self.logger.info(json.dumps(log_entry)) def log_document_access( self, user_id: str, document_id: str, access_type: str, classification: str ): """Protokolliert den Zugriff auf Dokumente""" log_entry = { "event_type": "document_access", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "document_id": document_id, "access_type": access_type, "classification": classification } self.logger.info(json.dumps(log_entry)) def log_security_event( self, event_type: str, severity: str, details: dict ): """Protokolliert Sicherheitsereignisse""" log_entry = { "event_type": f"security_{event_type}", "timestamp": datetime.now().isoformat(), "severity": severity, "details": details } if severity in ["high", "critical"]: self.logger.warning(json.dumps(log_entry)) self._alert_security_team(log_entry) else: self.logger.info(json.dumps(log_entry)) def _hash_content(self, content: str) -> str: import hashlib return hashlib.sha256(content.encode()).hexdigest()

Aufbewahrung und Löschung

DEVELOPERpython
class DataRetentionManager: def __init__(self, db, vector_db, config: dict): self.db = db self.vector_db = vector_db self.config = config async def apply_retention_policy(self): """Wendet die Aufbewahrungsrichtlinie an""" # Conversations conv_retention = self.config.get("conversation_retention_days", 365) await self._purge_old_conversations(conv_retention) # Logs d'audit (obligation legale plus longue) audit_retention = self.config.get("audit_retention_days", 2190) # 6 ans await self._archive_old_audits(audit_retention) # Documents expires await self._handle_expired_documents() # Metadonnees orphelines await self._cleanup_orphaned_data() async def _purge_old_conversations(self, days: int): """Löscht alte Konversationen""" cutoff_date = datetime.now() - timedelta(days=days) # Anonymisieren statt Löschen, falls Analysen benötigt werden await self.db.update_many( "conversations", {"created_at": {"$lt": cutoff_date}}, { "$set": { "user_id": "anonymized", "messages": [], "purged_at": datetime.now() } } ) async def _handle_expired_documents(self): """Verwaltet Dokumente mit Ablaufdatum""" expired = await self.db.find( "documents", {"expires_at": {"$lt": datetime.now()}} ) for doc in expired: # Aus dem vector-Indizes löschen await self.vector_db.delete(doc["id"]) # Als abgelaufen markieren await self.db.update( "documents", {"id": doc["id"]}, {"$set": {"status": "expired", "content": None}} )

Operative Sicherheit

Schutz gegen Injections

DEVELOPERpython
class PromptSecurityGuard: def __init__(self): self.injection_patterns = [ r"ignore\s+(previous|all|above)\s+instructions", r"disregard\s+(your|the)\s+(rules|instructions)", r"you\s+are\s+now\s+", r"pretend\s+(you|to)\s+", r"act\s+as\s+if", r"system\s*:\s*", r"<\|.*\|>", r"\[INST\]", r"###\s*(instruction|system)", ] def check_query(self, query: str) -> dict: """Prüft die Sicherheit einer Anfrage""" query_lower = query.lower() # Erkennung von Injection-Patterns for pattern in self.injection_patterns: if re.search(pattern, query_lower, re.IGNORECASE): return { "safe": False, "reason": "potential_injection", "pattern_matched": pattern } # Detection tentatives d'exfiltration if self._detect_exfiltration_attempt(query): return { "safe": False, "reason": "potential_exfiltration" } # Detection contenu malveillant if self._detect_malicious_content(query): return { "safe": False, "reason": "malicious_content" } return {"safe": True} def sanitize_context(self, context: str) -> str: """Bereinigt den Kontext vor dem Einfügen in den Prompt""" # Entfernt Injection-Versuche in den Dokumenten sanitized = context for pattern in self.injection_patterns: sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE) return sanitized def _detect_exfiltration_attempt(self, query: str) -> bool: """Erkennt Versuche der Datenexfiltration""" exfil_patterns = [ r"list\s+all\s+(users|passwords|keys|secrets)", r"show\s+(me\s+)?(the\s+)?system\s+prompt", r"what\s+(are|is)\s+(your|the)\s+(instructions|rules)", r"dump\s+(all|the)\s+data", r"export\s+.*\s+to\s+(email|url|external)" ] for pattern in exfil_patterns: if re.search(pattern, query.lower()): return True return False

Rate Limiting und DDoS-Schutz

DEVELOPERpython
from redis import Redis import time class RateLimiter: def __init__(self, redis_client: Redis): self.redis = redis_client async def check_rate_limit( self, identifier: str, limit_type: str = "query" ) -> dict: """Überprüft die Ratenbegrenzung""" limits = { "query": {"requests": 60, "window": 60}, # 60 req/min "heavy_query": {"requests": 10, "window": 60}, # 10 req/min "indexing": {"requests": 100, "window": 3600}, # 100/heure "export": {"requests": 5, "window": 86400} # 5/jour } config = limits.get(limit_type, limits["query"]) key = f"ratelimit:{limit_type}:{identifier}" current = await self.redis.incr(key) if current == 1: await self.redis.expire(key, config["window"]) remaining = max(0, config["requests"] - current) if current > config["requests"]: return { "allowed": False, "remaining": 0, "reset_in": await self.redis.ttl(key) } return { "allowed": True, "remaining": remaining, "limit": config["requests"] }

Checklist conformite

Avant mise en production

DEVELOPERmarkdown
## RGPD - [ ] Registre des traitements mis a jour - [ ] Analyse d'impact (DPIA) si donnees sensibles - [ ] Information des personnes concernees - [ ] Mecanismes d'exercice des droits implementes - [ ] Contrats sous-traitants (DPA) signes - [ ] Mesures de securite documentees ## AI Act (si applicable) - [ ] Classification du risque effectuee - [ ] Systeme de gestion des risques (si haut risque) - [ ] Documentation technique complete - [ ] Mecanisme de supervision humaine - [ ] Tests de robustesse et biais ## Securite - [ ] Chiffrement au repos et en transit - [ ] Controle d'acces RBAC configure - [ ] Audit logging actif - [ ] Tests de penetration effectues - [ ] Plan de reponse aux incidents - [ ] Sauvegardes testees ## Operationnel - [ ] Politique de retention definie - [ ] Processus de purge automatise - [ ] Monitoring securite actif - [ ] Formation equipes effectuee

Weiterführende Informationen


Vereinfachte Compliance mit Ailog

Die DSGVO- und AI Act-Compliance kann komplex wirken. Mit Ailog profitieren Sie von einer bereits konformen Infrastruktur:

  • Hebergement 100% France chez OVH, donnees jamais transferees hors UE
  • Chiffrement bout en bout AES-256
  • RBAC natif avec gestion des permissions par document
  • Anonymisation automatique des donnees personnelles
  • Logs d'audit exportables pour vos DPO
  • DPA signe et documentation RGPD fournie

Testez Ailog gratuitement et deployez un RAG conforme des aujourd'hui.

FAQ

Ja — sobald es personenbezogene Daten (Namen, E-Mails, Konversationen) verarbeitet. Ein RAG gilt als automatisierte Verarbeitung, die Einwilligung, Information der Betroffenen und Sicherheitsmaßnahmen erfordert. Auch vector-Daten sind betroffen, da sie die Rekonstruktion der Originaltexte ermöglichen.
Das hängt vom Anwendungsfall ab. Ein Chatbot im Kundendienst ist in der Regel "limited risk" (Transparenzpflichten). Ein RAG, das für HR- oder Kreditentscheidungen eingesetzt wird, wäre "high risk" mit strengen Anforderungen an Dokumentation und menschliche Aufsicht.
Drei Ansätze: Entfernen von Identifikatoren vor der Indexierung, Pseudonymisierung mit sicherem Mapping oder dynamisches Filtern bei der Generierung. Anonymisierung sollte irreversibel sein für nicht benötigte Daten, reversibel (Pseudonymisierung) falls Nachvollziehbarkeit erforderlich ist.
Technisch ja, aber das erschwert die DSGVO-Compliance. Übermittlungen außerhalb der EU benötigen Garantien (Standardvertragsklauseln, Angemessenheitsentscheidungen). Für sensible Daten wird souveränes Hosting in Frankreich dringend empfohlen.
Mindestens: Nutzerkennzeichen, Timestamp, Anfrage, genutzte Quellen, generierte Antwort. Für AI Act: automatisierte Entscheidungen, menschliche Eingriffe, Vorfälle. Aufbewahrungsdauer: je nach Richtlinie meist 1 bis 3 Jahre.

Tags

RAGsecuriteRGPDAI Actconformitedonnees

Articles connexes

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !