บทความนี้จะพาคุณเจาะลึกการใช้งาน ChatCompletion API อย่างเป็นระบบ ตั้งแต่โครงสร้างคำขอ การแยกวิเคราะห์การตอบกลับ ไปจนถึงการเขียนโค้ดระดับ Production พร้อม Benchmark จริง สำหรับวิศวกรที่ต้องการสร้างระบบที่เชื่อถือได้และประหยัดต้นทุน

ทำความเข้าใจ ChatCompletion API Endpoint

ChatCompletion API ทำงานผ่าน REST endpoint ด้วย HTTP POST method โดยมีโครงสร้างพื้นฐานดังนี้:

สำหรับการใช้งานจริงใน Production ฉันแนะนำให้ใช้ HolySheep AI ซึ่งให้อัตรา ¥1=$1 ประหยัดได้ถึง 85% พร้อม latency เฉลี่ยต่ำกว่า 50ms

โครงสร้าง Request Object เชิงลึก

2.1 Required Parameters

ส่วนที่จำเป็นต้องมีในทุกคำขอคือ model และ messages array:

import requests
import json

def create_chat_request(model: str, messages: list) -> dict:
    """
    สร้าง request payload สำหรับ ChatCompletion API
    model: ชื่อโมเดล เช่น gpt-4, gpt-4-turbo, gpt-3.5-turbo
    messages: list of message objects ที่มี role และ content
    """
    return {
        "model": model,
        "messages": messages,
        "stream": False  # ปิด streaming เพื่อรับ response ทั้งหมดในครั้งเดียว
    }

ตัวอย่างการใช้งาน

payload = create_chat_request( model="gpt-4", messages=[ {"role": "system", "content": "คุณเป็นผู้ช่วย AI ที่เชี่ยวชาญ"}, {"role": "user", "content": "อธิบายเรื่อง REST API โดยย่อ"} ] ) print(json.dumps(payload, indent=2, ensure_ascii=False))

2.2 Message Structure และ Roles

Message Object ประกอบด้วย 3 roles หลักที่คุณต้องเข้าใจ:

# Multi-turn conversation example
messages = [
    # System prompt กำหนด persona
    {"role": "system", "content": "คุณเป็น senior software architect"},
    
    # Turn 1: User asks
    {"role": "user", "content": "ออกแบบ microservices architecture สำหรับ e-commerce"},
    
    # Turn 1: Assistant responds
    {"role": "assistant", "content": "สำหรับ e-commerce ผมแนะนำให้แบ่งเป็น services หลักดังนี้..."},
    
    # Turn 2: User asks follow-up
    {"role": "user", "content": "แล้ว API Gateway ควรจัดการเรื่องอะไรบ้าง?"}
]

ส่ง conversation history ทั้งหมดเพื่อให้ AI เข้าใจ context

response = requests.post( f"{BASE_URL}/chat/completions", headers=HEADERS, json={"model": "gpt-4", "messages": messages} )

การแยกวิเคราะห์ Response Object

Response จาก ChatCompletion API มีโครงสร้างที่ค่อนข้างลึก ต้องแยกวิเคราะห์ให้ถูกต้อง:

import json
from dataclasses import dataclass
from typing import Optional

@dataclass
class ChatResponse:
    """Data class สำหรับเก็บข้อมูล response อย่างเป็นระบบ"""
    content: str
    model: str
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    finish_reason: str
    response_id: str

def parse_chat_completion(response: requests.Response) -> ChatResponse:
    """
    แยกวิเคราะห์ response จาก ChatCompletion API
    """
    data = response.json()
    
    # Extract choices (มักมีแค่ 1 choice ในกรณีปกติ)
    choice = data["choices"][0]
    
    # Extract usage statistics สำหรับคำนวณ cost
    usage = data.get("usage", {})
    
    return ChatResponse(
        content=choice["message"]["content"],
        model=data["model"],
        completion_tokens=usage.get("completion_tokens", 0),
        prompt_tokens=usage.get("prompt_tokens", 0),
        total_tokens=usage.get("total_tokens", 0),
        finish_reason=choice["finish_reason"],
        response_id=data["id"]
    )

