Introduction et enjeux architecturaux

Dans le paysage médiatique actuel, la vitesse de production de contenu doit coexister avec l'exigence de fiabilité factuelle. Cet article présente une architecture complète de pipeline de production de contenu IA pour les médias d'information, articulée autour de deux modules critiques : l'expansion de brouillons et la vérification des faits par agents.

Nous déploierons une solution production-ready en Python, exploitant les capacités des modèles de génération avec le provider HolySheep AI qui offre un taux de change ¥1=$1 avec une économie de 85% par rapport aux solutions occidentales traditionnelles.

Architecture globale du pipeline

Schéma de flux de données

Le pipeline se décompose en quatre étapes principales : ingestion du brouillon, expansion contextuelle via LLM, extraction d'entités pour vérification, et validation factuelle multi-sources.

┌─────────────────────────────────────────────────────────────────────┐
│                    PIPELINE DE PRODUCTION MÉDIAS                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌──────────┐    ┌──────────────┐    ┌────────────────┐             │
│  │ INGESTION│───▶│  EXPANSION   │───▶│ EXTRACTION NER │             │
│  │ Draft    │    │  LLM (Draft  │    │ Named Entities │             │
│  │ Article  │    │  Expander)   │    │                │             │
│  └──────────┘    └──────────────┘    └───────┬────────┘             │
│                                              │                       │
│                                              ▼                       │
│                                    ┌────────────────┐                │
│                                    │ FACT-CHECKER   │                │
│                                    │ Multi-agents   │                │
│                                    │ + Recherche    │                │
│                                    └───────┬────────┘                │
│                                            │                          │
│                                            ▼                          │
│                                    ┌────────────────┐                │
│                                    │   VALIDATION   │                │
│                                    │   & PUBLISH    │                │
│                                    └────────────────┘                │
└─────────────────────────────────────────────────────────────────────┘

Implémentation du client HolySheep

Configuration et gestion des connexions

import asyncio
import aiohttp
import json
from typing import Optional, Dict, List, AsyncIterator
from dataclasses import dataclass
from datetime import datetime
import hashlib

@dataclass
class HolySheepConfig:
    """Configuration optimisée pour la production médiatique"""
    base_url: str = "https://api.holysheep.ai/v1"
    api_key: str = "YOUR_HOLYSHEEP_API_KEY"
    timeout: int = 120  # Timeout étendu pour articles longs
    max_retries: int = 3
    retry_delay: float = 1.5
    model: str = "deepseek-v3.2"  # Modèle économique: $0.42/MTok

class HolySheepMediaClient:
    """
    Client haute-performance pour la production de contenu médiatique.
    Supporte la génération streaming et les appels batch.
    """
    
    def __init__(self, config: HolySheepConfig):
        self.config = config
        self._session: Optional[aiohttp.ClientSession] = None
        self._semaphore = asyncio.Semaphore(10)  # Limite de concurrence
        
    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=100,
            limit_per_host=20,
            keepalive_timeout=30
        )
        timeout = aiohttp.ClientTimeout(total=self.config.timeout)
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        return self
        
    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
            
    async def generate_streaming(
        self,
        prompt: str,
        system_prompt: str = "",
        temperature: float = 0.7
    ) -> AsyncIterator[str]:
        """Génération avec streaming pour expérience utilisateur fluide."""
        
        url = f"{self.config.base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": self.config.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt}
            ],
            "temperature": temperature,
            "stream": True,
            "max_tokens": 4096
        }
        
        async with self._semaphore:
            async with self._session.post(url, json=payload, headers=headers) as resp:
                resp.raise_for_status()
                async for line in resp.content:
                    line = line.decode('utf-8').strip()
                    if line.startswith('data: '):
                        if line == 'data: [DONE]':
                            break
                        data = json.loads(line[6:])
                        if content := data.get('choices', [{}])[0].get('delta', {}).get('content'):
                            yield content
                            
    async def generate_batch(
        self,
        prompts: List[Dict[str, str]],
        temperature: float = 0.7
    ) -> List[Dict]:
        """Traitement batch pour optimisation des coûts."""
        
        tasks = [
            self._single_completion(prompt, temperature)
            for prompt in prompts
        ]
        return await asyncio.gather(*tasks)
        
    async def _single_completion(
        self,
        message: Dict[str, str],
        temperature: float
    ) -> Dict:
        """Appel unitaire avec retry automatique."""
        
        url = f"{self.config.base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": self.config.model,
            "messages": [message],
            "temperature": temperature,
            "stream": False,
            "max_tokens": 2048
        }
        
        for attempt in range(self.config.max_retries):
            try:
                async with self._session.post(url, json=payload, headers=headers) as resp:
                    if resp.status == 429:
                        await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
                        continue
                    resp.raise_for_status()
                    data = await resp.json()
                    return {
                        "content": data['choices'][0]['message']['content'],
                        "usage": data.get('usage', {}),
                        "model": data.get('model')
                    }
            except Exception as e:
                if attempt == self.config.max_retries - 1:
                    raise
                await asyncio.sleep(self.config.retry_delay)
                
        raise RuntimeError("Échec après toutes les tentatives")

