""" Multi-Agent RAG-Enhanced LLM System 감독자(Supervisor) -> 창의성 생성자(Creative) -> 비평자(Critic) -> 감독자(Final) 4단계 파이프라인을 통한 고품질 답변 생성 시스템 """ import os import json import asyncio import time from typing import Optional, List, Dict, Any, Tuple from contextlib import asynccontextmanager from datetime import datetime from enum import Enum import requests import uvicorn from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field import gradio as gr from dotenv import load_dotenv # 환경변수 로드 load_dotenv() # ============================================================================ # 데이터 모델 정의 # ============================================================================ class AgentRole(Enum): """에이전트 역할 정의""" SUPERVISOR = "supervisor" CREATIVE = "creative" CRITIC = "critic" FINALIZER = "finalizer" class Message(BaseModel): role: str content: str class ChatRequest(BaseModel): messages: List[Message] model: str = "accounts/fireworks/models/qwen3-235b-a22b-instruct-2507" max_tokens: int = Field(default=4096, ge=1, le=8192) temperature: float = Field(default=0.6, ge=0, le=2) top_p: float = Field(default=1.0, ge=0, le=1) top_k: int = Field(default=40, ge=1, le=100) use_search: bool = Field(default=True) class AgentResponse(BaseModel): role: AgentRole content: str metadata: Optional[Dict] = None class FinalResponse(BaseModel): final_answer: str agent_responses: List[AgentResponse] search_results: Optional[List[Dict]] = None processing_time: float # ============================================================================ # Brave Search 클라이언트 # ============================================================================ class BraveSearchClient: def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.getenv("BRAVE_SEARCH_API_KEY") if not self.api_key: print("⚠️ Warning: Brave Search API key not found. Search disabled.") self.base_url = "https://api.search.brave.com/res/v1/web/search" self.headers = { "Accept": "application/json", "X-Subscription-Token": self.api_key } if self.api_key else {} def search(self, query: str, count: int = 5) -> List[Dict]: """웹 검색 수행""" if not self.api_key: return [] params = { "q": query, "count": count, "text_decorations": False, "search_lang": "ko", "country": "KR" } try: response = requests.get( self.base_url, headers=self.headers, params=params, timeout=10 ) response.raise_for_status() data = response.json() results = [] if "web" in data and "results" in data["web"]: for item in data["web"]["results"][:count]: results.append({ "title": item.get("title", ""), "url": item.get("url", ""), "description": item.get("description", ""), "age": item.get("age", "") }) return results except Exception as e: print(f"Search error: {str(e)}") return [] # ============================================================================ # Fireworks LLM 클라이언트 # ============================================================================ class FireworksClient: def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.getenv("FIREWORKS_API_KEY") if not self.api_key: raise ValueError("FIREWORKS_API_KEY is required!") self.base_url = "https://api.fireworks.ai/inference/v1/chat/completions" self.headers = { "Accept": "application/json", "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } def chat(self, messages: List[Dict], **kwargs) -> str: """LLM과 대화""" payload = { "model": kwargs.get("model", "accounts/fireworks/models/qwen3-235b-a22b-instruct-2507"), "messages": messages, "max_tokens": kwargs.get("max_tokens", 4096), "temperature": kwargs.get("temperature", 0.7), "top_p": kwargs.get("top_p", 1.0), "top_k": kwargs.get("top_k", 40) } try: response = requests.post( self.base_url, headers=self.headers, data=json.dumps(payload), timeout=60 ) response.raise_for_status() data = response.json() if "choices" in data and len(data["choices"]) > 0: return data["choices"][0]["message"]["content"] return "응답을 생성할 수 없습니다." except Exception as e: return f"오류 발생: {str(e)}" # ============================================================================ # 멀티 에이전트 시스템 # ============================================================================ class MultiAgentSystem: """4단계 멀티 에이전트 처리 시스템""" def __init__(self, llm_client: FireworksClient, search_client: BraveSearchClient): self.llm = llm_client self.search = search_client self.agent_configs = self._initialize_agent_configs() def _initialize_agent_configs(self) -> Dict: """각 에이전트별 설정 초기화""" return { AgentRole.SUPERVISOR: { "temperature": 0.3, "system_prompt": """당신은 감독자 에이전트입니다. 사용자의 질문과 검색 결과를 분석하여 답변의 전체적인 방향성과 구조를 제시해야 합니다. 역할: 1. 질문의 핵심 의도 파악 2. 검색 결과에서 핵심 정보 추출 3. 답변이 포함해야 할 주요 요소들 정의 4. 논리적 흐름과 구조 제시 출력 형식: - 질문 분석: [핵심 의도] - 주요 포함 사항: [항목들] - 답변 구조: [논리적 흐름] - 검색 결과 활용 방안: [어떤 정보를 어떻게 활용할지]""" }, AgentRole.CREATIVE: { "temperature": 0.9, "system_prompt": """당신은 창의성 생성자 에이전트입니다. 감독자의 지침을 바탕으로 창의적이고 흥미로운 답변을 생성해야 합니다. 역할: 1. 감독자의 구조를 따르되 창의적으로 확장 2. 예시, 비유, 스토리텔링 활용 3. 사용자 관점에서 이해하기 쉬운 설명 추가 4. 실용적이고 구체적인 조언 포함 5. 독창적인 관점과 통찰 제공 주의사항: - 정확성을 해치지 않는 선에서 창의성 발휘 - 검색 결과를 창의적으로 재구성 - 사용자 참여를 유도하는 내용 포함""" }, AgentRole.CRITIC: { "temperature": 0.2, "system_prompt": """당신은 비평자 에이전트입니다. 창의성 생성자의 답변을 검토하고 개선점을 제시해야 합니다. 역할: 1. 사실 관계 검증 2. 논리적 일관성 확인 3. 오해의 소지가 있는 표현 지적 4. 누락된 중요 정보 확인 5. 개선 방향 구체적 제시 평가 기준: - 정확성: 사실과 데이터의 정확성 - 완전성: 질문에 대한 충분한 답변 여부 - 명확성: 이해하기 쉬운 설명인지 - 유용성: 실제로 도움이 되는 정보인지 - 신뢰성: 검증 가능한 출처 포함 여부 출력 형식: ✅ 긍정적 측면: [잘된 점들] ⚠️ 개선 필요: [문제점과 개선 방안] 💡 추가 제안: [보완할 내용]""" }, AgentRole.FINALIZER: { "temperature": 0.5, "system_prompt": """당신은 최종 감독자입니다. 모든 에이전트의 의견을 종합하여 최종 답변을 생성해야 합니다. 역할: 1. 창의성 생성자의 답변을 기반으로 2. 비평자의 피드백을 반영하여 3. 감독자의 초기 구조를 유지하며 4. 논리적이고 이해하기 쉬운 최종 답변 생성 최종 답변 기준: - 정확성과 창의성의 균형 - 명확한 구조와 논리적 흐름 - 실용적이고 유용한 정보 - 사용자 친화적인 톤 - 검색 결과 출처 명시 반드시 포함할 요소: 1. 핵심 답변 (직접적인 응답) 2. 상세 설명 (배경과 맥락) 3. 실용적 조언 (해당 시) 4. 참고 자료 (검색 결과 기반)""" } } def _format_search_results(self, results: List[Dict]) -> str: """검색 결과 포맷팅""" if not results: return "검색 결과 없음" formatted = [] for i, result in enumerate(results, 1): formatted.append(f""" [검색결과 {i}] 제목: {result.get('title', 'N/A')} URL: {result.get('url', 'N/A')} 내용: {result.get('description', 'N/A')} 게시: {result.get('age', 'N/A')}""") return "\n".join(formatted) async def process_with_agents( self, query: str, search_results: List[Dict], config: Dict ) -> FinalResponse: """멀티 에이전트 파이프라인 실행""" start_time = time.time() agent_responses = [] search_context = self._format_search_results(search_results) # 1단계: 감독자 - 방향성 제시 supervisor_prompt = f""" 사용자 질문: {query} 검색 결과: {search_context} 위 정보를 바탕으로 답변의 방향성과 구조를 제시하세요.""" supervisor_response = self.llm.chat( messages=[ {"role": "system", "content": self.agent_configs[AgentRole.SUPERVISOR]["system_prompt"]}, {"role": "user", "content": supervisor_prompt} ], temperature=self.agent_configs[AgentRole.SUPERVISOR]["temperature"], max_tokens=config.get("max_tokens", 1000) ) agent_responses.append(AgentResponse( role=AgentRole.SUPERVISOR, content=supervisor_response )) # 2단계: 창의성 생성자 - 창의적 답변 생성 creative_prompt = f""" 사용자 질문: {query} 감독자 지침: {supervisor_response} 검색 결과: {search_context} 위 지침과 정보를 바탕으로 창의적이고 유용한 답변을 생성하세요.""" creative_response = self.llm.chat( messages=[ {"role": "system", "content": self.agent_configs[AgentRole.CREATIVE]["system_prompt"]}, {"role": "user", "content": creative_prompt} ], temperature=self.agent_configs[AgentRole.CREATIVE]["temperature"], max_tokens=config.get("max_tokens", 2000) ) agent_responses.append(AgentResponse( role=AgentRole.CREATIVE, content=creative_response )) # 3단계: 비평자 - 검토 및 개선점 제시 critic_prompt = f""" 원본 질문: {query} 창의성 생성자의 답변: {creative_response} 검색 결과: {search_context} 위 답변을 검토하고 개선점을 제시하세요.""" critic_response = self.llm.chat( messages=[ {"role": "system", "content": self.agent_configs[AgentRole.CRITIC]["system_prompt"]}, {"role": "user", "content": critic_prompt} ], temperature=self.agent_configs[AgentRole.CRITIC]["temperature"], max_tokens=config.get("max_tokens", 1000) ) agent_responses.append(AgentResponse( role=AgentRole.CRITIC, content=critic_response )) # 4단계: 최종 감독자 - 종합 및 최종 답변 final_prompt = f""" 사용자 질문: {query} 창의성 생성자의 답변: {creative_response} 비평자의 피드백: {critic_response} 초기 감독자 지침: {supervisor_response} 검색 결과: {search_context} 모든 의견을 종합하여 최종 답변을 생성하세요. 비평자의 피드백을 반영하여 개선된 버전을 만들어주세요.""" final_response = self.llm.chat( messages=[ {"role": "system", "content": self.agent_configs[AgentRole.FINALIZER]["system_prompt"]}, {"role": "user", "content": final_prompt} ], temperature=self.agent_configs[AgentRole.FINALIZER]["temperature"], max_tokens=config.get("max_tokens", 3000) ) agent_responses.append(AgentResponse( role=AgentRole.FINALIZER, content=final_response )) processing_time = time.time() - start_time return FinalResponse( final_answer=final_response, agent_responses=agent_responses, search_results=search_results, processing_time=processing_time ) # ============================================================================ # Gradio UI # ============================================================================ def create_gradio_interface(multi_agent_system: MultiAgentSystem, search_client: BraveSearchClient): """Gradio 인터페이스 생성""" async def process_query( message: str, history: List[List[str]], use_search: bool, show_agent_thoughts: bool, search_count: int, temperature: float, max_tokens: int ): """쿼리 처리 함수""" if not message: return "", history, "", "" try: # 검색 수행 search_results = [] if use_search and search_client.api_key: search_results = search_client.search(message, count=search_count) # 설정 config = { "temperature": temperature, "max_tokens": max_tokens } # 멀티 에이전트 처리 response = await multi_agent_system.process_with_agents( query=message, search_results=search_results, config=config ) # 에이전트 사고 과정 포맷팅 agent_thoughts = "" if show_agent_thoughts: agent_thoughts = "## 🤖 에이전트 사고 과정\n\n" for agent_resp in response.agent_responses: role_emoji = { AgentRole.SUPERVISOR: "👔", AgentRole.CREATIVE: "🎨", AgentRole.CRITIC: "🔍", AgentRole.FINALIZER: "✅" } role_name = { AgentRole.SUPERVISOR: "감독자 (초기 구조화)", AgentRole.CREATIVE: "창의성 생성자", AgentRole.CRITIC: "비평자", AgentRole.FINALIZER: "최종 감독자" } agent_thoughts += f"### {role_emoji[agent_resp.role]} {role_name[agent_resp.role]}\n" agent_thoughts += f"{agent_resp.content[:500]}...\n\n" # 검색 결과 포맷팅 search_display = "" if search_results: search_display = "## 📚 참고 자료\n\n" for i, result in enumerate(search_results, 1): search_display += f"**{i}. [{result['title']}]({result['url']})**\n" search_display += f" {result['description'][:100]}...\n\n" # 처리 시간 추가 final_answer = response.final_answer final_answer += f"\n\n---\n⏱️ *처리 시간: {response.processing_time:.2f}초*" # 히스토리 업데이트 history.append([message, final_answer]) return "", history, agent_thoughts, search_display except Exception as e: error_msg = f"❌ 오류 발생: {str(e)}" history.append([message, error_msg]) return "", history, "", "" # Gradio 인터페이스 with gr.Blocks( title="Multi-Agent RAG System", theme=gr.themes.Soft(), css=""" .gradio-container { max-width: 1400px !important; margin: auto !important; } #chatbot { height: 600px !important; } """ ) as demo: gr.Markdown(""" # 🧠 Multi-Agent RAG System ### 4단계 에이전트 협업을 통한 고품질 답변 생성 **처리 과정:** 감독자(구조화) → 창의성 생성자(창의적 답변) → 비평자(검증) → 최종 감독자(종합) """) with gr.Row(): # 메인 채팅 영역 with gr.Column(scale=3): chatbot = gr.Chatbot( height=500, label="💬 대화", elem_id="chatbot" ) msg = gr.Textbox( label="질문 입력", placeholder="질문을 입력하세요... (멀티 에이전트가 협업하여 답변합니다)", lines=3 ) with gr.Row(): submit = gr.Button("🚀 전송", variant="primary") clear = gr.Button("🔄 초기화") # 에이전트 사고 과정 with gr.Accordion("🤖 에이전트 사고 과정", open=False): agent_thoughts = gr.Markdown() # 검색 결과 with gr.Accordion("📚 검색 소스", open=False): search_sources = gr.Markdown() # 설정 패널 with gr.Column(scale=1): gr.Markdown("### ⚙️ 설정") with gr.Group(): use_search = gr.Checkbox( label="🔍 웹 검색 사용", value=True ) show_agent_thoughts = gr.Checkbox( label="🧠 에이전트 사고과정 표시", value=True ) search_count = gr.Slider( minimum=1, maximum=10, value=5, step=1, label="검색 결과 수" ) temperature = gr.Slider( minimum=0, maximum=1, value=0.6, step=0.1, label="Temperature" ) max_tokens = gr.Slider( minimum=500, maximum=4000, value=2000, step=100, label="Max Tokens" ) gr.Markdown(""" ### 📊 시스템 정보 **에이전트 역할:** - 👔 **감독자**: 구조 설계 - 🎨 **창의성**: 창의적 생성 - 🔍 **비평자**: 검증/개선 - ✅ **최종**: 종합/완성 """) # 예제 gr.Examples( examples=[ "양자 컴퓨터의 원리를 초등학생도 이해할 수 있게 설명해줘", "2024년 AI 기술 트렌드와 미래 전망은?", "효과적인 프로그래밍 학습 방법을 단계별로 알려줘", "기후 변화가 한국 경제에 미치는 영향 분석해줘", "스타트업 창업 시 고려해야 할 핵심 요소들은?" ], inputs=msg ) # 이벤트 바인딩 submit.click( process_query, inputs=[msg, chatbot, use_search, show_agent_thoughts, search_count, temperature, max_tokens], outputs=[msg, chatbot, agent_thoughts, search_sources] ) msg.submit( process_query, inputs=[msg, chatbot, use_search, show_agent_thoughts, search_count, temperature, max_tokens], outputs=[msg, chatbot, agent_thoughts, search_sources] ) clear.click( lambda: (None, None, None), None, [chatbot, agent_thoughts, search_sources] ) return demo # ============================================================================ # FastAPI 앱 # ============================================================================ @asynccontextmanager async def lifespan(app: FastAPI): """앱 생명주기 관리""" print("\n" + "="*60) print("🚀 Multi-Agent RAG System Starting...") print("="*60) yield print("\n👋 Shutting down...") app = FastAPI( title="Multi-Agent RAG System API", description="4-Stage Agent Collaboration System with RAG", version="3.0.0", lifespan=lifespan ) # CORS 설정 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] ) # 클라이언트 초기화 try: llm_client = FireworksClient() search_client = BraveSearchClient() multi_agent_system = MultiAgentSystem(llm_client, search_client) except Exception as e: print(f"⚠️ Initialization error: {e}") llm_client = None search_client = None multi_agent_system = None # API 엔드포인트 @app.get("/") async def root(): """루트 엔드포인트""" return { "name": "Multi-Agent RAG System", "version": "3.0.0", "status": "running", "ui": "http://localhost:8000/ui", "docs": "http://localhost:8000/docs" } @app.post("/api/chat") async def chat_endpoint(request: ChatRequest): """멀티 에이전트 채팅 API""" if not multi_agent_system: raise HTTPException(status_code=500, detail="System not initialized") try: # 검색 수행 search_results = [] if request.use_search and search_client.api_key: last_message = request.messages[-1].content if request.messages else "" search_results = search_client.search(last_message, count=5) # 멀티 에이전트 처리 response = await multi_agent_system.process_with_agents( query=request.messages[-1].content, search_results=search_results, config={ "temperature": request.temperature, "max_tokens": request.max_tokens } ) return response except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/health") async def health_check(): """헬스 체크""" return { "status": "healthy", "timestamp": datetime.now().isoformat(), "services": { "llm": "ready" if llm_client else "not configured", "search": "ready" if search_client and search_client.api_key else "not configured", "multi_agent": "ready" if multi_agent_system else "not configured" } } # Gradio 마운트 if multi_agent_system: gradio_app = create_gradio_interface(multi_agent_system, search_client) app = gr.mount_gradio_app(app, gradio_app, path="/ui") # ============================================================================ # 메인 실행 # ============================================================================ if __name__ == "__main__": print(""" ╔══════════════════════════════════════════════════════════════╗ ║ 🧠 Multi-Agent RAG-Enhanced LLM System 🧠 ║ ║ ║ ║ 감독자 → 창의성 생성자 → 비평자 → 최종 감독자 ║ ║ 4단계 협업을 통한 고품질 답변 생성 ║ ╚══════════════════════════════════════════════════════════════╝ """) # API 키 확인 if not os.getenv("FIREWORKS_API_KEY"): print("\n⚠️ FIREWORKS_API_KEY가 설정되지 않았습니다.") key = input("Fireworks API Key 입력: ").strip() if key: os.environ["FIREWORKS_API_KEY"] = key llm_client = FireworksClient(key) if not os.getenv("BRAVE_SEARCH_API_KEY"): print("\n⚠️ BRAVE_SEARCH_API_KEY가 설정되지 않았습니다.") print(" (선택사항: 검색 기능을 사용하려면 입력)") key = input("Brave Search API Key 입력 (Enter=건너뛰기): ").strip() if key: os.environ["BRAVE_SEARCH_API_KEY"] = key search_client = BraveSearchClient(key) # 시스템 재초기화 if llm_client: multi_agent_system = MultiAgentSystem(llm_client, search_client) gradio_app = create_gradio_interface(multi_agent_system, search_client) app = gr.mount_gradio_app(app, gradio_app, path="/ui") print("\n" + "="*60) print("✅ 시스템 준비 완료!") print("="*60) print("\n📍 접속 주소:") print(" 🎨 Gradio UI: http://localhost:8000/ui") print(" 📚 API Docs: http://localhost:8000/docs") print(" 🔧 Chat API: POST http://localhost:8000/api/chat") print("\n💡 Ctrl+C를 눌러 종료") print("="*60 + "\n") uvicorn.run( app, host="0.0.0.0", port=8000, reload=False, log_level="info" )