ในยุคที่ AI API กลายเป็นหัวใจสำคัญของ application สมัยใหม่ การรักษาความปลอดภัยข้อมูลและการกรองเนื้อหาที่ไม่เหมาะสมก่อนส่งไปยัง LLM provider เป็นสิ่งจำเป็นอย่างยิ่ง บทความนี้จะพาคุณไปทำความเข้าใจสถาปัตยกรรมการ filtering ระดับ production พร้อมโค้ดที่พร้อมใช้งานจริง

ทำไมต้องมี Content Filtering Layer

เมื่อใช้บริการ สมัครที่นี่ ซึ่งเป็น API gateway ที่รวม model หลากหลายเช่น GPT-4.1, Claude Sonnet 4.5, Gemini 2.5 Flash และ DeepSeek V3.2 ในราคาที่ประหยัดถึง 85%+ (อัตรา ¥1=$1) คุณต้องมั่นใจว่าข้อมูลที่ส่งไปนั้นปลอดภัยและเหมาะสม

ประโยชน์หลักของการกรองเนื้อหา

สถาปัตยกรรม Content Filtering Pipeline

สถาปัตยกรรมที่แนะนำใช้โมเดล Pipeline แบบ Chain of Responsibility เพื่อให้สามารถเพิ่มหรือลด filters ได้ง่ายโดยไม่กระทบ code เดิม

Core Architecture

"""
Content Filtering Pipeline Architecture
ใช้ Chain of Responsibility pattern สำหรับ production-grade filtering
"""

import re
import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, Callable
from enum import Enum
import json

class FilterSeverity(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    BLOCK = 4

@dataclass
class FilterResult:
    passed: bool
    severity: FilterSeverity
    reason: str
    original_text: str
    sanitized_text: str
    detected_patterns: list = field(default_factory=list)
    processing_time_ms: float = 0.0

class ContentFilter(ABC):
    """Abstract base class สำหรับทุก filter"""
    
    def __init__(self, name: str):
        self.name = name
        self._next_filter: Optional[ContentFilter] = None
    
    def set_next(self, filter_chain: 'ContentFilter') -> 'ContentFilter':
        self._next_filter = filter_chain
        return filter_chain
    
    def process(self, text: str, context: dict = None) -> FilterResult:
        start_time = time.perf_counter()
        result = self._filter(text, context or {})
        result.processing_time_ms = (time.perf_counter() - start_time) * 1000
        
        if not result.passed and self._next_filter:
            return self._next_filter.process(text, context)
        
        return result
    
    @abstractmethod
    def _filter(self, text: str, context: dict) -> FilterResult:
        pass

class PIIFilter(ContentFilter):
    """Filter สำหรับลบข้อมูล PII เช่น เบอร์โทร, อีเมล, เลขบัตรประชาชน"""
    
    PATTERNS = {
        'thai_id': r'\b[0-9]{13}\b',
        'phone': r'\b0[0-9]{9}\b',
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        'credit_card': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
    }
    
    REPLACEMENTS = {
        'thai_id': '[THAI_ID_REDACTED]',
        'phone': '[PHONE_REDACTED]',
        'email': '[EMAIL_REDACTED]',
        'credit_card': '[CC_REDACTED]',
        'ip_address': '[IP_REDACTED]',
    }
    
    def __init__(self):
        super().__init__("PIIFilter")
        self._compiled_patterns = {
            k: re.compile(v) for k, v in self.PATTERNS.items()
        }
    
    def _filter(self, text: str, context: dict) -> FilterResult:
        detected = []
        sanitized = text
        
        for pattern_name, regex in self._compiled_patterns.items():
            matches = regex.findall(sanitized)
            if matches:
                detected.append({
                    'type': pattern_name,
                    'count': len(matches),
                    'positions': [m.span() for m in regex.finditer(sanitized)]
                })
                sanitized = regex.sub(self.REPLACEMENTS[pattern_name], sanitized)
        
        return FilterResult(
            passed=True,
            severity=FilterSeverity.LOW,
            reason="PII detected and redacted" if detected else "No PII found",
            original_text=text,
            sanitized_text=sanitized,
            detected_patterns=detected
        )

ตัวอย่างการใช้งาน

pii_filter = PIIFilter() result = pii_filter.process("กรุณาติดต่อ 0812345678 หรือ [email protected]") print(f"Sanitized: {result.sanitized_text}")

Output: กรุณาติดต่อ [PHONE_REDACTED] หรือ [EMAIL_REDACTED]

การ Implement profanity และ Sensitive Content Filter

นอกจาก PII แล้ว คุณต้องกรอง content ที่ไม่เหมาะสมออกด้วย โค้ดต่อไปนี้แสดงการ implement profanity filter ที่รองรับภาษาไทยและหลายภาษา

"""
Advanced Content Filter สำหรับ Profanity และ Sensitive Content
รองรับ Thai, English และ multi-language patterns
"""

import re
from typing import Set, Dict, List
import ahocorasick
import json

class ProfanityFilter:
    """Aho-Corasick based profanity filter สำหรับ performance สูงสุด"""
    
    def __init__(self):
        self.automaton = ahocorasick.Automaton()
        self.word_list: List[str] = []
        self._build_default_wordlist()
    
    def _build_default_wordlist(self):
        # คำหยาบภาษาไทยที่พบบ่อย
        thai_profanity = [
            'คำหยาบ1', 'คำหยาบ2',  # แทนที่ด้วยคำจริงใน production
        ]
        
        # English profanity
        english_profanity = [
            'badword1', 'badword2',  # แทนที่ด้วยคำจริงใน production
        ]
        
        self.word_list = thai_profanity + english_profanity
        
        for word in self.word_list:
            self.automaton.add_word(word, word)
        
        self.automaton.make_automaton()
    
    def add_word(self, word: str):
        """เพิ่มคำใหม่เข้า automaton แบบ dynamic"""
        self.word_list.append(word)
        self.automaton = ahocorasick.Automaton()
        for w in self.word_list:
            self.automaton.add_word(w, w)
        self.automaton.make_automaton()
    
    def filter(self, text: str, replacement: str = '[CONTENT_FILTERED]') -> Dict:
        """กรองคำหยาบและ return result"""
        found_words = []
        
        for end_index, word in self.automaton.iter(text):
            start_index = end_index - len(word) + 1
            found_words.append({
                'word': word,
                'start': start_index,
                'end': end_index
            })
        
        # Mask คำที่พบ
        sanitized = text
        for item in reversed(found_words):
            sanitized = (
                sanitized[:item['start']] + 
                replacement + 
                sanitized[item['end']+1:]
            )
        
        return {
            'sanitized': sanitized,
            'found_count': len(found_words),
            'found_words': list(set([w['word'] for w in found_words]))
        }

class ContentModerationFilter(ContentFilter):
    """Filter สำหรับ sensitive topics เช่น ความรุนแรง, สุขภาพจิต"""
    
    SENSITIVE_PATTERNS = {
        'violence': {
            'patterns': [
                r'(?: убить|ударить|нож|пистолет)',  # Violence patterns
                r'วิธีทำร้าย|สร้างอาวุธ',
            ],
            'severity': FilterSeverity.HIGH
        },
        'self_harm': {
            'patterns': [
                r'วิธีทำร้ายตัวเอง|ฆ่าตัวตาย',
                r'how to suicide|end my life',
            ],
            'severity': FilterSeverity.BLOCK
        }
    }