บทนำ

หลายคนอาจเคยได้ยินคำว่า Event-Driven Architecture หรือการประมวลผลแบบอิงเหตุการณ์ แต่ไม่รู้ว่าจะเริ่มต้นอย่างไร บทความนี้จะพาคุณทำความเข้าใจแนวคิดนี้และนำไปใช้งานจริงกับ AI API ของ HolySheep AI ผู้ให้บริการ AI API ราคาประหยัด รองรับ GPT-4.1 และ Claude Sonnet 4.5 ในราคาเริ่มต้นเพียง $8/ล้านโทเค็น และ $15/ล้านโทเค็นตามลำดับ

Event-Driven Architecture คืออะไร

ลองนึกภาพร้านกาแฟที่คุณสั่งกาแฟแล้วได้รับหมายเลขคิว คุณไม่ต้องยืนรอที่เคาน์เตอร์ เพียงนั่งรอจนเรียกหมายเลข ระบบแบบนี้เรียกว่า Asynchronous หรือไม่ต้องรอตอบกลับทันที ในโลกคอมพิวเตอร์ Event-Driven ก็คล้ายกัน คือแทนที่จะเรียก AI API แล้วรอจนได้คำตอบ คุณส่งข้อความไปยัง Message Queue แล้วทำงานอื่นต่อ เมื่อ AI ประมวลผลเสร็จจะส่งผลลัพธ์กลับมา

ทำไมต้องใช้ Kafka

Kafka เป็นระบบ Message Queue ที่นิยมใช้ในการส่งข้อความระหว่างบริการต่างๆ มีข้อดีดังนี้

การติดตั้งโปรแกรมที่จำเป็น

ติดตั้ง Python

ดาวน์โหลด Python จากเว็บ python.org แล้วติดตั้งให้เรียบร้อย ระหว่างติดตั้งอย่าลืมเลือก Add Python to PATH

ติดตั้ง Kafka

สำหรับผู้เริ่มต้น แนะนำใช้ Docker ซึ่งทำให้ติดตั้ง Kafka ได้ง่ายมาก
# สร้างไฟล์ docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
# รัน Kafka
docker-compose up -d

ตรวจสอบว่าทำงานหรือไม่

docker-compose ps

สร้าง Python API Wrapper สำหรับ HolySheep

ติดตั้งไลบรารี

pip install kafka-python openai requests

สร้างไฟล์ ai_client.py

import json
import requests
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import threading
import time

class HolySheepAIClient:
    """คลาสสำหรับเชื่อมต่อกับ HolySheep AI API แบบ Event-Driven"""
    
    def __init__(self, api_key, base_url="https://api.holysheep.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.producer = None
        self.consumer = None
        
    def _send_request(self, messages, model="gpt-4.1"):
        """ส่งคำขอไปยัง HolySheep API"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "model": model,
            "messages": messages
        }
        
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=payload
        )
        
        return response.json()
    
    def setup_producer(self, bootstrap_servers=['localhost:9092']):
        """ตั้งค่าผู้ส่งข้อความ (Producer)"""
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        print("ตั้งค่า Kafka Producer สำเร็จ")
        
    def setup_consumer(self, topic, group_id='ai-consumer-group', bootstrap_servers=['localhost:9092']):
        """ตั้งค่าผู้รับข้อความ (Consumer)"""
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        print(f"ตั้งค่า Kafka Consumer สำหรับ Topic: {topic} สำเร็จ")
        
    def send_message(self, topic, message_data, key=None):
        """ส่งข้อความไปยัง Topic"""
        if not self.producer:
            raise Exception("กรุณาเรียก setup_producer() ก่อน")
            
        try:
            future = self.producer.send(topic, value=message_data, key=key)
            result = future.get(timeout=10)
            print(f"ส่งข้อความสำเร็จไปยัง {topic}, Partition: {result.partition}")
            return result
        except KafkaError as e:
            print(f"เกิดข้อผิดพลาดในการส่งข้อความ: {e}")
            return None
            
    def process_ai_request(self, request_data):
        """ประมวลผลคำขอ AI"""
        messages = request_data.get('messages', [])
        model = request_data.get('model', 'gpt-4.1')
        
        print(f"กำลังประมวลผลด้วย Model: {model}")
        result = self._send_request(messages, model)
        
        return result
        
    def start_processing(self, output_topic):
        """เริ่มประมวลผลข้อความจาก Kafka"""
        if not self.consumer:
            raise Exception("กรุณาเรียก setup_consumer() ก่อน")
            
        print("เริ่มรอรับข้อความ...")
        
        for message in self.consumer:
            print(f"\nได้รับข้อความ: {message.value}")
            
            # ประมวลผลด้วย AI
            ai_result = self.process_ai_request(message.value)
            
            # ส่งผลลัพธ์ไปยัง Topic ปลายทาง
            if ai_result:
                self.send_message(
                    output_topic,
                    {
                        'original_request': message.value,
                        'ai_response': ai_result,
                        'processed_at': time.time()
                    },
                    key=str(message.offset())
                )
                
    def close(self):
        """ปิดการเชื่อมต่อ"""
        if self.producer:
            self.producer.close()
        if self.consumer:
            self.consumer.close()
        print("ปิดการเชื่อมต่อเรียบร้อย")

สร้างไฟล์ main.py เพื่อรันระบบ

from ai_client import HolySheepAIClient
import json

สร้าง Client ด้วย API Key ของคุณ

สมัครได้ที่ https://holysheep.ai/register

client = HolySheepAIClient(api_key="YOUR_HOLYSHEEP_API_KEY")

ตั้งค่า Producer และ Consumer

client.setup_producer() client.setup_consumer(topic='ai-requests')

ส่งข้อความทดสอบ

test_message = { 'messages': [ {'role': 'user', 'content': 'ทักทายฉันเป็นภาษาไทย'} ], 'model': 'gpt-4.1' } print("กำลังส่งข้อความทดสอบ...") client.send_message('ai-requests', test_message, key='test-001')

รัน Consumer เพื่อรอรับและประมวลผล

ปิดบรรทัดด้านล่างหากต้องการทดสอบแค่ส่งข้อความ

client.start_processing(output_topic='ai-responses')

client.close()

การแบ่งงานเป็น Worker แบบ Parallel

หากต้องการประมวลผลข้อความจำนวนมากพร้อมกัน สามารถใช้หลาย Consumer ใน Consumer Group เดียวกันได้
import multiprocessing

def worker_process(worker_id, api_key):
    """ฟังก์ชันสำหรับ Worker แต่ละตัว"""
    client = HolySheepAIClient(api_key=api_key)
    client