Module d'expansion de brouillons

Stratégie de chunking et génération contextuelle

Pour les articles longs, nous implémentons une stratégie de chunking intelligent qui préserve la cohérence sémantique tout en permettant une génération parallèle.

import re
from typing import List, Tuple
import tiktoken  # Tokenisation pour estimation précise

class DraftExpander:
    """
    Module d'expansion de brouillons pour articles médiatiques.
    Sépare le contenu en chunks sémantiques et génère en parallèle.
    """
    
    def __init__(self, client: HolySheepMediaClient, chunk_size: int = 1500):
        self.client = client
        self.chunk_size = chunk_size
        self.encoder = tiktoken.get_encoding("cl100k_base")
        
        self.system_prompt = """Vous êtes un rédacteur en chef adjoint spécialisé 
        dans l'expansion d'articles d'actualités. Votre rôle :
        1. Enrichir le contenu avec des détails contextuels pertinents
        2. Maintenir le style journalistique objectif et factuel
        3. Ajouter des citations et références si approprié
        4. Respecter la structure narrative originale
        5. N'inventez jamais de faits — uniquement développer le existantes"""
    
    def _chunk_text(self, text: str) -> List[Tuple[str, int, int]]:
        """Découpage intelligent préservant les limites de phrases."""
        
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks = []
        current_chunk = ""
        current_start = 0
        
        for i, sentence in enumerate(sentences):
            # Estimation tokens (1 token ≈ 4 caractères)
            estimated_tokens = len(self.encoder.encode(current_chunk + sentence))
            
            if estimated_tokens > self.chunk_size and current_chunk:
                chunks.append((current_chunk.strip(), current_start, i))
                current_chunk = sentence
                current_start = i
            else:
                current_chunk += " " + sentence if current_chunk else sentence
                
        if current_chunk:
            chunks.append((current_chunk.strip(), current_start, len(sentences)))
            
        return chunks
    
    async def expand_draft(
        self,
        draft: str,
        metadata: Dict = None,
        enable_parallel: bool = True
    ) -> str:
        """
        Expansion d'un brouillon avec gestion de la concurrence.
        
        Args:
            draft: Texte du brouillon original
            metadata: Métadonnées (thème, public cible, ton)
            enable_parallel: Utiliser le traitement parallèle si True
            
        Returns:
            Texte expansé complet
        """
        
        chunks = self._chunk_text(draft)
        metadata_context = self._build_metadata_context(metadata)
        
        if enable_parallel and len(chunks) > 1:
            # Traitement parallèle pour les gros articles
            tasks = [
                self._expand_chunk(chunk_text, metadata_context, chunk_idx)
                for chunk_idx, (chunk_text, _, _) in enumerate(chunks)
            ]
            expanded_chunks = await asyncio.gather(*tasks)
            
            # Réassemblage dans l'ordre original
            result_parts = [None] * len(expanded_chunks)
            for idx, (_, start, end) in enumerate(chunks):
                result_parts[idx] = expanded_chunks[idx]
                
            return "\n\n".join(filter(None, result_parts))
        else:
            # Traitement séquentiel pour petits articles
            return await self._sequential_expansion(chunks, metadata_context)
    
    async def _expand_chunk(
        self,
        chunk: str,
        metadata_context: str,
        chunk_index: int
    ) -> str:
        """Expansion d'un chunk individuel."""
        
        prompt = f"""{metadata_context}

Extrait à développer (partie {chunk_index + 1})

{chunk} Développez cet extrait en enrichissant le contenu tout en préservant le style et la structure. Répondez uniquement avec le texte développé.""" full_response = "" async for token in self.client.generate_streaming( prompt=prompt, system_prompt=self.system_prompt, temperature=0.6 ): full_response += token return full_response.strip() def _build_metadata_context(self, metadata: Dict = None) -> str: """Construction du contexte métadonnées.""" if not metadata: return "" context_parts = ["## Contexte éditorial"] if theme := metadata.get('theme'): context_parts.append(f"- Thème principal: {theme}") if audience := metadata.get('target_audience'): context_parts.append(f"- Audience: {audience}") if tone := metadata.get('tone'): context_parts.append(f"- Ton: {tone}") return "\n".join(context_parts) async def _sequential_expansion( self, chunks: List[Tuple[str, int, int]], metadata_context: str ) -> str: """Expansion séquentielle préservant la cohérence.""" full_expansion = "" accumulated_context = "" for idx, (chunk_text, _, _) in enumerate(chunks): # Ajouter le contexte des chunks précédents prompt = f"""{metadata_context}

