|
|
|
|
|
import uuid |
|
|
import logging |
|
|
from datetime import datetime |
|
|
|
|
|
logger = logging.getLogger(__name__)


class MVPOrchestrator:
    """Drive a single user request through a fixed pipeline of agents.

    The pipeline is: generate an interaction id, fetch context, recognise
    intent, build an execution plan, run the planned agents, synthesise a
    response, run a safety check, and format the final payload.

    ``agents`` must support ``agents[name]`` lookup for the keys
    ``'intent_recognition'``, ``'response_synthesis'`` and ``'safety_check'``;
    each looked-up object must expose an awaitable ``execute(**kwargs)``.
    ``context_manager`` must expose an awaitable
    ``manage_context(session_id, user_input)``.
    """

    def __init__(self, llm_router, context_manager, agents):
        """Store the collaborators and start with an empty execution trace."""
        self.llm_router = llm_router
        self.context_manager = context_manager
        self.agents = agents
        # NOTE: nothing in this class appends to the trace yet; it is only
        # exposed through the final output and the accessor methods below.
        self.execution_trace = []
        logger.info("MVPOrchestrator initialized")

    async def process_request(self, session_id: str, user_input: str) -> dict:
        """Run the full orchestration pipeline for one user message.

        Args:
            session_id: Identifier of the session; used as the prefix of the
                generated interaction id and passed to the context manager.
            user_input: The raw user message.

        Returns:
            On success, the dict built by ``_format_final_output``.
            On any exception, a dict with the keys ``response``, ``error``
            and ``interaction_id``. This method does not re-raise.
        """
        # Stays None until an id has been generated, so the error path can
        # tell whether there is a real id to report.
        interaction_id = None

        try:
            # Logging lives inside the try block so that a malformed
            # ``user_input`` (e.g. None, which cannot be sliced) produces the
            # structured error response instead of escaping as an exception.
            logger.info("Processing request for session %s", session_id)
            logger.info("User input: %s", user_input[:100])

            interaction_id = self._generate_interaction_id(session_id)
            logger.info("Generated interaction ID: %s", interaction_id)

            logger.info("Step 2: Managing context...")
            context = await self.context_manager.manage_context(session_id, user_input)
            logger.info(
                "Context retrieved: %s interactions",
                len(context.get('interactions', [])),
            )

            logger.info("Step 3: Recognizing intent...")
            intent_result = await self.agents['intent_recognition'].execute(
                user_input=user_input,
                context=context,
            )
            logger.info(
                "Intent detected: %s",
                intent_result.get('primary_intent', 'unknown'),
            )

            logger.info("Step 4: Creating execution plan...")
            execution_plan = await self._create_execution_plan(intent_result, context)

            logger.info("Step 5: Executing agents...")
            agent_results = await self._execute_agents(execution_plan, user_input, context)
            logger.info("Agent execution complete: %s results", len(agent_results))

            logger.info("Step 6: Synthesizing response...")
            final_response = await self.agents['response_synthesis'].execute(
                agent_outputs=agent_results,
                user_input=user_input,
                context=context,
            )

            logger.info("Step 7: Safety check...")
            safety_checked = await self.agents['safety_check'].execute(
                response=final_response,
                context=context,
            )

            result = self._format_final_output(safety_checked, interaction_id)
            logger.info(
                "Request processing complete. Response length: %s",
                len(str(result.get('response', ''))),
            )
            return result

        except Exception as e:
            # Top-level boundary: log with traceback and convert to a payload.
            logger.exception("Error in process_request: %s", e)
            if interaction_id is None:
                # Failure happened before an id existed; fall back to a
                # short random id so the payload shape stays the same.
                interaction_id = str(uuid.uuid4())[:8]
            return {
                "response": f"Error processing request: {str(e)}",
                "error": str(e),
                # Reuse the id that was already logged so the failure can be
                # correlated with the log lines of this request.
                "interaction_id": interaction_id,
            }

    def _generate_interaction_id(self, session_id: str) -> str:
        """Build an id of the form ``<session_id>_<8 hex chars>_<epoch secs>``."""
        unique_id = str(uuid.uuid4())[:8]
        epoch_seconds = int(datetime.now().timestamp())
        return f"{session_id}_{unique_id}_{epoch_seconds}"

    async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict:
        """Return the execution plan for the recognised intent.

        Placeholder: both arguments are currently ignored and a fixed plan
        with no agents, ``"parallel"`` order and ``"normal"`` priority is
        returned.
        """
        return {
            "agents_to_execute": [],
            "execution_order": "parallel",
            "priority": "normal",
        }

    async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict:
        """Run the agents named in ``execution_plan``.

        Placeholder: all arguments are currently ignored and an empty dict is
        returned.
        """
        return {}

    def _format_final_output(self, response: dict, interaction_id: str) -> dict:
        """Assemble the payload returned to the caller.

        Args:
            response: Dict produced by the safety-check agent. Missing keys
                fall back to defaults (empty string, ``0.0``, ``[]``, ``0``).
            interaction_id: Id generated for this request.

        Returns:
            Dict with ``interaction_id``, ``response``, ``confidence_score``,
            ``agent_trace``, ``timestamp`` (ISO 8601 from ``datetime.now()``)
            and a nested ``metadata`` dict.
        """
        return {
            "interaction_id": interaction_id,
            "response": response.get("final_response", ""),
            "confidence_score": response.get("confidence_score", 0.0),
            # Shallow copy: the returned payload must not change if the
            # orchestrator's trace is appended to after this call.
            "agent_trace": list(self.execution_trace),
            "timestamp": datetime.now().isoformat(),
            "metadata": {
                "agents_used": response.get("agents_used", []),
                "processing_time": response.get("processing_time", 0),
                "token_count": response.get("token_count", 0),
            },
        }

    def get_execution_trace(self) -> list:
        """Return the orchestrator's live execution trace list (not a copy)."""
        return self.execution_trace

    def clear_execution_trace(self):
        """Reset the execution trace by rebinding it to a new empty list."""
        self.execution_trace = []
|
|
|
|
|
|