Base de connaissances intelligente : Centraliser le savoir d'entreprise
Créez une base de connaissances IA pour votre entreprise : documentation technique, onboarding, expertise métier accessibles instantanément.
- Auteur
- Équipe Ailog
- Date de publication
- Temps de lecture
- 19 min de lecture
- Niveau
- intermediate
Base de connaissances intelligente : Centraliser le savoir d'entreprise
Dans toute entreprise, le savoir est dispersé : documents Confluence, Google Drive, emails, expertise dans la tête des collaborateurs. Une base de connaissances intelligente propulsée par le RAG transforme ce chaos en un assistant capable de répondre instantanément à toute question métier.
Le problème de la connaissance dispersée
La réalité des entreprises
Une étude McKinsey révèle qu'un employé passe en moyenne 1.8 heures par jour à chercher des informations. C'est 9.3 heures par semaine, soit 23% du temps de travail perdu en recherche.
Les symptômes classiques : • "Où est le document sur la procédure X ?" • "Qui sait comment configurer Y ?" • "On avait déjà résolu ce problème, mais je ne retrouve plus la solution" • "Je suis nouveau, je ne sais pas où chercher"
Les limites des solutions traditionnelles
| Solution | Problème | |----------|----------| | Wiki/Confluence | Navigation complexe, recherche limitée aux mots-clés exacts | | Google Drive | Pas de structuration, impossible de chercher dans le contenu | | Slack/Teams | Messages éparpillés, pas de capitalisation | | Experts internes | Single point of failure, charge cognitive | | FAQ statiques | Jamais à jour, ne couvrent pas tous les cas |
L'avantage d'une KB RAG
Une base de connaissances RAG permet de : • Recherche sémantique : Trouver l'info même sans connaître les termes exacts • Synthèse : Obtenir une réponse consolidée de plusieurs sources • Contextualisation : Réponses adaptées au profil de l'utilisateur • Mise à jour continue : Synchronisation automatique avec les sources • Capitalisation : Chaque réponse enrichit la base
Architecture d'une KB intelligente
`` ┌─────────────────────────────────────────────────────────────┐ │ SOURCES DE DONNÉES │ ├──────────┬──────────┬──────────┬──────────┬────────────────┤ │Confluence│ Google │ Notion │ Slack │ Documents │ │ │ Drive │ │(archivé) │ PDF │ └────┬─────┴────┬─────┴────┬─────┴────┬─────┴───────┬────────┘ │ │ │ │ │ └──────────┴──────────┴──────────┴─────────────┘ │ ▼ ┌─────────────────────────┐ │ Ingestion Pipeline │ │ - Extraction texte │ │ - Chunking intelligent │ │ - Métadonnées │ └───────────┬─────────────┘ ▼ ┌─────────────────────────┐ │ Embedding + │ │ Indexation │ └───────────┬─────────────┘ ▼ ┌─────────────────────────┐ │ Base Vectorielle │ │ (Qdrant) │ └───────────┬─────────────┘ │ ┌─────────────────────────┴──────────────────────────────────┐ │ INTERFACE │ ├─────────────┬─────────────┬─────────────┬─────────────────┤ │ Chatbot │ Slack │ API │ Recherche │ │ Web │ Bot │ interne │ Avancée │ └─────────────┴─────────────┴─────────────┴─────────────────┘ `
Connecteurs de sources
Connecteur Confluence
`python from atlassian import Confluence import html2text
class ConfluenceConnector: def __init__(self, url: str, username: str, api_token: str): self.confluence = Confluence( url=url, username=username, password=api_token, cloud=True ) self.html_converter = html2text.HTML2Text() self.html_converter.ignore_links = False
def get_all_pages(self, space_keys: list[str] = None) -> list[dict]: """ Récupère toutes les pages des espaces spécifiés """ documents = []
if space_keys is None: spaces = self.confluence.get_all_spaces() space_keys = [s['key'] for s in spaces['results']]
for space_key in space_keys: pages = self._get_space_pages(space_key) documents.extend(pages)
return documents
def _get_space_pages(self, space_key: str) -> list[dict]: """ Récupère toutes les pages d'un espace """ pages = [] start = 0 limit = 50
while True: result = self.confluence.get_all_pages_from_space( space_key, start=start, limit=limit, expand='body.storage,ancestors,version' )
for page in result: pages.append(self._format_page(page, space_key))
if len(result) < limit: break start += limit
return pages
def _format_page(self, page: dict, space_key: str) -> dict: """ Formate une page Confluence pour le RAG """ Convertir HTML en texte html_content = page.get('body', {}).get('storage', {}).get('value', '') text_content = self.html_converter.handle(html_content)
Construire le chemin hiérarchique ancestors = page.get('ancestors', []) path = ' > '.join([a['title'] for a in ancestors] + [page['title']])
return { "id": f"confluence_{page['id']}", "title": page['title'], "content": f"{page['title']}\n\n{text_content}", "metadata": { "type": "confluence", "source": "confluence", "space": space_key, "page_id": page['id'], "path": path, "url": f"{self.confluence.url}/wiki/spaces/{space_key}/pages/{page['id']}", "author": page.get('version', {}).get('by', {}).get('displayName'), "last_updated": page.get('version', {}).get('when'), "version": page.get('version', {}).get('number') } } `
Connecteur Google Drive
`python from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseDownload import io
class GoogleDriveConnector: def __init__(self, credentials_path: str, folder_ids: list[str] = None): credentials = service_account.Credentials.from_service_account_file( credentials_path, scopes=['https://www.googleapis.com/auth/drive.readonly'] ) self.service = build('drive', 'v3', credentials=credentials) self.folder_ids = folder_ids or ['root']
def get_all_documents(self) -> list[dict]: """ Récupère tous les documents des dossiers spécifiés """ documents = []
for folder_id in self.folder_ids: docs = self._get_folder_documents(folder_id) documents.extend(docs)
return documents
def _get_folder_documents(self, folder_id: str, path: str = "") -> list[dict]: """ Récupère récursivement les documents d'un dossier """ documents = []
Lister les fichiers query = f"'{folder_id}' in parents and trashed = false" results = self.service.files().list( q=query, fields="files(id, name, mimeType, modifiedTime, owners, webViewLink)", pageSize=100 ).execute()
for file in results.get('files', []): current_path = f"{path}/{file['name']}" if path else file['name']
if file['mimeType'] == 'application/vnd.google-apps.folder': Récursion dans les sous-dossiers sub_docs = self._get_folder_documents(file['id'], current_path) documents.extend(sub_docs) else: Extraire le contenu du fichier content = self._extract_content(file) if content: documents.append({ "id": f"gdrive_{file['id']}", "title": file['name'], "content": content, "metadata": { "type": "google_drive", "source": "google_drive", "file_id": file['id'], "path": current_path, "mime_type": file['mimeType'], "url": file.get('webViewLink'), "author": file.get('owners', [{}])[0].get('displayName'), "last_updated": file.get('modifiedTime') } })
return documents
def _extract_content(self, file: dict) -> str: """ Extrait le contenu textuel d'un fichier """ mime_type = file['mimeType'] file_id = file['id']
Google Docs - export en texte if mime_type == 'application/vnd.google-apps.document': return self._export_google_doc(file_id)
Google Sheets - export en CSV elif mime_type == 'application/vnd.google-apps.spreadsheet': return self._export_google_sheet(file_id)
PDF - télécharger et extraire elif mime_type == 'application/pdf': return self._extract_pdf(file_id)
Fichiers texte elif mime_type.startswith('text/'): return self._download_text_file(file_id)
return None
def _export_google_doc(self, file_id: str) -> str: """ Exporte un Google Doc en texte """ request = self.service.files().export_media( fileId=file_id, mimeType='text/plain' ) content = io.BytesIO() downloader = MediaIoBaseDownload(content, request)
done = False while not done: _, done = downloader.next_chunk()
return content.getvalue().decode('utf-8') `
Connecteur Notion
`python from notion_client import Client
class NotionConnector: def __init__(self, token: str, database_ids: list[str] = None): self.client = Client(auth=token) self.database_ids = database_ids
def get_all_pages(self) -> list[dict]: """ Récupère toutes les pages Notion """ documents = []
if self.database_ids: for db_id in self.database_ids: pages = self._get_database_pages(db_id) documents.extend(pages) else: Rechercher toutes les pages accessibles results = self.client.search(filter={"property": "object", "value": "page"}) for page in results['results']: doc = self._format_page(page) if doc: documents.append(doc)
return documents
def _get_database_pages(self, database_id: str) -> list[dict]: """ Récupère les pages d'une database Notion """ documents = [] has_more = True start_cursor = None
while has_more: response = self.client.databases.query( database_id=database_id, start_cursor=start_cursor )
for page in response['results']: doc = self._format_page(page) if doc: documents.append(doc)
has_more = response['has_more'] start_cursor = response.get('next_cursor')
return documents
def _format_page(self, page: dict) -> dict: """ Formate une page Notion pour le RAG """ Extraire le titre title = self._extract_title(page) if not title: return None
Extraire le contenu content = self._extract_page_content(page['id'])
return { "id": f"notion_{page['id']}", "title": title, "content": f"{title}\n\n{content}", "metadata": { "type": "notion", "source": "notion", "page_id": page['id'], "url": page['url'], "created_time": page['created_time'], "last_updated": page['last_edited_time'] } }
def _extract_page_content(self, page_id: str) -> str: """ Extrait le contenu texte d'une page Notion """ blocks = self.client.blocks.children.list(block_id=page_id) content_parts = []
for block in blocks['results']: text = self._block_to_text(block) if text: content_parts.append(text)
return "\n\n".join(content_parts)
def _block_to_text(self, block: dict) -> str: """ Convertit un bloc Notion en texte """ block_type = block['type']
if block_type == 'paragraph': return self._rich_text_to_string(block['paragraph']['rich_text'])
elif block_type in ['heading_1', 'heading_2', 'heading_3']: prefix = '#' int(block_type[-1]) text = self._rich_text_to_string(block[block_type]['rich_text']) return f"{prefix} {text}"
elif block_type == 'bulleted_list_item': return f"- {self._rich_text_to_string(block['bulleted_list_item']['rich_text'])}"
elif block_type == 'numbered_list_item': return f"1. {self._rich_text_to_string(block['numbered_list_item']['rich_text'])}"
elif block_type == 'code': code = self._rich_text_to_string(block['code']['rich_text']) lang = block['code'].get('language', '') return f"`{lang}\n{code}\n`"
return ""
def _rich_text_to_string(self, rich_text: list) -> str: return "".join([rt['plain_text'] for rt in rich_text]) `
Pipeline d'ingestion intelligent
Chunking adaptatif par type de document
`python from langchain.text_splitter import ( RecursiveCharacterTextSplitter, MarkdownTextSplitter, Language )
class AdaptiveChunker: def __init__(self): self.chunkers = { "markdown": MarkdownTextSplitter( chunk_size=1000, chunk_overlap=100 ), "code": RecursiveCharacterTextSplitter.from_language( language=Language.PYTHON, chunk_size=500, chunk_overlap=50 ), "prose": RecursiveCharacterTextSplitter( chunk_size=800, chunk_overlap=100, separators=["\n\n", "\n", ". ", " ", ""] ), "faq": RecursiveCharacterTextSplitter( chunk_size=400, chunk_overlap=0, separators=["Q:", "Question:", "---"] ) }
def chunk(self, document: dict) -> list[dict]: """ Découpe un document avec la stratégie adaptée """ doc_type = self._detect_type(document) chunker = self.chunkers.get(doc_type, self.chunkers["prose"])
chunks = chunker.split_text(document["content"])
return [ { "id": f"{document['id']}_chunk_{i}", "content": chunk, "metadata": { document["metadata"], "parent_id": document["id"], "parent_title": document["title"], "chunk_index": i, "total_chunks": len(chunks) } } for i, chunk in enumerate(chunks) ]
def _detect_type(self, document: dict) -> str: """ Détecte le type de document pour adapter le chunking """ content = document["content"] title = document["title"].lower()
FAQ if any(kw in title for kw in ["faq", "questions", "q&a"]): return "faq"
Code code_indicators = ["`", "def ", "class ", "function ", "import "] if sum(1 for ind in code_indicators if ind in content) >= 2: return "code"
Markdown structuré if content.count("#") >= 3 or content.count("##") >= 2: return "markdown"
return "prose" `
Enrichissement des métadonnées
`python class MetadataEnricher: def __init__(self, llm=None): self.llm = llm
async def enrich(self, document: dict) -> dict: """ Enrichit les métadonnées d'un document """ enriched = document.copy()
Catégorisation automatique enriched["metadata"]["category"] = await self._categorize(document)
Extraction d'entités enriched["metadata"]["entities"] = await self._extract_entities(document)
Score de qualité enriched["metadata"]["quality_score"] = self._calculate_quality(document)
Tags automatiques enriched["metadata"]["auto_tags"] = await self._generate_tags(document)
return enriched
async def _categorize(self, document: dict) -> str: """ Catégorise automatiquement le document """ if not self.llm: return "uncategorized"
prompt = f""" Catégorise ce document parmi les catégories suivantes : • procedure : Procédure ou guide étape par étape • reference : Documentation de référence technique • policy : Politique d'entreprise ou règlement • tutorial : Tutoriel ou formation • faq : Questions fréquentes • other : Autre
Titre : {document['title']} Début du contenu : {document['content'][:500]}
Réponds uniquement avec le nom de la catégorie. """
response = await self.llm.generate(prompt, temperature=0) return response.strip().lower()
async def _extract_entities(self, document: dict) -> dict: """ Extrait les entités nommées """ Extraction basique par regex import re
content = document["content"]
return { "tools": re.findall(r'\b(?:Jira|Confluence|Slack|GitHub|GitLab|AWS|Azure|GCP)\b', content, re.I), "teams": re.findall(r'équipe\s+(\w+)', content, re.I), "people": [], Nécessite NER plus avancé "versions": re.findall(r'v?\d+\.\d+(?:\.\d+)?', content) }
def _calculate_quality(self, document: dict) -> float: """ Calcule un score de qualité du document """ score = 1.0 content = document["content"]
Pénalités if len(content) < 100: score -= 0.3 Trop court if "TODO" in content or "WIP" in content: score -= 0.2 Non finalisé if document["metadata"].get("last_updated"): Ancien document from datetime import datetime, timedelta try: updated = datetime.fromisoformat(document["metadata"]["last_updated"].replace("Z", "+00:00")) if datetime.now(updated.tzinfo) - updated > timedelta(days=365): score -= 0.2 except: pass
Bonus if content.count("#") >= 2: score += 0.1 Bien structuré if "exemple" in content.lower() or "example" in content.lower(): score += 0.1 Contient des exemples
return max(0, min(1, score))
async def _generate_tags(self, document: dict) -> list[str]: """ Génère des tags automatiques """ if not self.llm: return []
prompt = f""" Génère 3 à 5 tags pertinents pour ce document.
Titre : {document['title']} Contenu : {document['content'][:1000]}
Réponds avec les tags séparés par des virgules. """
response = await self.llm.generate(prompt, temperature=0.3) tags = [t.strip().lower() for t in response.split(",")] return tags[:5] `
Recherche intelligente
Recherche hybride avec personnalisation
`python class KBSearch: def __init__(self, vector_db, bm25_index, user_service): self.vector_db = vector_db self.bm25_index = bm25_index self.user_service = user_service
async def search( self, query: str, user_id: str, top_k: int = 10, filters: dict = None ) -> list[dict]: """ Recherche hybride personnalisée """ Profil utilisateur pour personnalisation user_profile = await self.user_service.get_profile(user_id)
Recherche vectorielle vector_results = await self._vector_search(query, top_k 2, filters)
Recherche lexicale BM25 bm25_results = await self._bm25_search(query, top_k 2, filters)
Fusion RRF (Reciprocal Rank Fusion) fused = self._rrf_fusion(vector_results, bm25_results)
Personnalisation basée sur le profil personalized = self._personalize(fused, user_profile)
return personalized[:top_k]
def _rrf_fusion( self, results_1: list[dict], results_2: list[dict], k: int = 60 ) -> list[dict]: """ Fusion RRF de deux listes de résultats """ scores = {}
for rank, doc in enumerate(results_1): doc_id = doc["id"] scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
for rank, doc in enumerate(results_2): doc_id = doc["id"] scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
Créer la liste fusionnée all_docs = {d["id"]: d for d in results_1 + results_2} sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [ {all_docs[doc_id], "fusion_score": scores[doc_id]} for doc_id in sorted_ids if doc_id in all_docs ]
def _personalize(self, results: list[dict], user_profile: dict) -> list[dict]: """ Personnalise les résultats selon le profil utilisateur """ user_team = user_profile.get("team") user_role = user_profile.get("role") user_history = set(user_profile.get("viewed_docs", []))
for result in results: boost = 0
Boost pour documents de son équipe if result["metadata"].get("team") == user_team: boost += 0.1
Boost pour documents de son niveau if result["metadata"].get("target_role") == user_role: boost += 0.05
Léger malus pour documents déjà vus (favoriser la découverte) if result["id"] in user_history: boost -= 0.02
result["personalized_score"] = result.get("fusion_score", 0.5) + boost
Retrier par score personnalisé return sorted(results, key=lambda x: x["personalized_score"], reverse=True) `
Suggestions intelligentes
`python class KBSuggestions: def __init__(self, search, analytics): self.search = search self.analytics = analytics
async def get_suggestions(self, user_id: str, context: str = None) -> list[dict]: """ Génère des suggestions personnalisées """ suggestions = [] Documents populaires popular = await self._get_popular_docs() suggestions.extend([{"type": "popular", d} for d in popular[:3]]) Documents récemment mis à jour recent = await self._get_recently_updated() suggestions.extend([{"type": "updated", d} for d in recent[:3]]) Basé sur l'historique de l'utilisateur if user_id: related = await self._get_related_to_history(user_id) suggestions.extend([{"type": "for_you", d} for d in related[:3]]) Basé sur le contexte actuel (page, projet, etc.) if context: contextual = await self.search.search(context, user_id, top_k=3) suggestions.extend([{"type": "contextual", d} for d in contextual])
Dédupliquer seen = set() unique_suggestions = [] for s in suggestions: if s["id"] not in seen: seen.add(s["id"]) unique_suggestions.append(s)
return unique_suggestions[:10]
async def _get_popular_docs(self) -> list[dict]: """ Documents les plus consultés """ return await self.analytics.get_top_documents(period_days=30, limit=5)
async def _get_recently_updated(self) -> list[dict]: """ Documents récemment mis à jour """ return await self.search.vector_db.query( sort_by="metadata.last_updated", sort_order="desc", limit=5 ) `
Prompt système pour KB interne
`python KB_ASSISTANT_PROMPT = """Tu es l'assistant de la base de connaissances de {company_name}.
CONTEXTE UTILISATEUR : • Nom : {user_name} • Équipe : {user_team} • Rôle : {user_role}
RÈGLES : Réponds UNIQUEMENT à partir de la documentation fournie Si l'info n'est pas disponible, dis-le clairement et suggère qui contacter Cite toujours tes sources avec les liens vers les documents Adapte ton niveau technique au rôle de l'utilisateur
FORMAT DE RÉPONSE : • Commence par une réponse directe et concise • Détaille ensuite si nécessaire • Termine par les sources utilisées
DOCUMENTS DISPONIBLES : {context}
QUESTION : {question} """ `
Gestion des permissions
`python class KBPermissions: def __init__(self, user_service, document_service): self.user_service = user_service self.document_service = document_service
async def filter_accessible( self, documents: list[dict], user_id: str ) -> list[dict]: """ Filtre les documents selon les permissions de l'utilisateur """ user = await self.user_service.get_user(user_id) user_groups = set(user.get("groups", [])) user_team = user.get("team")
accessible = [] for doc in documents: if self._can_access(doc, user_groups, user_team): accessible.append(doc)
return accessible
def _can_access( self, document: dict, user_groups: set, user_team: str ) -> bool: """ Vérifie si un utilisateur peut accéder à un document """ doc_meta = document.get("metadata", {})
Documents publics if doc_meta.get("visibility") == "public": return True
Documents d'équipe if doc_meta.get("visibility") == "team": return doc_meta.get("team") == user_team
Documents restreints à certains groupes allowed_groups = set(doc_meta.get("allowed_groups", [])) if allowed_groups and not allowed_groups.intersection(user_groups): return False
return True `
Métriques et analytics
`python class KBAnalytics: def __init__(self, db): self.db = db
def track_search( self, user_id: str, query: str, results_count: int, clicked_doc_id: str = None ): """ Track une recherche """ self.db.insert("kb_searches", { "user_id": user_id, "query": query, "results_count": results_count, "clicked_doc_id": clicked_doc_id, "timestamp": datetime.now() })
def get_search_metrics(self, period_days: int = 30) -> dict: """ Métriques de recherche """ return { "total_searches": self._count_searches(period_days), "unique_users": self._count_unique_searchers(period_days), "avg_results_per_search": self._avg_results(period_days), "zero_result_rate": self._zero_result_rate(period_days), "click_through_rate": self._ctr(period_days), "top_queries": self._top_queries(period_days), "top_documents": self._top_documents(period_days) }
def identify_gaps(self) -> list[dict]: """ Identifie les lacunes de la KB (requêtes sans résultat) """ return self.db.query(""" SELECT query, COUNT() as count FROM kb_searches WHERE results_count = 0 AND timestamp > NOW() - INTERVAL '30 days' GROUP BY query ORDER BY count DESC LIMIT 20 """) ``
Bonnes pratiques Maintenir la fraîcheur • Synchronisation automatique quotidienne • Alertes sur documents périmés (> 6 mois sans mise à jour) • Review périodique des documents les moins consultés Encourager la contribution • Interface simple pour signaler les erreurs • Suggestion de mise à jour après consultation • Gamification : badges pour les contributeurs Mesurer l'adoption
| Métrique | Cible | Action si sous-cible | |----------|-------|---------------------| | DAU / MAU | > 60% | Campagne de communication | | Requêtes sans résultat | < 5% | Enrichir la KB | | Temps moyen de résolution | < 30s | Améliorer le ranking | | Satisfaction (CSAT) | > 4.2/5 | Analyser les feedbacks |
Pour aller plus loin • Fondamentaux du Retrieval - Optimiser la recherche • RAG Multimodal - Indexer images et PDFs • Introduction au RAG - Comprendre les bases
---
Déployez votre KB intelligente avec Ailog
Créer une base de connaissances RAG demande d'intégrer de multiples sources et de gérer des permissions complexes. Avec Ailog, simplifiez le déploiement : • Connecteurs natifs : Confluence, Notion, Google Drive, SharePoint • Synchronisation automatique en temps réel • Gestion des permissions intégrée (SSO, groupes, équipes) • Interface de recherche personnalisable • Analytics pour mesurer l'adoption et identifier les lacunes • Hébergement France conforme RGPD
Testez Ailog gratuitement et centralisez le savoir de votre entreprise en quelques jours.
---
FAQ
Combien de temps faut-il pour déployer une base de connaissances RAG ?
Avec une solution comme Ailog, le déploiement initial prend quelques jours : connexion des sources (Confluence, Drive, Notion), indexation automatique et configuration du chatbot. L'enrichissement et l'optimisation se font ensuite en continu selon les retours utilisateurs.
Une KB intelligente peut-elle remplacer les experts internes ?
Non, elle les complète. La KB gère les questions répétitives et la documentation standard, libérant les experts pour les cas complexes. Elle capitalise aussi leur savoir pour le rendre accessible à tous, réduisant la dépendance aux "sachants".
Comment gérer les documents confidentiels dans une KB RAG ?
Les systèmes RAG modernes intègrent une gestion fine des permissions : chaque document peut être restreint par équipe, groupe ou rôle. L'utilisateur ne voit que les réponses issues de documents auxquels il a accès. Le SSO permet de synchroniser les droits existants.
Quelle est la différence entre une KB RAG et un moteur de recherche classique ?
Un moteur classique retourne des documents correspondant aux mots-clés. Une KB RAG comprend la question, synthétise l'information de plusieurs sources et génère une réponse directe avec citations. Elle gère aussi les reformulations et questions en langage naturel.
Comment mesurer le ROI d'une base de connaissances intelligente ?
Les métriques clés sont : temps de résolution des questions (avant/après), taux d'adoption (DAU/MAU), réduction des tickets escaladés aux experts, et satisfaction utilisateur (CSAT). Une réduction de 50% du temps de recherche d'information est un objectif réaliste.