단일 AI Agent의 한계를 넘어서, 여러 전문 Agent가 협업하는 시스템은 복잡한 워크플로우를 효과적으로 처리할 수 있게 합니다. 본 튜토리얼에서는 HolySheep AI를 활용하여 프로덕션 수준의 다중 Agent 협업 아키텍처를 설계하고 구현하는 방법을 심층적으로 다룹니다.
다중 Agent 협업 시스템 아키텍처
효과적인 다중 Agent 시스템은 세 가지 핵심 컴포넌트로 구성됩니다:
- Orchestrator Agent: 작업 분해 및 Agent 할당 조율
- Specialist Agents: 특정 도메인에 전문화된 작업 수행
- Shared State Manager: Agent 간 상태 및 결과 동기화
"""
다중 Agent 협업 시스템 코어 아키텍처
HolySheep AI API 기반 다중 Agent 오케스트레이션
"""
import asyncio
import hashlib
import json
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor
import aiohttp
from aiohttp import ClientTimeout
============================================================================
HolySheep AI API Configuration
============================================================================
HOLYSHEEP_BASE_URL = "https://api.holysheep.ai/v1"
API_KEY = "YOUR_HOLYSHEEP_API_KEY" # HolySheep AI API 키로 교체
class AgentStatus(Enum):
IDLE = "idle"
WORKING = "working"
WAITING = "waiting"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class AgentTask:
task_id: str
agent_id: str
task_type: str
payload: Dict[str, Any]
status: AgentStatus = AgentStatus.IDLE
result: Optional[Any] = None
error: Optional[str] = None
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
retry_count: int = 0
max_retries: int = 3
@dataclass
class SharedState:
"""Agent 간 공유 상태 관리"""
state_id: str
data: Dict[str, Any] = field(default_factory=dict)
version: int = 0
locks: Dict[str, asyncio.Lock] = field(default_factory=dict)
subscribers: List[str] = field(default_factory=list)
class HolySheepAIClient:
"""HolySheep AI API 클라이언트"""
def __init__(self, api_key: str, base_url: str = HOLYSHEEP_BASE_URL):
self.api_key = api_key
self.base_url = base_url
self.timeout = ClientTimeout(total=120)
self.rate_limiter = asyncio.Semaphore(10) # 동시 요청 제한
async def chat_completion(
self,
model: str,
messages: List[Dict],
temperature: float = 0.7,
max_tokens: int = 2048,
**kwargs
) -> Dict[str, Any]:
"""HolySheep AI 채팅 완성 API 호출"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
**kwargs
}
async with self.rate_limiter:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload
) as response:
if response.status != 200:
error_body = await response.text()
raise Exception(f"API Error {response.status}: {error_body}")
result = await response.json()
return result
============================================================================
Agent 구현
============================================================================
class BaseAgent(ABC):
"""기본 Agent 추상 클래스"""
def __init__(
self,
agent_id: str,
specialty: str,
model: str,
client: HolySheepAIClient
):
self.agent_id = agent_id
self.specialty = specialty
self.model = model
self.client = client
self.status = AgentStatus.IDLE
self.task_queue: asyncio.Queue = asyncio.Queue()
self.result_callbacks: Dict[str, Callable] = {}
@abstractmethod
async def process_task(self, task: AgentTask) -> Any:
"""작업 처리 로직 구현"""
pass
async def execute_task(self, task: AgentTask) -> AgentTask:
"""작업 실행 및 상태 업데이트"""
task.status = AgentStatus.WORKING
task.updated_at = time.time()
try:
self.status = AgentStatus.WORKING
result = await self.process_task(task)
task.result = result
task.status = AgentStatus.COMPLETED
except Exception as e:
task.error = str(e)
task.status = AgentStatus.FAILED
self.status = AgentStatus.FAILED
# 재시도 로직
if task.retry_count < task.max_retries:
task.retry_count += 1
task.status = AgentStatus.IDLE
await asyncio.sleep(2 ** task.retry_count) # 지수 백오프
asyncio.create_task(self.execute_task(task))
finally:
task.updated_at = time.time()
self.status = AgentStatus.IDLE
return task
class OrchestratorAgent(BaseAgent):
"""작업 분해 및 할당 Orchestrator Agent"""
def __init__(
self,
agent_id: str,
model: str,
client: HolySheepAIClient,
shared_state: SharedState
):
super().__init__(agent_id, "orchestrator", model, client)
self.shared_state = shared_state
self.specialist_agents: Dict[str, BaseAgent] = {}
def register_specialist(self, agent: BaseAgent):
"""전문가 Agent 등록"""
self.specialist_agents[agent.agent_id] = agent
async def process_task(self, task: AgentTask) -> Dict[str, Any]:
"""복잡한 작업을 하위 작업으로 분해"""
# HolySheep AI를 사용한 작업 분해
decomposition_prompt = f"""다음 작업을 하위 작업으로 분解해주세요.
작업: {task.payload.get('description', 'N/A')}
응답 형식 (JSON):
{{
"subtasks": [
{{"type": "task_type", "assign_to": "agent_id", "payload": {{}}}}
]
}}
"""
response = await self.client.chat_completion(
model=self.model,
messages=[
{"role": "system", "content": "당신은 작업 분해 전문가입니다."},
{"role": "user", "content": decomposition_prompt}
],
temperature=0.3
)
# 분해된 작업 결과 파싱
content = response['choices'][0]['message']['content']
subtasks_data = json.loads(content)
# 공유 상태에 분해된 작업 등록
async with self.shared_state.locks.get('tasks', asyncio.Lock()):
self.shared_state.data['pending_tasks'] = subtasks_data.get('subtasks', [])
self.shared_state.version += 1
return {
"task_id": task.task_id,
"subtasks": subtasks_data.get('subtasks', []),
"decomposition_model": self.model,
"cost": response.get('usage', {}).get('total_tokens', 0)
}
print("다중 Agent 협업 시스템 기본 구조 로드 완료")
작업 할당 시스템 구현
효과적인 작업 할당은 Agent의 현재 부하, 전문 분야, 가용성을 고려해야 합니다. 로드 밸런싱 알고리즘을 적용하여 최적의 할당을 구현합니다.
"""
작업 할당 및 로드 밸런싱 시스템
"""
import heapq
from typing import List, Tuple
from dataclasses import dataclass
@dataclass
class AgentMetrics:
"""Agent 성능 메트릭스"""
agent_id: str
current_load: int = 0
completed_tasks: int = 0
failed_tasks: int = 0
avg_response_time: float = 0.0
success_rate: float = 1.0
class TaskScheduler:
"""우선순위 기반 작업 스케줄러"""
def __init__(self):
self.task_queue: List[Tuple[int, int, AgentTask]] = [] # (priority, timestamp, task)
self.agent_metrics: Dict[str, AgentMetrics] = {}
self.task_counter = 0
def add_task(self, task: AgentTask, priority: int = 5):
"""우선순위 큐에 작업 추가 (0이 가장 높음)"""
heapq.heappush(
self.task_queue,
(priority, self.task_counter, task)
)
self.task_counter += 1
def get_next_task(self) -> Optional[AgentTask]:
"""최고 우선순위 작업 가져오기"""
if self.task_queue:
_, _, task = heapq.heappop(self.task_queue)
return task
return None
def update_agent_metrics(self, agent_id: str, success: bool, response_time: float):
"""Agent 성능 메트릭 업데이트"""
if agent_id not in self.agent_metrics:
self.agent_metrics[agent_id] = AgentMetrics(agent_id=agent_id)
metrics = self.agent_metrics[agent_id]
metrics.current_load = max(0, metrics.current_load - 1)
if success:
metrics.completed_tasks += 1
else:
metrics.failed_tasks += 1
# 이동 평균으로 응답 시간 업데이트
metrics.avg_response_time = (
metrics.avg_response_time * 0.7 + response_time * 0.3
)
total = metrics.completed_tasks + metrics.failed_tasks
metrics.success_rate = metrics.completed_tasks / total if total > 0 else 1.0
class LoadBalancedAllocator:
"""부하 분산 기반 Agent 할당기"""
def __init__(self, scheduler: TaskScheduler):
self.scheduler = scheduler
self.agent_pool: Dict[str, BaseAgent] = {}
self.affinity_rules: Dict[str, List[str]] = {} # 작업 유형별 선호 Agent
def register_agent(self, agent: BaseAgent, specialties: List[str]):
"""Agent 등록 및 전문 분야 매핑"""
self.agent_pool[agent.agent_id] = agent
for specialty in specialties:
if specialty not in self.affinity_rules:
self.affinity_rules[specialty] = []
self.affinity_rules[specialty].append(agent.agent_id)
def allocate_task(self, task: AgentTask) -> Optional[str]:
"""최적 Agent 선택 및 작업 할당"""
# 선호도 기반 후보 필터링
task_type = task.task_type
candidates = self.affinity_rules.get(task_type, list(self.agent_pool.keys()))
if not candidates:
candidates = list(self.agent_pool.keys())
# 성능 기반 선택
best_agent = None
best_score = float('-inf')
for agent_id in candidates:
metrics = self.scheduler.agent_metrics.get(agent_id)
agent = self.agent_pool[agent_id]
# 점수 계산: 성공률 높고, 응답 시간 빠르고, 부하 낮은 Agent 선호
if metrics:
success_bonus = metrics.success_rate * 100
response_penalty = metrics.avg_response_time * 0.1
load_penalty = metrics.current_load * 5
score = success_bonus - response_penalty - load_penalty
else:
score = 100 # 새 Agent는 기본 점수
if score > best_score:
best_score = score
best_agent = agent_id
# 선택된 Agent의 부하 증가
if best_agent and best_agent in self.scheduler.agent_metrics:
self.scheduler.agent_metrics[best_agent].current_load += 1
return best_agent
async def task_allocation_example():
"""작업 할당 예제 시나리오"""
client = HolySheepAIClient(API_KEY)
shared_state = SharedState(state_id="main")
shared_state.locks['tasks'] = asyncio.Lock()
# Agent 풀 생성
agents = {
'orchestrator': OrchestratorAgent('orchestrator', 'gpt-4.1', client, shared_state),
'researcher': BaseAgent.__new__(BaseAgent),
'coder': BaseAgent.__new__(BaseAgent),
'reviewer': BaseAgent.__new__(BaseAgent)
}
# 스케줄러 및 할당기 초기화
scheduler = TaskScheduler()
allocator = LoadBalancedAllocator(scheduler)
# Agent 등록
allocator.register_agent(agents['orchestrator'], ['orchestration', 'planning'])
allocator.register_agent(agents['researcher'], ['research', 'analysis'])
allocator.register_agent(agents['coder'], ['coding', 'implementation'])
allocator.register_agent(agents['reviewer'], ['review', 'quality'])
# 테스트 작업 생성
test_tasks = [
AgentTask(
task_id=f"task_{i}",
agent_id="",
task_type=['research', 'coding', 'review'][i % 3],
payload={'description': f'Test task {i}'}
)
for i in range(10)
]
# 우선순위와 함께 작업 추가
for i, task in enumerate(test_tasks):
priority = (i % 3) + 1 # 1, 2, 3 우선순위
scheduler.add_task(task, priority)
# 작업 할당 실행
allocation_results = []
while True:
task = scheduler.get_next_task()
if not task:
break
agent_id = allocator.allocate_task(task)
if agent_id:
task.agent_id = agent_id
allocation_results.append({
'task_id': task.task_id,
'assigned_agent': agent_id,
'task_type': task.task_type
})
print(f"할당 완료: {len(allocation_results)}개 작업")
for result in allocation_results[:3]:
print(f" {result['task_id']} -> {result['assigned_agent']}")
return allocation_results
상태 공유 및 동기화 메커니즘
다중 Agent 환경에서 일관된 상태 관리는 시스템 신뢰성의 핵심입니다. 분산 잠금 및 이벤트 기반 동기화를 구현합니다.
"""
분산 상태 관리 및 동기화 시스템
"""
import asyncio
from collections import defaultdict
from typing import Set, Callable, Any
import asyncpg
import redis.asyncio as redis
class DistributedStateManager:
"""분산 환경용 상태 관리자"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
pg_dsn: str = "postgresql://user:pass@localhost/db"
):
self.redis: Optional[redis.Redis] = None
self.pg_pool: Optional[asyncpg.Pool] = None
self.redis_url = redis_url
self.pg_dsn = pg_dsn
self.local_cache: Dict[str, Any] = {}
self.subscribers: Dict[str, Set[Callable]] = defaultdict(set)
self.locks: Dict[str, asyncio.Lock] = {}
async def connect(self):
"""연결 풀 초기화"""
self.redis = await redis.from_url(
self.redis_url,
encoding="utf-8",
decode_responses=True
)
self.pg_pool = await asyncpg.create_pool(self.pg_dsn)
async def disconnect(self):
"""연결 해제"""
if self.redis:
await self.redis.close()
if self.pg_pool:
await self.pg_pool.close()
async def get(self, key: str, use_cache: bool = True) -> Optional[Any]:
"""분산 캐시에서 값 조회"""
if use_cache and key in self.local_cache:
return self.local_cache[key]
value = await self.redis.get(key)
if value:
result = json.loads(value)
self.local_cache[key] = result
return result
return None
async