Securite et Conformite RAG : RGPD, AI Act et bonnes pratiques
Securisez votre systeme RAG : conformite RGPD, AI Act europeen, protection des donnees et audit. Guide complet pour les entreprises.
- Auteur
- Equipe Ailog
- Date de publication
- Temps de lecture
- 18 min de lecture
- Niveau
- advanced
Securite et Conformite RAG : RGPD, AI Act et bonnes pratiques
Deployer un systeme RAG en entreprise implique de manipuler des donnees souvent sensibles : documents internes, informations clients, propriete intellectuelle. Ce guide couvre les aspects securite et conformite essentiels pour un deploiement serein.
Le cadre reglementaire europeen
RGPD : les fondamentaux pour le RAG
Le Reglement General sur la Protection des Donnees s'applique des qu'un systeme RAG traite des donnees personnelles europeennes.
| Principe RGPD | Application RAG | |---------------|-----------------| | Liceit du traitement | Base legale pour indexer et utiliser les donnees | | Minimisation | N'indexer que les donnees necessaires | | Exactitude | Maintenir la base de connaissances a jour | | Limitation conservation | Purger les donnees obsoletes | | Integrite et confidentialite | Chiffrement et controle d'acces | | Responsabilite | Documenter les traitements |
AI Act : ce qui change pour les systemes RAG
L'AI Act europeen (entree en vigueur progressive 2024-2027) classifie les systemes IA par niveau de risque :
Risque limite (majorite des RAG) : • Chatbots support client • Assistants internes • Recherche documentaire
Obligations : • Transparence : indiquer que l'utilisateur interagit avec une IA • Conservation des logs pour audit
Risque eleve (cas specifiques) : • RAG pour decisions RH (recrutement, evaluation) • RAG pour decisions credit/assurance • RAG sante avec impact medical
Obligations supplementaires : • Evaluation de conformite • Systeme de gestion des risques • Donnees d'entrainement documentees • Supervision humaine
``python Classification du risque selon l'AI Act def classify_rag_risk(use_case: dict) -> str: high_risk_domains = [ "hr_recruitment", "hr_evaluation", "credit_scoring", "insurance_pricing", "medical_diagnosis", "legal_decision" ]
if use_case.get("domain") in high_risk_domains: return "high_risk"
if use_case.get("automated_decision") and use_case.get("significant_impact"): return "high_risk"
return "limited_risk" `
Architecture securisee
Separation des environnements
` ┌─────────────────────────────────────────────────────────────┐ │ PRODUCTION │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ API GW │──│ RAG API │──│ Vector DB │ │ │ │ (WAF) │ │ (Isolated) │ │ (Encrypted) │ │ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Network Isolation (VPC) │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐ │ STAGING │ │ (Donnees anonymisees uniquement) │ └─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐ │ DEVELOPMENT │ │ (Donnees synthetiques) │ └─────────────────────────────────────────────────────────────┘ `
Chiffrement bout en bout
`python from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64
class SecureRAGStorage: def __init__(self, master_key: bytes): self.cipher = Fernet(master_key)
def encrypt_document(self, content: str, metadata: dict) -> dict: """Chiffre le contenu avant stockage""" encrypted_content = self.cipher.encrypt(content.encode())
Chiffrer les metadonnees sensibles sensitive_fields = ["author", "email", "department"] encrypted_metadata = metadata.copy()
for field in sensitive_fields: if field in encrypted_metadata: encrypted_metadata[field] = self.cipher.encrypt( str(encrypted_metadata[field]).encode() ).decode()
return { "content": encrypted_content.decode(), "metadata": encrypted_metadata, "encrypted": True }
def decrypt_document(self, encrypted_doc: dict) -> dict: """Dechiffre le document pour utilisation""" if not encrypted_doc.get("encrypted"): return encrypted_doc
content = self.cipher.decrypt( encrypted_doc["content"].encode() ).decode()
return { "content": content, "metadata": self._decrypt_metadata(encrypted_doc["metadata"]) }
def encrypt_embedding(self, embedding: list[float]) -> bytes: """Chiffre les embeddings (optionnel, impact performance)""" import numpy as np embedding_bytes = np.array(embedding).tobytes() return self.cipher.encrypt(embedding_bytes) `
Controle d'acces granulaire (RBAC)
`python from enum import Enum from functools import wraps
class Permission(Enum): READ_PUBLIC = "read:public" READ_INTERNAL = "read:internal" READ_CONFIDENTIAL = "read:confidential" READ_RESTRICTED = "read:restricted" ADMIN = "admin"
class DocumentClassification(Enum): PUBLIC = "public" INTERNAL = "internal" CONFIDENTIAL = "confidential" RESTRICTED = "restricted"
class RBACController: def __init__(self, user_service): self.user_service = user_service
self.permission_hierarchy = { Permission.READ_PUBLIC: [DocumentClassification.PUBLIC], Permission.READ_INTERNAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL ], Permission.READ_CONFIDENTIAL: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL ], Permission.READ_RESTRICTED: [ DocumentClassification.PUBLIC, DocumentClassification.INTERNAL, DocumentClassification.CONFIDENTIAL, DocumentClassification.RESTRICTED ] }
async def filter_accessible_documents( self, documents: list[dict], user_id: str ) -> list[dict]: """Filtre les documents selon les permissions utilisateur""" user = await self.user_service.get_user(user_id) user_permissions = set(user.get("permissions", []))
accessible = [] for doc in documents: doc_class = DocumentClassification( doc.get("metadata", {}).get("classification", "internal") )
if self._can_access(user_permissions, doc_class): accessible.append(doc)
return accessible
def _can_access( self, user_permissions: set, doc_classification: DocumentClassification ) -> bool: """Verifie si l'utilisateur peut acceder au document""" for perm in user_permissions: try: allowed = self.permission_hierarchy.get(Permission(perm), []) if doc_classification in allowed: return True except ValueError: continue return False `
Protection des donnees personnelles
Detection et anonymisation automatique
`python import re from presidio_analyzer import AnalyzerEngine from presidio_anonymizer import AnonymizerEngine
class PIIProtector: def __init__(self): self.analyzer = AnalyzerEngine() self.anonymizer = AnonymizerEngine()
Patterns personnalises pour la France self.french_patterns = { "FRENCH_SSN": r"\b[12][0-9]{2}[0-1][0-9][0-9]{2}[0-9]{3}[0-9]{3}[0-9]{2}\b", "FRENCH_PHONE": r"\b(?:(?:\+33|0033|0)1-9{4})\b", "IBAN_FR": r"\bFR\d{2}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{3}\b" }
def detect_pii(self, text: str, language: str = "fr") -> list[dict]: """Detecte les donnees personnelles dans le texte""" Detection Presidio (email, nom, adresse, etc.) results = self.analyzer.analyze( text=text, language=language, entities=[ "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "LOCATION", "CREDIT_CARD", "IBAN_CODE" ] )
Detection patterns francais french_results = self._detect_french_patterns(text)
all_results = list(results) + french_results return self._format_results(all_results)
def anonymize( self, text: str, mode: str = "replace" replace, hash, mask ) -> str: """Anonymise les donnees personnelles""" results = self.analyzer.analyze(text=text, language="fr")
if mode == "replace": anonymized = self.anonymizer.anonymize( text=text, analyzer_results=results ) elif mode == "hash": anonymized = self._hash_pii(text, results) elif mode == "mask": anonymized = self._mask_pii(text, results)
return anonymized.text
def _hash_pii(self, text: str, results: list) -> str: """Remplace les PII par leur hash (reversible avec cle)""" import hashlib
result_text = text for result in sorted(results, key=lambda x: x.start, reverse=True): original = text[result.start:result.end] hashed = hashlib.sha256(original.encode()).hexdigest()[:12] result_text = result_text[:result.start] + f"[{result.entity_type}:{hashed}]" + result_text[result.end:]
return result_text
def check_before_indexing(self, document: dict) -> dict: """Verifie et traite un document avant indexation""" content = document.get("content", "")
pii_detected = self.detect_pii(content)
if pii_detected: return { "safe_to_index": False, "pii_found": pii_detected, "action_required": "review_or_anonymize", "anonymized_content": self.anonymize(content) }
return { "safe_to_index": True, "pii_found": [], "original_content": content } `
Consentement et droits des personnes
`python class ConsentManager: def __init__(self, db): self.db = db
async def record_consent( self, user_id: str, purpose: str, data_categories: list[str], expiry_date: datetime = None ): """Enregistre le consentement utilisateur""" consent = { "user_id": user_id, "purpose": purpose, "data_categories": data_categories, "granted_at": datetime.now(), "expires_at": expiry_date, "status": "active" }
await self.db.insert("consents", consent)
async def check_consent( self, user_id: str, purpose: str, data_category: str ) -> bool: """Verifie si le consentement est valide""" consent = await self.db.find_one("consents", { "user_id": user_id, "purpose": purpose, "data_categories": {"$in": [data_category]}, "status": "active", "$or": [ {"expires_at": None}, {"expires_at": {"$gt": datetime.now()}} ] })
return consent is not None
async def handle_deletion_request(self, user_id: str) -> dict: """Traite une demande de suppression (droit a l'oubli)""" Identifier toutes les donnees user_data = await self._find_all_user_data(user_id) Supprimer des index vectoriels await self._delete_from_vector_db(user_data["document_ids"]) Supprimer des bases de donnees await self._delete_from_databases(user_id) Journaliser la suppression await self._log_deletion(user_id, user_data)
return { "status": "completed", "deleted_documents": len(user_data["document_ids"]), "deleted_conversations": user_data["conversation_count"], "completion_date": datetime.now().isoformat() }
async def handle_export_request(self, user_id: str) -> dict: """Traite une demande d'export (droit a la portabilite)""" user_data = await self._find_all_user_data(user_id)
export = { "user_profile": user_data["profile"], "conversations": user_data["conversations"], "indexed_documents": user_data["documents"], "consents": user_data["consents"], "export_date": datetime.now().isoformat() }
return export `
Audit et tracabilite
Logging securise
`python import logging import json from datetime import datetime
class AuditLogger: def __init__(self, log_destination: str): self.logger = logging.getLogger("rag_audit") self.logger.setLevel(logging.INFO)
Handler securise (fichier chiffre ou SIEM) handler = logging.FileHandler(log_destination) handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler)
def log_query( self, user_id: str, query: str, documents_accessed: list[str], response_generated: bool ): """Log chaque requete RAG""" NE PAS logger le contenu de la query si sensible log_entry = { "event_type": "rag_query", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "query_hash": self._hash_content(query), "query_length": len(query), "documents_accessed": documents_accessed, "document_count": len(documents_accessed), "response_generated": response_generated }
self.logger.info(json.dumps(log_entry))
def log_document_access( self, user_id: str, document_id: str, access_type: str, classification: str ): """Log les acces aux documents""" log_entry = { "event_type": "document_access", "timestamp": datetime.now().isoformat(), "user_id": self._hash_if_needed(user_id), "document_id": document_id, "access_type": access_type, "classification": classification }
self.logger.info(json.dumps(log_entry))
def log_security_event( self, event_type: str, severity: str, details: dict ): """Log les evenements securite""" log_entry = { "event_type": f"security_{event_type}", "timestamp": datetime.now().isoformat(), "severity": severity, "details": details }
if severity in ["high", "critical"]: self.logger.warning(json.dumps(log_entry)) self._alert_security_team(log_entry) else: self.logger.info(json.dumps(log_entry))
def _hash_content(self, content: str) -> str: import hashlib return hashlib.sha256(content.encode()).hexdigest() `
Retention et purge
`python class DataRetentionManager: def __init__(self, db, vector_db, config: dict): self.db = db self.vector_db = vector_db self.config = config
async def apply_retention_policy(self): """Applique la politique de retention"""
Conversations conv_retention = self.config.get("conversation_retention_days", 365) await self._purge_old_conversations(conv_retention)
Logs d'audit (obligation legale plus longue) audit_retention = self.config.get("audit_retention_days", 2190) 6 ans await self._archive_old_audits(audit_retention)
Documents expires await self._handle_expired_documents()
Metadonnees orphelines await self._cleanup_orphaned_data()
async def _purge_old_conversations(self, days: int): """Supprime les conversations anciennes""" cutoff_date = datetime.now() - timedelta(days=days)
Anonymiser plutot que supprimer si besoin analytics await self.db.update_many( "conversations", {"created_at": {"$lt": cutoff_date}}, { "$set": { "user_id": "anonymized", "messages": [], "purged_at": datetime.now() } } )
async def _handle_expired_documents(self): """Gere les documents avec date d'expiration""" expired = await self.db.find( "documents", {"expires_at": {"$lt": datetime.now()}} )
for doc in expired: Supprimer de l'index vectoriel await self.vector_db.delete(doc["id"])
Marquer comme expire await self.db.update( "documents", {"id": doc["id"]}, {"$set": {"status": "expired", "content": None}} ) `
Securite operationnelle
Protection contre les injections
`python class PromptSecurityGuard: def __init__(self): self.injection_patterns = [ r"ignore\s+(previous|all|above)\s+instructions", r"disregard\s+(your|the)\s+(rules|instructions)", r"you\s+are\s+now\s+", r"pretend\s+(you|to)\s+", r"act\s+as\s+if", r"system\s:\s", r"<\|.\|>", r"\[INST\]", r"###\s(instruction|system)", ]
def check_query(self, query: str) -> dict: """Verifie la securite d'une requete""" query_lower = query.lower()
Detection patterns d'injection for pattern in self.injection_patterns: if re.search(pattern, query_lower, re.IGNORECASE): return { "safe": False, "reason": "potential_injection", "pattern_matched": pattern }
Detection tentatives d'exfiltration if self._detect_exfiltration_attempt(query): return { "safe": False, "reason": "potential_exfiltration" }
Detection contenu malveillant if self._detect_malicious_content(query): return { "safe": False, "reason": "malicious_content" }
return {"safe": True}
def sanitize_context(self, context: str) -> str: """Nettoie le contexte avant injection dans le prompt""" Supprimer les tentatives d'injection dans les documents sanitized = context
for pattern in self.injection_patterns: sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE)
return sanitized
def _detect_exfiltration_attempt(self, query: str) -> bool: """Detecte les tentatives d'exfiltration de donnees""" exfil_patterns = [ r"list\s+all\s+(users|passwords|keys|secrets)", r"show\s+(me\s+)?(the\s+)?system\s+prompt", r"what\s+(are|is)\s+(your|the)\s+(instructions|rules)", r"dump\s+(all|the)\s+data", r"export\s+.*\s+to\s+(email|url|external)" ]
for pattern in exfil_patterns: if re.search(pattern, query.lower()): return True
return False `
Rate limiting et protection DDoS
`python from redis import Redis import time
class RateLimiter: def __init__(self, redis_client: Redis): self.redis = redis_client
async def check_rate_limit( self, identifier: str, limit_type: str = "query" ) -> dict: """Verifie les limites de taux"""
limits = { "query": {"requests": 60, "window": 60}, 60 req/min "heavy_query": {"requests": 10, "window": 60}, 10 req/min "indexing": {"requests": 100, "window": 3600}, 100/heure "export": {"requests": 5, "window": 86400} 5/jour }
config = limits.get(limit_type, limits["query"])
key = f"ratelimit:{limit_type}:{identifier}" current = await self.redis.incr(key)
if current == 1: await self.redis.expire(key, config["window"])
remaining = max(0, config["requests"] - current)
if current > config["requests"]: return { "allowed": False, "remaining": 0, "reset_in": await self.redis.ttl(key) }
return { "allowed": True, "remaining": remaining, "limit": config["requests"] } `
Checklist conformite
Avant mise en production
`markdown RGPD • [ ] Registre des traitements mis a jour • [ ] Analyse d'impact (DPIA) si donnees sensibles • [ ] Information des personnes concernees • [ ] Mecanismes d'exercice des droits implementes • [ ] Contrats sous-traitants (DPA) signes • [ ] Mesures de securite documentees
AI Act (si applicable) • [ ] Classification du risque effectuee • [ ] Systeme de gestion des risques (si haut risque) • [ ] Documentation technique complete • [ ] Mecanisme de supervision humaine • [ ] Tests de robustesse et biais
Securite • [ ] Chiffrement au repos et en transit • [ ] Controle d'acces RBAC configure • [ ] Audit logging actif • [ ] Tests de penetration effectues • [ ] Plan de reponse aux incidents • [ ] Sauvegardes testees
Operationnel • [ ] Politique de retention definie • [ ] Processus de purge automatise • [ ] Monitoring securite actif • [ ] Formation equipes effectuee ``
Pour aller plus loin • RGPD et Chatbots - Focus chatbot client • AI Act et RAG - Implications reglementaires • Donnees sensibles - Traitement donnees critiques
---
Conformite simplifiee avec Ailog
La conformite RGPD et AI Act peut sembler complexe. Avec Ailog, beneficiez d'une infrastructure deja conforme : • Hebergement 100% France chez OVH, donnees jamais transferees hors UE • Chiffrement bout en bout AES-256 • RBAC natif avec gestion des permissions par document • Anonymisation automatique des donnees personnelles • Logs d'audit exportables pour vos DPO • DPA signe et documentation RGPD fournie
Testez Ailog gratuitement et deployez un RAG conforme des aujourd'hui.
---
FAQ
Un systeme RAG est-il concerne par le RGPD ?
Oui, des qu'il traite des donnees personnelles (noms, emails, conversations). Le RAG est considere comme un traitement automatise necessitant consentement, information des personnes et mesures de securite. Les donnees vectorielles sont aussi concernees car elles permettent de reconstituer les textes originaux.
L'AI Act s'applique-t-il aux chatbots RAG ?
Cela depend du cas d'usage. Un chatbot support client est generalement "risque limite" (obligations de transparence). Un RAG utilise pour des decisions RH ou credit serait "haut risque" avec des obligations strictes de documentation et supervision humaine.
Comment anonymiser les donnees dans un RAG ?
Trois approches : suppression des identifiants avant indexation, pseudonymisation avec mapping securise, ou filtrage dynamique a la generation. L'anonymisation doit etre irreversible pour les donnees non necessaires, reversible (pseudonymisation) si besoin de traceabilite.
Peut-on heberger un RAG hors de l'UE ?
Techniquement oui, mais cela complexifie la conformite RGPD. Les transferts hors UE necessitent des garanties (clauses contractuelles, decisions d'adequation). Pour les donnees sensibles, l'hebergement souverain en France est fortement recommande.
Quels logs conserver pour l'audit ?
Au minimum : identifiant utilisateur, timestamp, requete, sources utilisees, reponse generee. Pour la conformite AI Act : decisions automatisees, interventions humaines, incidents. Duree de conservation : selon votre politique, generalement 1 a 3 ans.