RAG Security and Compliance: GDPR, the AI Act, and Best Practices
Secure your RAG system: GDPR compliance, the European AI Act, data protection, and auditing. A complete guide for businesses.
Deploying a RAG system in a company means handling data that is often sensitive: internal documents, customer information, intellectual property. This guide covers the security and compliance essentials for a worry-free deployment.
The European regulatory framework
GDPR: the fundamentals for RAG
The General Data Protection Regulation applies as soon as a RAG system processes the personal data of people in the EU.
| GDPR principle | Application to RAG |
|---|---|
| Lawfulness of processing | Establish a legal basis for indexing and using the data |
| Data minimisation | Index only the data you actually need |
| Accuracy | Keep the knowledge base up to date |
| Storage limitation | Purge obsolete data |
| Integrity and confidentiality | Encryption and access control |
| Accountability | Document the processing activities |
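The table above can be turned into concrete pipeline settings. A minimal sketch, where every key name and value is illustrative rather than prescriptive:

```python
# Hypothetical mapping of GDPR principles to RAG pipeline settings
GDPR_PIPELINE_POLICY = {
    "lawful_basis": "legitimate_interest",  # lawfulness: record the legal basis
    "indexed_fields": ["title", "body"],    # minimisation: allowlist, not "index everything"
    "reindex_interval_days": 30,            # accuracy: refresh the knowledge base regularly
    "retention_days": 365,                  # storage limitation: purge after one year
    "encrypt_at_rest": True,                # integrity and confidentiality
    "audit_log_enabled": True,              # accountability
}

def minimise_document(doc: dict, policy: dict) -> dict:
    """Keep only the allowlisted fields before indexing (data minimisation)."""
    return {k: v for k, v in doc.items() if k in policy["indexed_fields"]}
```

Keeping such a policy object in version control also helps with accountability: the record of processing can point at a single, auditable artifact.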
AI Act: what changes for RAG systems
The European AI Act (phased entry into force, 2024-2027) classifies AI systems by risk level:
Limited risk (most RAG systems):
- Customer-support chatbots
- Internal assistants
- Document search
Obligations:
- Transparency: tell users they are interacting with an AI
- Keep logs for auditing
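These two limited-risk obligations fit in a few lines of code. A minimal sketch; the helper names, disclosure wording, and log fields are illustrative:

```python
import json
from datetime import datetime, timezone

AI_DISCLOSURE = "You are chatting with an AI assistant."

def wrap_response(answer: str) -> str:
    """Prepend the transparency notice before the answer reaches the user."""
    return f"{AI_DISCLOSURE}\n\n{answer}"

def audit_record(user_id: str, answer: str) -> str:
    """Produce a JSON log line suitable for long-term audit retention.
    Only metadata is logged, not the answer content itself."""
    return json.dumps({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "disclosure_shown": True,
        "answer_length": len(answer),
    })
```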
High risk (specific cases):
- RAG for HR decisions (hiring, performance evaluation)
- RAG for credit or insurance decisions
- Healthcare RAG with medical impact
Additional obligations:
- Conformity assessment
- Risk-management system
- Documented training data
- Human oversight
```python
# Risk classification under the AI Act
def classify_rag_risk(use_case: dict) -> str:
    high_risk_domains = [
        "hr_recruitment", "hr_evaluation",
        "credit_scoring", "insurance_pricing",
        "medical_diagnosis", "legal_decision"
    ]

    if use_case.get("domain") in high_risk_domains:
        return "high_risk"

    if use_case.get("automated_decision") and use_case.get("significant_impact"):
        return "high_risk"

    return "limited_risk"
```
Secure architecture
Environment separation
```
┌─────────────────────────────────────────────────────────────┐
│                         PRODUCTION                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   API GW    │──│   RAG API   │──│      Vector DB      │  │
│  │    (WAF)    │  │ (Isolated)  │  │     (Encrypted)     │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │               Network Isolation (VPC)               │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                           STAGING                           │
│                    (Anonymized data only)                   │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                         DEVELOPMENT                         │
│                       (Synthetic data)                      │
└─────────────────────────────────────────────────────────────┘
```
End-to-end encryption
```python
from cryptography.fernet import Fernet

class SecureRAGStorage:
    # Metadata fields considered sensitive
    SENSITIVE_FIELDS = ["author", "email", "department"]

    def __init__(self, master_key: bytes):
        self.cipher = Fernet(master_key)

    def encrypt_document(self, content: str, metadata: dict) -> dict:
        """Encrypt the content before storage."""
        encrypted_content = self.cipher.encrypt(content.encode())

        # Encrypt sensitive metadata fields
        encrypted_metadata = metadata.copy()
        for field in self.SENSITIVE_FIELDS:
            if field in encrypted_metadata:
                encrypted_metadata[field] = self.cipher.encrypt(
                    str(encrypted_metadata[field]).encode()
                ).decode()

        return {
            "content": encrypted_content.decode(),
            "metadata": encrypted_metadata,
            "encrypted": True
        }

    def decrypt_document(self, encrypted_doc: dict) -> dict:
        """Decrypt a document for use."""
        if not encrypted_doc.get("encrypted"):
            return encrypted_doc

        content = self.cipher.decrypt(
            encrypted_doc["content"].encode()
        ).decode()

        return {
            "content": content,
            "metadata": self._decrypt_metadata(encrypted_doc["metadata"])
        }

    def _decrypt_metadata(self, metadata: dict) -> dict:
        """Decrypt the sensitive metadata fields."""
        decrypted = metadata.copy()
        for field in self.SENSITIVE_FIELDS:
            if field in decrypted:
                decrypted[field] = self.cipher.decrypt(
                    decrypted[field].encode()
                ).decode()
        return decrypted

    def encrypt_embedding(self, embedding: list[float]) -> bytes:
        """Encrypt embeddings (optional: it has a performance cost)."""
        import numpy as np
        embedding_bytes = np.array(embedding).tobytes()
        return self.cipher.encrypt(embedding_bytes)
```
Granular access control (RBAC)
```python
from enum import Enum

class Permission(Enum):
    READ_PUBLIC = "read:public"
    READ_INTERNAL = "read:internal"
    READ_CONFIDENTIAL = "read:confidential"
    READ_RESTRICTED = "read:restricted"
    ADMIN = "admin"

class DocumentClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

class RBACController:
    def __init__(self, user_service):
        self.user_service = user_service
        # Each permission grants access to a set of classifications
        self.permission_hierarchy = {
            Permission.READ_PUBLIC: [DocumentClassification.PUBLIC],
            Permission.READ_INTERNAL: [
                DocumentClassification.PUBLIC,
                DocumentClassification.INTERNAL
            ],
            Permission.READ_CONFIDENTIAL: [
                DocumentClassification.PUBLIC,
                DocumentClassification.INTERNAL,
                DocumentClassification.CONFIDENTIAL
            ],
            Permission.READ_RESTRICTED: [
                DocumentClassification.PUBLIC,
                DocumentClassification.INTERNAL,
                DocumentClassification.CONFIDENTIAL,
                DocumentClassification.RESTRICTED
            ]
        }

    async def filter_accessible_documents(
        self,
        documents: list[dict],
        user_id: str
    ) -> list[dict]:
        """Filter documents according to the user's permissions."""
        user = await self.user_service.get_user(user_id)
        user_permissions = set(user.get("permissions", []))

        accessible = []
        for doc in documents:
            doc_class = DocumentClassification(
                doc.get("metadata", {}).get("classification", "internal")
            )
            if self._can_access(user_permissions, doc_class):
                accessible.append(doc)

        return accessible

    def _can_access(
        self,
        user_permissions: set,
        doc_classification: DocumentClassification
    ) -> bool:
        """Check whether the user may access the document."""
        for perm in user_permissions:
            try:
                allowed = self.permission_hierarchy.get(Permission(perm), [])
                if doc_classification in allowed:
                    return True
            except ValueError:
                continue
        return False
```
Personal data protection
Automatic detection and anonymisation
```python
import re
import hashlib
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

class PIIProtector:
    def __init__(self):
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()

        # Custom patterns for France
        self.french_patterns = {
            "FRENCH_SSN": r"\b[12][0-9]{2}[0-1][0-9][0-9]{2}[0-9]{3}[0-9]{3}[0-9]{2}\b",
            "FRENCH_PHONE": r"\b(?:(?:\+33|0033|0)[1-9](?:[.\-\s]?\d{2}){4})\b",
            "IBAN_FR": r"\bFR\d{2}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{4}[\s]?\d{3}\b"
        }

    def detect_pii(self, text: str, language: str = "fr") -> list[dict]:
        """Detect personal data in the text."""
        # Presidio detection (email, name, address, ...)
        results = self.analyzer.analyze(
            text=text,
            language=language,
            entities=[
                "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
                "LOCATION", "CREDIT_CARD", "IBAN_CODE"
            ]
        )

        # French-specific pattern detection
        french_results = self._detect_french_patterns(text)

        return self._format_results(list(results) + french_results)

    def anonymize(
        self,
        text: str,
        mode: str = "replace"  # replace, hash, mask
    ) -> str:
        """Anonymise personal data."""
        results = self.analyzer.analyze(text=text, language="fr")

        if mode == "replace":
            return self.anonymizer.anonymize(
                text=text,
                analyzer_results=results
            ).text
        if mode == "hash":
            return self._hash_pii(text, results)
        return self._mask_pii(text, results)

    def _detect_french_patterns(self, text: str) -> list:
        """Apply the custom French regexes."""
        matches = []
        for entity_type, pattern in self.french_patterns.items():
            for m in re.finditer(pattern, text):
                matches.append(
                    {"entity_type": entity_type, "start": m.start(), "end": m.end()}
                )
        return matches

    def _format_results(self, results: list) -> list[dict]:
        """Normalise Presidio results and regex matches into plain dicts."""
        formatted = []
        for r in results:
            if isinstance(r, dict):
                formatted.append(r)
            else:
                formatted.append(
                    {"entity_type": r.entity_type, "start": r.start, "end": r.end}
                )
        return formatted

    def _hash_pii(self, text: str, results: list) -> str:
        """Replace PII with a truncated hash (not reversible on its own:
        keep a separate, protected mapping if re-identification is needed)."""
        result_text = text
        for result in sorted(results, key=lambda x: x.start, reverse=True):
            original = text[result.start:result.end]
            hashed = hashlib.sha256(original.encode()).hexdigest()[:12]
            result_text = (
                result_text[:result.start]
                + f"[{result.entity_type}:{hashed}]"
                + result_text[result.end:]
            )
        return result_text

    def _mask_pii(self, text: str, results: list) -> str:
        """Replace each PII span with asterisks."""
        masked = text
        for result in sorted(results, key=lambda x: x.start, reverse=True):
            masked = (
                masked[:result.start]
                + "*" * (result.end - result.start)
                + masked[result.end:]
            )
        return masked

    def check_before_indexing(self, document: dict) -> dict:
        """Check and process a document before indexing."""
        content = document.get("content", "")
        pii_detected = self.detect_pii(content)

        if pii_detected:
            return {
                "safe_to_index": False,
                "pii_found": pii_detected,
                "action_required": "review_or_anonymize",
                "anonymized_content": self.anonymize(content)
            }

        return {
            "safe_to_index": True,
            "pii_found": [],
            "original_content": content
        }
```
Consent and data-subject rights
```python
from datetime import datetime

class ConsentManager:
    def __init__(self, db):
        self.db = db

    async def record_consent(
        self,
        user_id: str,
        purpose: str,
        data_categories: list[str],
        expiry_date: datetime = None
    ):
        """Record the user's consent."""
        consent = {
            "user_id": user_id,
            "purpose": purpose,
            "data_categories": data_categories,
            "granted_at": datetime.now(),
            "expires_at": expiry_date,
            "status": "active"
        }
        await self.db.insert("consents", consent)

    async def check_consent(
        self,
        user_id: str,
        purpose: str,
        data_category: str
    ) -> bool:
        """Check whether a valid consent exists."""
        consent = await self.db.find_one("consents", {
            "user_id": user_id,
            "purpose": purpose,
            "data_categories": {"$in": [data_category]},
            "status": "active",
            "$or": [
                {"expires_at": None},
                {"expires_at": {"$gt": datetime.now()}}
            ]
        })
        return consent is not None

    async def handle_deletion_request(self, user_id: str) -> dict:
        """Handle an erasure request (right to be forgotten)."""
        # 1. Locate all of the user's data
        user_data = await self._find_all_user_data(user_id)

        # 2. Delete from the vector indexes
        await self._delete_from_vector_db(user_data["document_ids"])

        # 3. Delete from the databases
        await self._delete_from_databases(user_id)

        # 4. Log the deletion
        await self._log_deletion(user_id, user_data)

        return {
            "status": "completed",
            "deleted_documents": len(user_data["document_ids"]),
            "deleted_conversations": user_data["conversation_count"],
            "completion_date": datetime.now().isoformat()
        }

    async def handle_export_request(self, user_id: str) -> dict:
        """Handle an export request (right to data portability)."""
        user_data = await self._find_all_user_data(user_id)

        return {
            "user_profile": user_data["profile"],
            "conversations": user_data["conversations"],
            "indexed_documents": user_data["documents"],
            "consents": user_data["consents"],
            "export_date": datetime.now().isoformat()
        }
```
Auditing and traceability
Secure logging
```python
import hashlib
import json
import logging
from datetime import datetime

class AuditLogger:
    def __init__(self, log_destination: str):
        self.logger = logging.getLogger("rag_audit")
        self.logger.setLevel(logging.INFO)

        # Secure handler (encrypted file or SIEM)
        handler = logging.FileHandler(log_destination)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)

    def log_query(
        self,
        user_id: str,
        query: str,
        documents_accessed: list[str],
        response_generated: bool
    ):
        """Log every RAG query."""
        # Do NOT log the query content itself if it may be sensitive
        log_entry = {
            "event_type": "rag_query",
            "timestamp": datetime.now().isoformat(),
            "user_id": self._hash_if_needed(user_id),
            "query_hash": self._hash_content(query),
            "query_length": len(query),
            "documents_accessed": documents_accessed,
            "document_count": len(documents_accessed),
            "response_generated": response_generated
        }
        self.logger.info(json.dumps(log_entry))

    def log_document_access(
        self,
        user_id: str,
        document_id: str,
        access_type: str,
        classification: str
    ):
        """Log document accesses."""
        log_entry = {
            "event_type": "document_access",
            "timestamp": datetime.now().isoformat(),
            "user_id": self._hash_if_needed(user_id),
            "document_id": document_id,
            "access_type": access_type,
            "classification": classification
        }
        self.logger.info(json.dumps(log_entry))

    def log_security_event(
        self,
        event_type: str,
        severity: str,
        details: dict
    ):
        """Log security events."""
        log_entry = {
            "event_type": f"security_{event_type}",
            "timestamp": datetime.now().isoformat(),
            "severity": severity,
            "details": details
        }

        if severity in ["high", "critical"]:
            self.logger.warning(json.dumps(log_entry))
            self._alert_security_team(log_entry)
        else:
            self.logger.info(json.dumps(log_entry))

    def _hash_content(self, content: str) -> str:
        return hashlib.sha256(content.encode()).hexdigest()

    def _hash_if_needed(self, user_id: str) -> str:
        """Pseudonymise the user identifier before logging."""
        return hashlib.sha256(user_id.encode()).hexdigest()[:16]

    def _alert_security_team(self, log_entry: dict):
        """Hook for paging/SIEM alerting; left as a no-op in this sketch."""
        pass
```
Retention and purging
```python
from datetime import datetime, timedelta

class DataRetentionManager:
    def __init__(self, db, vector_db, config: dict):
        self.db = db
        self.vector_db = vector_db
        self.config = config

    async def apply_retention_policy(self):
        """Apply the retention policy."""
        # Conversations
        conv_retention = self.config.get("conversation_retention_days", 365)
        await self._purge_old_conversations(conv_retention)

        # Audit logs (longer legal retention obligation)
        audit_retention = self.config.get("audit_retention_days", 2190)  # 6 years
        await self._archive_old_audits(audit_retention)

        # Expired documents
        await self._handle_expired_documents()

        # Orphaned metadata
        await self._cleanup_orphaned_data()

    async def _purge_old_conversations(self, days: int):
        """Purge old conversations."""
        cutoff_date = datetime.now() - timedelta(days=days)

        # Anonymise rather than delete if analytics still need the records
        await self.db.update_many(
            "conversations",
            {"created_at": {"$lt": cutoff_date}},
            {
                "$set": {
                    "user_id": "anonymized",
                    "messages": [],
                    "purged_at": datetime.now()
                }
            }
        )

    async def _handle_expired_documents(self):
        """Handle documents that carry an expiry date."""
        expired = await self.db.find(
            "documents",
            {"expires_at": {"$lt": datetime.now()}}
        )

        for doc in expired:
            # Remove from the vector index
            await self.vector_db.delete(doc["id"])

            # Mark as expired
            await self.db.update(
                "documents",
                {"id": doc["id"]},
                {"$set": {"status": "expired", "content": None}}
            )
```
Operational security
Protection against prompt injection
```python
import re

class PromptSecurityGuard:
    def __init__(self):
        self.injection_patterns = [
            r"ignore\s+(previous|all|above)\s+instructions",
            r"disregard\s+(your|the)\s+(rules|instructions)",
            r"you\s+are\s+now\s+",
            r"pretend\s+(you|to)\s+",
            r"act\s+as\s+if",
            r"system\s*:\s*",
            r"<\|.*\|>",
            r"\[INST\]",
            r"###\s*(instruction|system)",
        ]

    def check_query(self, query: str) -> dict:
        """Check that a query is safe."""
        query_lower = query.lower()

        # Injection-pattern detection
        for pattern in self.injection_patterns:
            if re.search(pattern, query_lower, re.IGNORECASE):
                return {
                    "safe": False,
                    "reason": "potential_injection",
                    "pattern_matched": pattern
                }

        # Exfiltration-attempt detection
        if self._detect_exfiltration_attempt(query):
            return {"safe": False, "reason": "potential_exfiltration"}

        # Malicious-content detection
        if self._detect_malicious_content(query):
            return {"safe": False, "reason": "malicious_content"}

        return {"safe": True}

    def sanitize_context(self, context: str) -> str:
        """Clean the context before it is injected into the prompt."""
        # Strip injection attempts hidden inside retrieved documents
        sanitized = context
        for pattern in self.injection_patterns:
            sanitized = re.sub(pattern, "[FILTERED]", sanitized, flags=re.IGNORECASE)
        return sanitized

    def _detect_exfiltration_attempt(self, query: str) -> bool:
        """Detect data-exfiltration attempts."""
        exfil_patterns = [
            r"list\s+all\s+(users|passwords|keys|secrets)",
            r"show\s+(me\s+)?(the\s+)?system\s+prompt",
            r"what\s+(are|is)\s+(your|the)\s+(instructions|rules)",
            r"dump\s+(all|the)\s+data",
            r"export\s+.*\s+to\s+(email|url|external)"
        ]

        for pattern in exfil_patterns:
            if re.search(pattern, query.lower()):
                return True
        return False

    def _detect_malicious_content(self, query: str) -> bool:
        """Placeholder for further checks (malware URLs, abuse); extend as needed."""
        return False
```
Rate limiting and DDoS protection
```python
from redis.asyncio import Redis

class RateLimiter:
    def __init__(self, redis_client: Redis):
        self.redis = redis_client

    async def check_rate_limit(
        self,
        identifier: str,
        limit_type: str = "query"
    ) -> dict:
        """Check rate limits."""
        limits = {
            "query": {"requests": 60, "window": 60},        # 60 req/min
            "heavy_query": {"requests": 10, "window": 60},  # 10 req/min
            "indexing": {"requests": 100, "window": 3600},  # 100/hour
            "export": {"requests": 5, "window": 86400}      # 5/day
        }

        config = limits.get(limit_type, limits["query"])
        key = f"ratelimit:{limit_type}:{identifier}"

        current = await self.redis.incr(key)
        if current == 1:
            await self.redis.expire(key, config["window"])

        if current > config["requests"]:
            return {
                "allowed": False,
                "remaining": 0,
                "reset_in": await self.redis.ttl(key)
            }

        return {
            "allowed": True,
            "remaining": max(0, config["requests"] - current),
            "limit": config["requests"]
        }
```
Compliance checklist
Before going to production
GDPR
- [ ] Record of processing activities up to date
- [ ] Impact assessment (DPIA) if sensitive data is involved
- [ ] Data subjects informed
- [ ] Mechanisms for exercising data-subject rights implemented
- [ ] Processor contracts (DPAs) signed
- [ ] Security measures documented
AI Act (if applicable)
- [ ] Risk classification performed
- [ ] Risk-management system (if high risk)
- [ ] Complete technical documentation
- [ ] Human-oversight mechanism
- [ ] Robustness and bias testing
Security
- [ ] Encryption at rest and in transit
- [ ] RBAC access control configured
- [ ] Audit logging enabled
- [ ] Penetration tests performed
- [ ] Incident-response plan
- [ ] Backups tested
Operations
- [ ] Retention policy defined
- [ ] Automated purge process
- [ ] Security monitoring active
- [ ] Team training completed
Further reading
- GDPR and chatbots: a focus on customer chatbots
- The AI Act and RAG: regulatory implications
- Sensitive data: handling critical data
Simplified compliance with Ailog
GDPR and AI Act compliance can seem daunting. With Ailog, you get an infrastructure that is compliant out of the box:
- Hosting 100% in France at OVH; data never leaves the EU
- End-to-end AES-256 encryption
- Native RBAC with per-document permissions
- Automatic anonymisation of personal data
- Exportable audit logs for your DPOs
- Signed DPA and GDPR documentation provided
Try Ailog for free and deploy a compliant RAG starting today.