ในยุคที่ AI API กลายเป็นหัวใจสำคัญของ application สมัยใหม่ การรักษาความปลอดภัยข้อมูลและการกรองเนื้อหาที่ไม่เหมาะสมก่อนส่งไปยัง LLM provider เป็นสิ่งจำเป็นอย่างยิ่ง บทความนี้จะพาคุณไปทำความเข้าใจสถาปัตยกรรมการ filtering ระดับ production พร้อมโค้ดที่พร้อมใช้งานจริง
ทำไมต้องมี Content Filtering Layer
เมื่อใช้บริการ สมัครที่นี่ ซึ่งเป็น API gateway ที่รวม model หลากหลายเช่น GPT-4.1, Claude Sonnet 4.5, Gemini 2.5 Flash และ DeepSeek V3.2 ในราคาที่ประหยัดถึง 85%+ (อัตรา ¥1=$1) คุณต้องมั่นใจว่าข้อมูลที่ส่งไปนั้นปลอดภัยและเหมาะสม
ประโยชน์หลักของการกรองเนื้อหา
- ป้องกันข้อมูลรั่วไหล — ลบ PII (Personal Identifiable Information) ก่อนส่งไปยัง external API
- ลดต้นทุน — ลด token usage โดยการ sanitize input
- ปฏิบัติตามกฎหมาย — PDPA, GDPR compliance
- ป้องกัน abuse — หยุดการใช้งานที่ไม่เหมาะสมตั้งแต่ต้นทาง
สถาปัตยกรรม Content Filtering Pipeline
สถาปัตยกรรมที่แนะนำใช้โมเดล Pipeline แบบ Chain of Responsibility เพื่อให้สามารถเพิ่มหรือลด filters ได้ง่ายโดยไม่กระทบ code เดิม
Core Architecture
"""
Content Filtering Pipeline Architecture
ใช้ Chain of Responsibility pattern สำหรับ production-grade filtering
"""
import re
import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, Callable
from enum import Enum
import json
class FilterSeverity(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
BLOCK = 4
@dataclass
class FilterResult:
passed: bool
severity: FilterSeverity
reason: str
original_text: str
sanitized_text: str
detected_patterns: list = field(default_factory=list)
processing_time_ms: float = 0.0
class ContentFilter(ABC):
"""Abstract base class สำหรับทุก filter"""
def __init__(self, name: str):
self.name = name
self._next_filter: Optional[ContentFilter] = None
def set_next(self, filter_chain: 'ContentFilter') -> 'ContentFilter':
self._next_filter = filter_chain
return filter_chain
def process(self, text: str, context: dict = None) -> FilterResult:
start_time = time.perf_counter()
result = self._filter(text, context or {})
result.processing_time_ms = (time.perf_counter() - start_time) * 1000
if not result.passed and self._next_filter:
return self._next_filter.process(text, context)
return result
@abstractmethod
def _filter(self, text: str, context: dict) -> FilterResult:
pass
class PIIFilter(ContentFilter):
"""Filter สำหรับลบข้อมูล PII เช่น เบอร์โทร, อีเมล, เลขบัตรประชาชน"""
PATTERNS = {
'thai_id': r'\b[0-9]{13}\b',
'phone': r'\b0[0-9]{9}\b',
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'credit_card': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
}
REPLACEMENTS = {
'thai_id': '[THAI_ID_REDACTED]',
'phone': '[PHONE_REDACTED]',
'email': '[EMAIL_REDACTED]',
'credit_card': '[CC_REDACTED]',
'ip_address': '[IP_REDACTED]',
}
def __init__(self):
super().__init__("PIIFilter")
self._compiled_patterns = {
k: re.compile(v) for k, v in self.PATTERNS.items()
}
def _filter(self, text: str, context: dict) -> FilterResult:
detected = []
sanitized = text
for pattern_name, regex in self._compiled_patterns.items():
matches = regex.findall(sanitized)
if matches:
detected.append({
'type': pattern_name,
'count': len(matches),
'positions': [m.span() for m in regex.finditer(sanitized)]
})
sanitized = regex.sub(self.REPLACEMENTS[pattern_name], sanitized)
return FilterResult(
passed=True,
severity=FilterSeverity.LOW,
reason="PII detected and redacted" if detected else "No PII found",
original_text=text,
sanitized_text=sanitized,
detected_patterns=detected
)
ตัวอย่างการใช้งาน
pii_filter = PIIFilter()
result = pii_filter.process("กรุณาติดต่อ 0812345678 หรือ [email protected]")
print(f"Sanitized: {result.sanitized_text}")
Output: กรุณาติดต่อ [PHONE_REDACTED] หรือ [EMAIL_REDACTED]
การ Implement profanity และ Sensitive Content Filter
นอกจาก PII แล้ว คุณต้องกรอง content ที่ไม่เหมาะสมออกด้วย โค้ดต่อไปนี้แสดงการ implement profanity filter ที่รองรับภาษาไทยและหลายภาษา
"""
Advanced Content Filter สำหรับ Profanity และ Sensitive Content
รองรับ Thai, English และ multi-language patterns
"""
import re
from typing import Set, Dict, List
import ahocorasick
import json
class ProfanityFilter:
"""Aho-Corasick based profanity filter สำหรับ performance สูงสุด"""
def __init__(self):
self.automaton = ahocorasick.Automaton()
self.word_list: List[str] = []
self._build_default_wordlist()
def _build_default_wordlist(self):
# คำหยาบภาษาไทยที่พบบ่อย
thai_profanity = [
'คำหยาบ1', 'คำหยาบ2', # แทนที่ด้วยคำจริงใน production
]
# English profanity
english_profanity = [
'badword1', 'badword2', # แทนที่ด้วยคำจริงใน production
]
self.word_list = thai_profanity + english_profanity
for word in self.word_list:
self.automaton.add_word(word, word)
self.automaton.make_automaton()
def add_word(self, word: str):
"""เพิ่มคำใหม่เข้า automaton แบบ dynamic"""
self.word_list.append(word)
self.automaton = ahocorasick.Automaton()
for w in self.word_list:
self.automaton.add_word(w, w)
self.automaton.make_automaton()
def filter(self, text: str, replacement: str = '[CONTENT_FILTERED]') -> Dict:
"""กรองคำหยาบและ return result"""
found_words = []
for end_index, word in self.automaton.iter(text):
start_index = end_index - len(word) + 1
found_words.append({
'word': word,
'start': start_index,
'end': end_index
})
# Mask คำที่พบ
sanitized = text
for item in reversed(found_words):
sanitized = (
sanitized[:item['start']] +
replacement +
sanitized[item['end']+1:]
)
return {
'sanitized': sanitized,
'found_count': len(found_words),
'found_words': list(set([w['word'] for w in found_words]))
}
class ContentModerationFilter(ContentFilter):
"""Filter สำหรับ sensitive topics เช่น ความรุนแรง, สุขภาพจิต"""
SENSITIVE_PATTERNS = {
'violence': {
'patterns': [
r'(?: убить|ударить|нож|пистолет)', # Violence patterns
r'วิธีทำร้าย|สร้างอาวุธ',
],
'severity': FilterSeverity.HIGH
},
'self_harm': {
'patterns': [
r'วิธีทำร้ายตัวเอง|ฆ่าตัวตาย',
r'how to suicide|end my life',
],
'severity': FilterSeverity.BLOCK
}
}
แหล่งข้อมูลที่เกี่ยวข้อง
บทความที่เกี่ยวข้อง