ตัวอย่างการใช้งาน

response = requests.post( f"{BASE_URL}/chat/completions", headers=HEADERS, json={"model": "gpt-4", "messages": messages} ) result = parse_chat_completion(response) print(f"Content: {result.content}") print(f"Tokens used: {result.total_tokens}") print(f"Cost: ${calculate_cost(result.total_tokens, result.model)}")

โค้ด Production พร้อม Error Handling และ Retry Logic

ในระบบ Production คุณต้องจัดการกับหลาย edge cases เช่น rate limiting, timeout, และ server errors:

import time
import logging
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)

class ChatCompletionClient:
    """Production-ready client สำหรับ ChatCompletion API"""
    
    def __init__(self, api_key: str, base_url: str = "https://api.holysheep.ai/v1"):
        self.base_url = base_url
        self.session = self._create_session_with_retries()
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def _create_session_with_retries(self) -> requests.Session:
        """Setup session พร้อม automatic retry สำหรับ transient errors"""
        session = requests.Session()
        
        # Retry strategy: 3 attempts, exponential backoff
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST", "GET"]
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        
        return session
    
    def chat(
        self,
        model: str,
        messages: list,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        timeout: int = 60
    ) -> ChatResponse:
        """
        Send chat completion request พร้อม full error handling
        """
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        try:
            response = self.session.post(
                f"{self.base_url}/chat/completions",
                headers=self.headers,
                json=payload,
                timeout=timeout
            )
            
            # Handle specific HTTP errors
            if response.status_code == 401:
                raise AuthenticationError("Invalid API key")
            elif response.status_code == 429:
                raise RateLimitError("Rate limit exceeded")
            elif response.status_code >= 500:
                raise ServerError(f"Server error: {response.status_code}")
            
            response.raise_for_status()
            return parse_chat_completion(response)
            
        except requests.exceptions.Timeout:
            logger.error("Request timeout after %ds", timeout)
            raise TimeoutError(f"Request timed out after {timeout} seconds")
        except requests.exceptions.ConnectionError as e:
            logger.error("Connection error: %s", str(e))
            raise ConnectionError(f"Failed to connect: {str(e)}")

Initialize client

client = ChatCompletionClient( api_key="YOUR_HOLYSHEEP_API_KEY", base_url="https://api.holysheep.ai/v1" )

การควบคุม Concurrent Requests และ Rate Limiting

สำหรับระบบที่ต้องรับ request จำนวนมาก คุณต้องจัดการ concurrency อย่างเหมาะสม:

from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio

class ConcurrentChatClient:
    """Client ที่รองรับ concurrent requests"""
    
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.client = ChatCompletionClient(api_key)
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def chat_async(self, model: str, messages: list, request_id: str) -> dict:
        """Async version พร้อม semaphore ควบคุม concurrency"""
        async with self.semaphore:
            try:
                # Run sync code in thread pool
                result = await asyncio.to_thread(
                    self.client.chat, model, messages
                )
                return {
                    "request_id": request_id,
                    "status": "success",
                    "content": result.content,
                    "tokens": result.total_tokens
                }
            except Exception as e:
                return {
                    "request_id": request_id,
                    "status": "error",
                    "error": str(e)
                }
    
    async def batch_chat(self, requests: list[dict]) -> list[dict]:
        """ประมวลผล batch ของ requests พร้อมกัน"""
        tasks = [
            self.chat_async(
                model=req["model"],
                messages=req["messages"],
                request_id=req.get("id", f"req_{i}")
            )
            for i, req in enumerate(requests)
        ]
        
        results = await asyncio.gather(*tasks)
        return results

ตัวอย่างการใช้งาน batch

async def main(): client = ConcurrentChatClient( api_key="YOUR_HOLYSHEEP_API_KEY", max_concurrent=10 ) requests