Research_AI_Assistant / src /agents /synthesis_agent.py
JatsTheAIGen's picture
Initial commit V1
66dbebd
raw
history blame
14.7 kB
"""
Response Synthesis Agent
Specialized in integrating multiple agent outputs into coherent responses
"""
import logging
import re
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)


class ResponseSynthesisAgent:
    """Merge the outputs of several agents into one user-facing response.

    The agent detects the primary intent from the agent outputs, picks a
    response layout for that intent, assembles the text from the agents'
    content and attaches simple quality metadata.

    When an ``llm_router`` is supplied the "LLM" path is taken. That path is
    currently a simulation: it reuses the template output and appends
    follow-up questions; the router object itself is never called.
    """

    # Non-ASCII glyphs are written as escapes so the module keeps working even
    # if the file is re-saved or served with the wrong encoding.
    _ARROW = "\u2192"   # rightwards arrow: separates stages of a structure
    _BULLET = "\u2022"  # bullet: marks key points in informative responses

    # Truncation limits (in characters).
    _MAX_CONVERSATIONAL_CHARS = 500
    _MAX_PROMPT_SNIPPET_CHARS = 100
    _MAX_FALLBACK_ECHO_CHARS = 100

    # A line that starts with a bullet or with "<digits>." followed by
    # whitespace counts as a structured (list) element.
    _STRUCTURE_PATTERN = re.compile(r"^\s*(?:\u2022|\d+\.)\s", re.MULTILINE)

    def __init__(self, llm_router=None):
        """Create the agent.

        Args:
            llm_router: Optional router object. Its presence only selects the
                "llm_enhanced" synthesis path; it is not invoked yet.
        """
        self.llm_router = llm_router
        self.agent_id = "RESP_SYNTH_001"
        self.specialization = "Multi-source information integration and coherent response generation"

        # Response templates for different intent types.
        flow = f" {self._ARROW} ".join
        self.response_templates = {
            "information_request": {
                "structure": flow(("introduction", "key_points", "conclusion")),
                "tone": "informative, clear, authoritative",
            },
            "task_execution": {
                "structure": flow(("confirmation", "steps", "expected_outcome")),
                "tone": "action-oriented, precise, reassuring",
            },
            "creative_generation": {
                "structure": flow(("concept", "development", "refinement")),
                "tone": "creative, engaging, expressive",
            },
            "analysis_research": {
                "structure": flow(("hypothesis", "analysis", "insights")),
                "tone": "analytical, evidence-based, objective",
            },
            "casual_conversation": {
                "structure": flow(("engagement", "response", "follow_up")),
                "tone": "friendly, conversational, natural",
            },
        }

    async def execute(self, agent_outputs: List[Dict[str, Any]], user_input: str,
                      context: Optional[Dict[str, Any]] = None, **kwargs) -> Dict[str, Any]:
        """Synthesize one response from multiple agent outputs.

        Args:
            agent_outputs: Result dictionaries produced by other agents.
            user_input: The user's original message.
            context: Optional conversation context; only its
                ``conversation_history`` entry is read (for the prompt).
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            The synthesis dictionary (``draft_response``, ``final_response``,
            ``source_references``, ``coherence_score``,
            ``improvement_opportunities``, ``synthesis_method``) extended with
            ``agent_id``, ``synthesis_quality_metrics`` and
            ``intent_alignment``. Any exception raised during synthesis is
            logged and replaced by the fallback response, so this method does
            not raise for malformed inputs.
        """
        try:
            logger.info("%s synthesizing %d agent outputs", self.agent_id, len(agent_outputs))

            # Extract intent information.
            intent_info = self._extract_intent_info(agent_outputs)
            primary_intent = intent_info.get('primary_intent', 'casual_conversation')

            # Run the synthesis itself.
            synthesis_result = await self._synthesize_response(
                agent_outputs, user_input, context, primary_intent
            )

            # Add quality metrics.
            synthesis_result.update({
                "agent_id": self.agent_id,
                "synthesis_quality_metrics": self._calculate_quality_metrics(synthesis_result),
                "intent_alignment": self._check_intent_alignment(synthesis_result, intent_info),
            })
            logger.info("%s completed synthesis", self.agent_id)
            return synthesis_result
        except Exception as exc:
            # Top-level boundary: keep the traceback in the log and degrade to
            # a canned reply instead of propagating the failure to the caller.
            logger.exception("%s synthesis error: %s", self.agent_id, exc)
            return self._get_fallback_response(user_input, agent_outputs)

    async def _synthesize_response(self, agent_outputs: List[Dict[str, Any]],
                                   user_input: str, context: Optional[Dict[str, Any]],
                                   primary_intent: str) -> Dict[str, Any]:
        """Dispatch to LLM-based synthesis when a router is set, else templates."""
        # Compare against None so a router object that happens to be falsy
        # (e.g. defines __len__ or __bool__) is still honoured.
        if self.llm_router is not None:
            return await self._llm_based_synthesis(agent_outputs, user_input, context, primary_intent)
        return await self._template_based_synthesis(agent_outputs, user_input, primary_intent)

    async def _llm_based_synthesis(self, agent_outputs: List[Dict[str, Any]],
                                   user_input: str, context: Optional[Dict[str, Any]],
                                   primary_intent: str) -> Dict[str, Any]:
        """Produce the "llm_enhanced" result.

        The prompt is built but not sent anywhere yet: the draft comes from
        the template path and is then post-processed by
        ``_enhance_response_quality``.
        """
        synthesis_prompt = self._build_synthesis_prompt(agent_outputs, user_input, context, primary_intent)
        # TODO: send ``synthesis_prompt`` through ``self.llm_router`` once the
        # router interface is defined; until then the call is simulated.
        logger.debug("%s prepared synthesis prompt (%d chars)", self.agent_id, len(synthesis_prompt))

        template_result = await self._template_based_synthesis(agent_outputs, user_input, primary_intent)
        draft_response = template_result["final_response"]
        enhanced_response = self._enhance_response_quality(draft_response, primary_intent)
        return {
            "draft_response": draft_response,
            "final_response": enhanced_response,
            "source_references": self._extract_source_references(agent_outputs),
            "coherence_score": 0.85,
            "improvement_opportunities": self._identify_improvements(enhanced_response),
            "synthesis_method": "llm_enhanced",
        }

    async def _template_based_synthesis(self, agent_outputs: List[Dict[str, Any]],
                                        user_input: str, primary_intent: str) -> Dict[str, Any]:
        """Produce the "template_based" result (draft and final are identical)."""
        # Unknown intents fall back to the casual-conversation template.
        template = self.response_templates.get(
            primary_intent, self.response_templates["casual_conversation"]
        )
        content_blocks = self._extract_content_blocks(agent_outputs)
        structured_response = self._apply_response_template(content_blocks, template, primary_intent)
        return {
            "draft_response": structured_response,
            "final_response": structured_response,  # No enhancement in template mode
            "source_references": self._extract_source_references(agent_outputs),
            "coherence_score": 0.75,
            "improvement_opportunities": ["Consider adding more specific details"],
            "synthesis_method": "template_based",
        }

    def _build_synthesis_prompt(self, agent_outputs: List[Dict[str, Any]],
                                user_input: str, context: Optional[Dict[str, Any]],
                                primary_intent: str) -> str:
        """Build the text prompt intended for LLM-based synthesis.

        The last three ``conversation_history`` entries of ``context`` are
        included; an empty or missing context is rendered as "No context".
        """
        if context:
            # A present-but-None history must not break prompt construction.
            history = context.get('conversation_history') or []
            recent_context = history[-3:]
        else:
            recent_context = 'No context'

        prompt_lines = [
            "",
            "Synthesize a coherent response from multiple AI agent outputs:",
            f'User Question: "{user_input}"',
            f"Primary Intent: {primary_intent}",
            "Agent Outputs to Integrate:",
            self._format_agent_outputs_for_synthesis(agent_outputs),
            f"Conversation Context: {recent_context}",
            "Requirements:",
            "- Maintain accuracy from source materials",
            "- Ensure logical flow and coherence",
            f"- Match the {primary_intent} intent style",
            "- Keep response concise but comprehensive",
            "- Include relevant details from agent outputs",
            "Provide a well-structured, natural-sounding response.",
            "",
        ]
        return "\n".join(prompt_lines)

    @staticmethod
    def _intent_confidence(output: Dict[str, Any]) -> Any:
        """Return the score recorded for ``output['primary_intent']`` (0.5 if absent)."""
        scores = output.get('confidence_scores') or {}
        return scores.get(output['primary_intent'], 0.5)

    @staticmethod
    def _truncate(text: Any, limit: int) -> str:
        """Return ``str(text)`` cut to ``limit`` characters, adding "..." only when cut."""
        text = str(text)
        return text[:limit] + "..." if len(text) > limit else text

    def _extract_intent_info(self, agent_outputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Return intent data from the first output that has ``primary_intent``.

        Returns:
            A dict with ``primary_intent``, ``confidence`` and
            ``source_agent``. When no output declares an intent the defaults
            are ``casual_conversation``, ``0.5`` and ``unknown``.
        """
        for output in agent_outputs:
            if 'primary_intent' in output:
                return {
                    'primary_intent': output['primary_intent'],
                    'confidence': self._intent_confidence(output),
                    'source_agent': output.get('agent_id', 'unknown'),
                }
        return {
            'primary_intent': 'casual_conversation',
            'confidence': 0.5,
            'source_agent': 'unknown',
        }

    def _extract_content_blocks(self, agent_outputs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Turn agent outputs into ``{'content', 'source', 'confidence'}`` blocks.

        Each output contributes at most one block; the first matching key
        wins, in the order ``result``, ``primary_intent``, ``final_response``.
        Outputs with none of these keys are skipped.
        """
        content_blocks = []
        for output in agent_outputs:
            if 'result' in output:
                block = {
                    'content': output['result'],
                    'source': output.get('agent_id', 'unknown'),
                    'confidence': output.get('confidence', 0.5),
                }
            elif 'primary_intent' in output:
                block = {
                    'content': f"Intent analysis: {output['primary_intent']}",
                    'source': output.get('agent_id', 'intent_agent'),
                    'confidence': self._intent_confidence(output),
                }
            elif 'final_response' in output:
                block = {
                    'content': output['final_response'],
                    'source': output.get('agent_id', 'unknown'),
                    'confidence': output.get('confidence_score', 0.7),
                }
            else:
                continue
            content_blocks.append(block)
        return content_blocks

    def _apply_response_template(self, content_blocks: List[Dict[str, Any]],
                                 template: Dict[str, str], intent: str) -> str:
        """Lay out the content according to the intent.

        ``template`` is accepted for interface compatibility but is not
        consulted; only ``intent`` selects the layout. Intents other than
        ``information_request`` and ``task_execution`` use the conversational
        layout.
        """
        builders = {
            "information_request": self._structure_informative_response,
            "task_execution": self._structure_actionable_response,
        }
        build = builders.get(intent, self._structure_conversational_response)
        return build(content_blocks)

    def _structure_informative_response(self, content_blocks: List[Dict[str, Any]]) -> str:
        """Build an intro -> bulleted key points (first 3 blocks) -> conclusion reply."""
        if not content_blocks:
            return "I'm here to help! Could you provide more details about what you're looking for?"
        intro = "Based on the information available"
        key_points = "\n".join(
            f"{self._BULLET} {block['content']}" for block in content_blocks[:3]
        )
        conclusion = "I hope this helps! Let me know if you need any clarification."
        return f"{intro}:\n\n{key_points}\n\n{conclusion}"

    def _structure_actionable_response(self, content_blocks: List[Dict[str, Any]]) -> str:
        """Build a confirmation -> numbered steps (first 5 blocks) -> outcome reply."""
        if not content_blocks:
            return "I understand you'd like some help. What specific task would you like to accomplish?"
        confirmation = "I can help with that!"
        steps = "\n".join(
            f"{number}. {block['content']}"
            for number, block in enumerate(content_blocks[:5], start=1)
        )
        outcome = "This should help you get started. Feel free to ask if you need further assistance."
        return f"{confirmation}\n\n{steps}\n\n{outcome}"

    def _structure_conversational_response(self, content_blocks: List[Dict[str, Any]]) -> str:
        """Join all block contents with spaces, capped at 500 characters."""
        if not content_blocks:
            return "Thanks for chatting! How can I assist you today?"
        # ``result`` values are not guaranteed to be strings, so convert each
        # one before joining.
        combined_content = " ".join(str(block['content']) for block in content_blocks)
        return self._truncate(combined_content, self._MAX_CONVERSATIONAL_CHARS)

    def _enhance_response_quality(self, response: str, intent: str) -> str:
        """Append follow-up questions to very short or question-free replies."""
        enhanced = response
        # Fewer than five words: offer to elaborate.
        if len(response.split()) < 5:
            enhanced += "\n\nWould you like me to expand on this?"
        # Informational replies should invite a follow-up question.
        if intent == "information_request" and "?" not in response:
            enhanced += "\n\nIs there anything specific you'd like to know more about?"
        return enhanced

    def _extract_source_references(self, agent_outputs: List[Dict[str, Any]]) -> List[str]:
        """Return the distinct ``agent_id`` values in first-seen order.

        Outputs without an ``agent_id`` are reported as ``unknown``.
        """
        agent_ids = (output.get('agent_id', 'unknown') for output in agent_outputs)
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys(agent_ids))

    def _format_agent_outputs_for_synthesis(self, agent_outputs: List[Dict[str, Any]]) -> str:
        """Render one "Agent N (id): snippet" line per output for the prompt.

        The snippet is ``result``, else ``final_response``, else the whole
        output, converted to text and cut to 100 characters.
        """
        formatted = []
        for position, output in enumerate(agent_outputs, 1):
            agent_id = output.get('agent_id', 'unknown')
            content = output.get('result', output.get('final_response', str(output)))
            snippet = self._truncate(content, self._MAX_PROMPT_SNIPPET_CHARS)
            formatted.append(f"Agent {position} ({agent_id}): {snippet}")
        return "\n".join(formatted)

    def _calculate_quality_metrics(self, synthesis_result: Dict[str, Any]) -> Dict[str, Any]:
        """Compute size, coherence and structure metrics for a synthesis result."""
        response = synthesis_result.get('final_response') or ''
        return {
            "length": len(response),
            "word_count": len(response.split()),
            "coherence_score": synthesis_result.get('coherence_score', 0.7),
            "source_count": len(synthesis_result.get('source_references', [])),
            # True only when some line starts with a bullet or "N." marker;
            # an ordinary sentence ending in "." is not structure.
            "has_structured_elements": bool(self._STRUCTURE_PATTERN.search(response)),
        }

    def _check_intent_alignment(self, synthesis_result: Dict[str, Any],
                                intent_info: Dict[str, Any]) -> Dict[str, Any]:
        """Report intent alignment; the score is a fixed placeholder (0.8)."""
        alignment_score = 0.8  # Placeholder
        return {
            "intent_detected": intent_info.get('primary_intent'),
            "alignment_score": alignment_score,
            "alignment_verified": alignment_score > 0.7,
        }

    def _identify_improvements(self, response: str) -> List[str]:
        """Suggest improvements based on response length and question marks."""
        improvements = []
        if len(response) < 50:
            improvements.append("Could be more detailed")
        if "?" not in response and len(response.split()) < 100:
            improvements.append("Consider adding examples")
        return improvements

    def _get_fallback_response(self, user_input: str,
                               agent_outputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the canned result returned when synthesis fails.

        This runs inside the ``except`` handler of ``execute``, so it must not
        raise itself: ``user_input`` is converted to text first (``None``
        becomes an empty string). ``agent_outputs`` is not used.
        """
        echoed_input = self._truncate(
            "" if user_input is None else user_input, self._MAX_FALLBACK_ECHO_CHARS
        )
        return {
            "final_response": f"I apologize, but I'm having trouble generating a response. Your question was: {echoed_input}",
            "draft_response": "",
            "source_references": [],
            "coherence_score": 0.3,
            "improvement_opportunities": ["System had synthesis error"],
            "synthesis_method": "fallback",
            "agent_id": self.agent_id,
            "synthesis_quality_metrics": {"error": "synthesis_failed"},
            "intent_alignment": {"error": "not_available"},
            "error_handled": True,
        }
# Convenience factory so callers do not need to import the class directly.
def create_synthesis_agent(llm_router=None):
    """Build and return a ``ResponseSynthesisAgent``.

    Args:
        llm_router: Optional router forwarded unchanged to the agent's
            constructor.
    """
    agent = ResponseSynthesisAgent(llm_router=llm_router)
    return agent