บทนำ
หลายคนอาจเคยได้ยินคำว่า Event-Driven Architecture หรือการประมวลผลแบบอิงเหตุการณ์ แต่ไม่รู้ว่าจะเริ่มต้นอย่างไร บทความนี้จะพาคุณทำความเข้าใจแนวคิดนี้และนำไปใช้งานจริงกับ AI API ของ
HolySheep AI ผู้ให้บริการ AI API ราคาประหยัด รองรับ GPT-4.1 และ Claude Sonnet 4.5 ในราคาเริ่มต้นเพียง $8/ล้านโทเค็น และ $15/ล้านโทเค็นตามลำดับ
Event-Driven Architecture คืออะไร
ลองนึกภาพร้านกาแฟที่คุณสั่งกาแฟแล้วได้รับหมายเลขคิว คุณไม่ต้องยืนรอที่เคาน์เตอร์ เพียงนั่งรอจนเรียกหมายเลข ระบบแบบนี้เรียกว่า Asynchronous หรือไม่ต้องรอตอบกลับทันที
ในโลกคอมพิวเตอร์ Event-Driven ก็คล้ายกัน คือแทนที่จะเรียก AI API แล้วรอจนได้คำตอบ คุณส่งข้อความไปยัง Message Queue แล้วทำงานอื่นต่อ เมื่อ AI ประมวลผลเสร็จจะส่งผลลัพธ์กลับมา
ทำไมต้องใช้ Kafka
Kafka เป็นระบบ Message Queue ที่นิยมใช้ในการส่งข้อความระหว่างบริการต่างๆ มีข้อดีดังนี้
- รองรับข้อความจำนวนมากโดยไม่สูญเสียข้อมูล
- สามารถอ่านข้อความซ้ำได้หลายครั้ง (Consumer Group)
- ทำงานแบบ Real-time ให้ความเร็วตอบสนองต่ำกว่า 50 มิลลิวินาที
- เป็นมาตรฐานอุตสาหกรรมที่ใช้งานโดยบริษัทใหญ่ทั่วโลก
การติดตั้งโปรแกรมที่จำเป็น
ติดตั้ง Python
ดาวน์โหลด Python จากเว็บ python.org แล้วติดตั้งให้เรียบร้อย ระหว่างติดตั้งอย่าลืมเลือก Add Python to PATH
ติดตั้ง Kafka
สำหรับผู้เริ่มต้น แนะนำใช้ Docker ซึ่งทำให้ติดตั้ง Kafka ได้ง่ายมาก
# สร้างไฟล์ docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
# รัน Kafka
docker-compose up -d
ตรวจสอบว่าทำงานหรือไม่
docker-compose ps
สร้าง Python API Wrapper สำหรับ HolySheep
ติดตั้งไลบรารี
pip install kafka-python openai requests
สร้างไฟล์ ai_client.py
import json
import requests
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import threading
import time
class HolySheepAIClient:
"""คลาสสำหรับเชื่อมต่อกับ HolySheep AI API แบบ Event-Driven"""
def __init__(self, api_key, base_url="https://api.holysheep.ai/v1"):
self.api_key = api_key
self.base_url = base_url
self.producer = None
self.consumer = None
def _send_request(self, messages, model="gpt-4.1"):
"""ส่งคำขอไปยัง HolySheep API"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages
}
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload
)
return response.json()
def setup_producer(self, bootstrap_servers=['localhost:9092']):
"""ตั้งค่าผู้ส่งข้อความ (Producer)"""
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
print("ตั้งค่า Kafka Producer สำเร็จ")
def setup_consumer(self, topic, group_id='ai-consumer-group', bootstrap_servers=['localhost:9092']):
"""ตั้งค่าผู้รับข้อความ (Consumer)"""
self.consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print(f"ตั้งค่า Kafka Consumer สำหรับ Topic: {topic} สำเร็จ")
def send_message(self, topic, message_data, key=None):
"""ส่งข้อความไปยัง Topic"""
if not self.producer:
raise Exception("กรุณาเรียก setup_producer() ก่อน")
try:
future = self.producer.send(topic, value=message_data, key=key)
result = future.get(timeout=10)
print(f"ส่งข้อความสำเร็จไปยัง {topic}, Partition: {result.partition}")
return result
except KafkaError as e:
print(f"เกิดข้อผิดพลาดในการส่งข้อความ: {e}")
return None
def process_ai_request(self, request_data):
"""ประมวลผลคำขอ AI"""
messages = request_data.get('messages', [])
model = request_data.get('model', 'gpt-4.1')
print(f"กำลังประมวลผลด้วย Model: {model}")
result = self._send_request(messages, model)
return result
def start_processing(self, output_topic):
"""เริ่มประมวลผลข้อความจาก Kafka"""
if not self.consumer:
raise Exception("กรุณาเรียก setup_consumer() ก่อน")
print("เริ่มรอรับข้อความ...")
for message in self.consumer:
print(f"\nได้รับข้อความ: {message.value}")
# ประมวลผลด้วย AI
ai_result = self.process_ai_request(message.value)
# ส่งผลลัพธ์ไปยัง Topic ปลายทาง
if ai_result:
self.send_message(
output_topic,
{
'original_request': message.value,
'ai_response': ai_result,
'processed_at': time.time()
},
key=str(message.offset())
)
def close(self):
"""ปิดการเชื่อมต่อ"""
if self.producer:
self.producer.close()
if self.consumer:
self.consumer.close()
print("ปิดการเชื่อมต่อเรียบร้อย")
สร้างไฟล์ main.py เพื่อรันระบบ
from ai_client import HolySheepAIClient
import json
สร้าง Client ด้วย API Key ของคุณ
สมัครได้ที่ https://holysheep.ai/register
client = HolySheepAIClient(api_key="YOUR_HOLYSHEEP_API_KEY")
ตั้งค่า Producer และ Consumer
client.setup_producer()
client.setup_consumer(topic='ai-requests')
ส่งข้อความทดสอบ
test_message = {
'messages': [
{'role': 'user', 'content': 'ทักทายฉันเป็นภาษาไทย'}
],
'model': 'gpt-4.1'
}
print("กำลังส่งข้อความทดสอบ...")
client.send_message('ai-requests', test_message, key='test-001')
รัน Consumer เพื่อรอรับและประมวลผล
ปิดบรรทัดด้านล่างหากต้องการทดสอบแค่ส่งข้อความ
client.start_processing(output_topic='ai-responses')
client.close()
การแบ่งงานเป็น Worker แบบ Parallel
หากต้องการประมวลผลข้อความจำนวนมากพร้อมกัน สามารถใช้หลาย Consumer ใน Consumer Group เดียวกันได้
import multiprocessing
def worker_process(worker_id, api_key):
"""ฟังก์ชันสำหรับ Worker แต่ละตัว"""
client = HolySheepAIClient(api_key=api_key)
client
แหล่งข้อมูลที่เกี่ยวข้อง
บทความที่เกี่ยวข้อง