Research_AI_Assistant / orchestrator_engine.py
JatsTheAIGen's picture
workflow errors debugging V3
ae20ff2
raw
history blame
5.19 kB
# orchestrator_engine.py
import uuid
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class MVPOrchestrator:
def __init__(self, llm_router, context_manager, agents):
self.llm_router = llm_router
self.context_manager = context_manager
self.agents = agents
self.execution_trace = []
logger.info("MVPOrchestrator initialized")
async def process_request(self, session_id: str, user_input: str) -> dict:
"""
Main orchestration flow with academic differentiation
"""
logger.info(f"Processing request for session {session_id}")
logger.info(f"User input: {user_input[:100]}")
try:
# Step 1: Generate unique interaction ID
interaction_id = self._generate_interaction_id(session_id)
logger.info(f"Generated interaction ID: {interaction_id}")
# Step 2: Context management
logger.info("Step 2: Managing context...")
context = await self.context_manager.manage_context(session_id, user_input)
logger.info(f"Context retrieved: {len(context.get('interactions', []))} interactions")
# Step 3: Intent recognition with CoT
logger.info("Step 3: Recognizing intent...")
intent_result = await self.agents['intent_recognition'].execute(
user_input=user_input,
context=context
)
logger.info(f"Intent detected: {intent_result.get('primary_intent', 'unknown')}")
# Step 4: Agent execution planning
logger.info("Step 4: Creating execution plan...")
execution_plan = await self._create_execution_plan(intent_result, context)
# Step 5: Parallel agent execution
logger.info("Step 5: Executing agents...")
agent_results = await self._execute_agents(execution_plan, user_input, context)
logger.info(f"Agent execution complete: {len(agent_results)} results")
# Step 6: Response synthesis
logger.info("Step 6: Synthesizing response...")
final_response = await self.agents['response_synthesis'].execute(
agent_outputs=agent_results,
user_input=user_input,
context=context
)
# Step 7: Safety and bias check
logger.info("Step 7: Safety check...")
safety_checked = await self.agents['safety_check'].execute(
response=final_response,
context=context
)
result = self._format_final_output(safety_checked, interaction_id)
logger.info(f"Request processing complete. Response length: {len(str(result.get('response', '')))}")
return result
except Exception as e:
logger.error(f"Error in process_request: {e}", exc_info=True)
return {
"response": f"Error processing request: {str(e)}",
"error": str(e),
"interaction_id": str(uuid.uuid4())[:8]
}
def _generate_interaction_id(self, session_id: str) -> str:
"""
Generate unique interaction identifier
"""
timestamp = datetime.now().isoformat()
unique_id = str(uuid.uuid4())[:8]
return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}"
async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict:
"""
Create execution plan based on intent recognition
"""
# TODO: Implement agent selection and sequencing logic
return {
"agents_to_execute": [],
"execution_order": "parallel",
"priority": "normal"
}
async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict:
"""
Execute agents in parallel or sequential order based on plan
"""
# TODO: Implement parallel/sequential agent execution
return {}
def _format_final_output(self, response: dict, interaction_id: str) -> dict:
"""
Format final output with tracing and metadata
"""
return {
"interaction_id": interaction_id,
"response": response.get("final_response", ""),
"confidence_score": response.get("confidence_score", 0.0),
"agent_trace": self.execution_trace,
"timestamp": datetime.now().isoformat(),
"metadata": {
"agents_used": response.get("agents_used", []),
"processing_time": response.get("processing_time", 0),
"token_count": response.get("token_count", 0)
}
}
def get_execution_trace(self) -> list:
"""
Return execution trace for debugging and analysis
"""
return self.execution_trace
def clear_execution_trace(self):
"""
Clear the execution trace
"""
self.execution_trace = []