Contexte des sections précédentes

{accumulated_context}

Section actuelle à développer (partie {idx + 1})

{chunk_text} Développez cette section en maintenant la cohérence avec le contexte précédent.""" expanded = "" async for token in self.client.generate_streaming( prompt=prompt, system_prompt=self.system_prompt, temperature=0.5 # Température plus basse pour cohérence ): expanded += token full_expansion += "\n\n" + expanded accumulated_context += f"\n{expanded}" # Préparer pour le chunk suivant return full_expansion.strip()

Système de vérification factuelle multi-agents

Architecture des agents de validation

La vérification factuelle repose sur un système multi-agents asynchrones : extraction d'entités, recherche de sources, comparaison et scoring de confiance.

from typing import Set, Tuple, Dict
from dataclasses import dataclass
from enum import Enum
import concurrent.futures

class ClaimType(Enum):
    """Classification des types d'affirmations."""
    FACTUAL_NUMERIC = "numeric"
    TEMPORAL = "temporal"
    GEOGRAPHIC = "geographic"
    ENTITY_RELATION = "entity_relation"
    QUOTE_ATTRIBUTION = "quote"

@dataclass
class Claim:
    """Représentation d'une affirmation à vérifier."""
    text: str
    claim_type: ClaimType
    entities: List[str]
    confidence_score: float = 1.0
    verified: bool = False
    source: str = ""
    correction: str = ""

class FactChecker:
    """
    Système de vérification factuelle multi-agents.
    Utilise des requêtes parallèle pour optimiser le temps de traitement.
    """
    
    def __init__(
        self,
        client: HolySheepMediaClient,
        max_concurrent_checks: int = 5
    ):
        self.client = client
        self.semaphore = asyncio.Semaphore(max_concurrent_checks)
        
        self.extraction_prompt = """Extrait toutes les affirmations factuelles 
        du texte ci-dessous. Pour chaque affirmation, identifie :
        1. Le type (numérique, temporel, géographique, relation, citation)
        2. Les entités mentionnées (noms, lieux, dates, chiffres)
        3. Le texte exact de l'affirmation
        
        Réponds au format JSON avec un tableau d'objets.
        Si aucune affirmation vérifiable n'existe, retourne un tableau vide."""
        
        self.verification_prompt = """Vérifie l'affirmation suivante :

        "{claim_text}"
        
        Type d'affirmation: {claim_type}
        Entités mentionnées: {entities}
        
        Réponds au format JSON avec :
        - verified: true/false
        - confidence: score de 0 à 1
        - source: source de vérification ou "non vérifiable"
        - correction: texte corrigé si nécessaire"""
    
    async def extract_claims(self, text: str) -> List[Claim]:
        """Extraction des affirmations via LLM."""
        
        response = ""
        async for token in self.client.generate_streaming(
            prompt=f"{self.extraction_prompt}\n\n## Texte à analyser\n\n{text}",
            system_prompt="Tu es un expert en extraction d'informations factuelles.",
            temperature=0.3
        ):
            response += token
            
        # Parsing JSON de la réponse
        try:
            claims_data = json.loads(response)
        except json.JSONDecodeError:
            # Fallback: extraction regex
            claims_data = self._fallback_extraction(text)
            
        return [
            Claim(
                text=c.get("text", ""),
                claim_type=ClaimType(c.get("type", "factual")),
                entities=c.get("entities", [])
            )
            for c in claims_data
        ]
    
    async def verify_claims_parallel(
        self,
        claims: List[Claim]
    ) -> List[Claim]:
        """Vérification parallèle de toutes les affirmations."""
        
        tasks = [self._verify_single(claim) for claim in claims]
        return await asyncio.gather(*tasks)
    
    async def _verify_single(self, claim: Claim) -> Claim:
        """Vérification d'une affirmation unique avec semaphore."""
        
        async with self.semaphore:
            response = ""
            async for token in self.client.generate_streaming(
                prompt=self.verification_prompt.format(
                    claim_text=claim.text,
                    claim_type=claim.claim_type.value,
                    entities=", ".join(claim.entities)
                ),
                system_prompt="Tu es un vérificateur de faits impartial.",
                temperature=0.2
            ):
                response += token
                
            try:
                result = json.loads(response)
                claim.verified = result.get("verified", False)
                claim.confidence_score = result.get("confidence", 1.0)
                claim.source = result.get("source", "")
                claim.correction = result.get("correction", "")
            except json.JSONDecodeError:
                claim.source = "Vérification impossible"
                
            return claim
    
    def _fallback_extraction(self, text: str) -> List[Dict]:
        """Extraction regex de secours pour affirmations