"""
Response Synthesis Agent

Specialized in integrating multiple agent outputs into coherent responses.
"""

import logging
import re
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class ResponseSynthesisAgent:
    """Agent that merges the outputs of multiple upstream agents into one coherent reply."""

    def __init__(self, llm_router=None):
        self.llm_router = llm_router
        self.agent_id = "RESP_SYNTH_001"
        self.specialization = "Multi-source information integration and coherent response generation"

        # Per-intent templates describing the target structure and tone of the reply
        self.response_templates = {
            "information_request": {
                "structure": "introduction → key_points → conclusion",
                "tone": "informative, clear, authoritative"
            },
            "task_execution": {
                "structure": "confirmation → steps → expected_outcome",
                "tone": "action-oriented, precise, reassuring"
            },
            "creative_generation": {
                "structure": "concept → development → refinement",
                "tone": "creative, engaging, expressive"
            },
            "analysis_research": {
                "structure": "hypothesis → analysis → insights",
                "tone": "analytical, evidence-based, objective"
            },
            "casual_conversation": {
                "structure": "engagement → response → follow_up",
                "tone": "friendly, conversational, natural"
            }
        }
    async def execute(self, agent_outputs: List[Dict[str, Any]], user_input: str,
                      context: Optional[Dict[str, Any]] = None, **kwargs) -> Dict[str, Any]:
        """Synthesize a single response from multiple agent outputs."""
        try:
            logger.info(f"{self.agent_id} synthesizing {len(agent_outputs)} agent outputs")

            # Determine which intent the upstream agents detected
            intent_info = self._extract_intent_info(agent_outputs)
            primary_intent = intent_info.get('primary_intent', 'casual_conversation')

            synthesis_result = await self._synthesize_response(
                agent_outputs, user_input, context, primary_intent
            )

            # Attach metadata about synthesis quality and intent alignment
            synthesis_result.update({
                "agent_id": self.agent_id,
                "synthesis_quality_metrics": self._calculate_quality_metrics(synthesis_result),
                "intent_alignment": self._check_intent_alignment(synthesis_result, intent_info)
            })

            logger.info(f"{self.agent_id} completed synthesis")
            return synthesis_result

        except Exception as e:
            logger.error(f"{self.agent_id} synthesis error: {str(e)}")
            return self._get_fallback_response(user_input, agent_outputs)
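
    # The dict returned by execute() combines the synthesis payload with metadata.
    # Illustrative shape only, built from the keys produced by the methods below:
    #   {
    #       "final_response": "...", "draft_response": "...",
    #       "source_references": ["..."], "coherence_score": 0.75,
    #       "improvement_opportunities": ["..."], "synthesis_method": "template_based",
    #       "agent_id": "RESP_SYNTH_001",
    #       "synthesis_quality_metrics": {...}, "intent_alignment": {...}
    #   }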

    async def _synthesize_response(self, agent_outputs: List[Dict[str, Any]],
                                   user_input: str, context: Optional[Dict[str, Any]],
                                   primary_intent: str) -> Dict[str, Any]:
        """Choose the synthesis method based on whether an LLM router is available."""
        if self.llm_router:
            return await self._llm_based_synthesis(agent_outputs, user_input, context, primary_intent)
        else:
            return await self._template_based_synthesis(agent_outputs, user_input, primary_intent)

    async def _llm_based_synthesis(self, agent_outputs: List[Dict[str, Any]],
                                   user_input: str, context: Optional[Dict[str, Any]],
                                   primary_intent: str) -> Dict[str, Any]:
        """Use the LLM router for more sophisticated response synthesis."""
        # The synthesis prompt is prepared for the eventual LLM call; until that call is
        # wired into the llm_router, a template-based draft is produced and then
        # enhanced heuristically.
        synthesis_prompt = self._build_synthesis_prompt(agent_outputs, user_input, context, primary_intent)

        synthesized_response = await self._template_based_synthesis(agent_outputs, user_input, primary_intent)

        draft_response = synthesized_response["final_response"]
        enhanced_response = self._enhance_response_quality(draft_response, primary_intent)

        return {
            "draft_response": draft_response,
            "final_response": enhanced_response,
            "source_references": self._extract_source_references(agent_outputs),
            "coherence_score": 0.85,
            "improvement_opportunities": self._identify_improvements(enhanced_response),
            "synthesis_method": "llm_enhanced"
        }

    async def _template_based_synthesis(self, agent_outputs: List[Dict[str, Any]],
                                        user_input: str, primary_intent: str) -> Dict[str, Any]:
        """Template-based response synthesis."""
        template = self.response_templates.get(primary_intent, self.response_templates["casual_conversation"])

        content_blocks = self._extract_content_blocks(agent_outputs)

        structured_response = self._apply_response_template(content_blocks, template, primary_intent)

        return {
            "draft_response": structured_response,
            "final_response": structured_response,
            "source_references": self._extract_source_references(agent_outputs),
            "coherence_score": 0.75,
            "improvement_opportunities": ["Consider adding more specific details"],
            "synthesis_method": "template_based"
        }

    def _build_synthesis_prompt(self, agent_outputs: List[Dict[str, Any]],
                                user_input: str, context: Optional[Dict[str, Any]],
                                primary_intent: str) -> str:
        """Build the prompt for LLM-based synthesis."""
        return f"""
Synthesize a coherent response from multiple AI agent outputs:

User Question: "{user_input}"
Primary Intent: {primary_intent}

Agent Outputs to Integrate:
{self._format_agent_outputs_for_synthesis(agent_outputs)}

Conversation Context: {context.get('conversation_history', [])[-3:] if context else 'No context'}

Requirements:
- Maintain accuracy from source materials
- Ensure logical flow and coherence
- Match the {primary_intent} intent style
- Keep response concise but comprehensive
- Include relevant details from agent outputs

Provide a well-structured, natural-sounding response.
"""

    def _extract_intent_info(self, agent_outputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Extract intent information from agent outputs."""
        for output in agent_outputs:
            if 'primary_intent' in output:
                return {
                    'primary_intent': output['primary_intent'],
                    'confidence': output.get('confidence_scores', {}).get(output['primary_intent'], 0.5),
                    'source_agent': output.get('agent_id', 'unknown')
                }
        return {'primary_intent': 'casual_conversation', 'confidence': 0.5}

    def _extract_content_blocks(self, agent_outputs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Extract content blocks from agent outputs for synthesis."""
        content_blocks = []

        for output in agent_outputs:
            if 'result' in output:
                content_blocks.append({
                    'content': output['result'],
                    'source': output.get('agent_id', 'unknown'),
                    'confidence': output.get('confidence', 0.5)
                })
            elif 'primary_intent' in output:
                content_blocks.append({
                    'content': f"Intent analysis: {output['primary_intent']}",
                    'source': output.get('agent_id', 'intent_agent'),
                    'confidence': output.get('confidence_scores', {}).get(output['primary_intent'], 0.5)
                })
            elif 'final_response' in output:
                content_blocks.append({
                    'content': output['final_response'],
                    'source': output.get('agent_id', 'unknown'),
                    'confidence': output.get('confidence_score', 0.7)
                })

        return content_blocks
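
    # Illustrative examples of the output shapes this method understands; the field
    # names simply mirror the keys checked above and are not a fixed upstream contract:
    #   {"agent_id": "SEARCH_001", "result": "Found three matching articles.", "confidence": 0.9}
    #   {"agent_id": "INTENT_001", "primary_intent": "information_request", "confidence_scores": {"information_request": 0.8}}
    #   {"agent_id": "WRITER_001", "final_response": "Here is a short summary...", "confidence_score": 0.7}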

    def _apply_response_template(self, content_blocks: List[Dict[str, Any]],
                                 template: Dict[str, str], intent: str) -> str:
        """Apply the response template to structure the content."""
        if intent == "information_request":
            return self._structure_informative_response(content_blocks)
        elif intent == "task_execution":
            return self._structure_actionable_response(content_blocks)
        else:
            return self._structure_conversational_response(content_blocks)

    def _structure_informative_response(self, content_blocks: List[Dict[str, Any]]) -> str:
        """Structure an informative response (intro → key_points → conclusion)."""
        if not content_blocks:
            return "I'm here to help! Could you provide more details about what you're looking for?"

        intro = "Based on the information available"
        key_points = "\n".join([f"• {block['content']}" for block in content_blocks[:3]])
        conclusion = "I hope this helps! Let me know if you need any clarification."

        return f"{intro}:\n\n{key_points}\n\n{conclusion}"

    def _structure_actionable_response(self, content_blocks: List[Dict[str, Any]]) -> str:
        """Structure an actionable response (confirmation → steps → outcome)."""
        if not content_blocks:
            return "I understand you'd like some help. What specific task would you like to accomplish?"

        confirmation = "I can help with that!"
        steps = "\n".join([f"{i+1}. {block['content']}" for i, block in enumerate(content_blocks[:5])])
        outcome = "This should help you get started. Feel free to ask if you need further assistance."

        return f"{confirmation}\n\n{steps}\n\n{outcome}"

    def _structure_conversational_response(self, content_blocks: List[Dict[str, Any]]) -> str:
        """Structure a conversational response."""
        if not content_blocks:
            return "Thanks for chatting! How can I assist you today?"

        # Concatenate the available content and truncate long replies to keep the tone conversational
        combined_content = " ".join([block['content'] for block in content_blocks])
        return (combined_content[:500] + "...") if len(combined_content) > 500 else combined_content

    def _enhance_response_quality(self, response: str, intent: str) -> str:
        """Enhance response quality based on intent."""
        enhanced = response

        # Invite follow-up when the draft is very short
        if len(response.split()) < 5:
            enhanced += "\n\nWould you like me to expand on this?"

        # Encourage further questions for informational answers that end without one
        if intent == "information_request" and "?" not in response:
            enhanced += "\n\nIs there anything specific you'd like to know more about?"

        return enhanced

    def _extract_source_references(self, agent_outputs: List[Dict[str, Any]]) -> List[str]:
        """Extract source references from agent outputs."""
        sources = []
        for output in agent_outputs:
            agent_id = output.get('agent_id', 'unknown')
            sources.append(agent_id)
        return list(set(sources))

    def _format_agent_outputs_for_synthesis(self, agent_outputs: List[Dict[str, Any]]) -> str:
        """Format agent outputs for the LLM synthesis prompt."""
        formatted = []
        for i, output in enumerate(agent_outputs, 1):
            agent_id = output.get('agent_id', 'unknown')
            # Coerce to str so slicing is safe even when the payload is not a string
            content = str(output.get('result', output.get('final_response', output)))
            formatted.append(f"Agent {i} ({agent_id}): {content[:100]}...")
        return "\n".join(formatted)

    def _calculate_quality_metrics(self, synthesis_result: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate quality metrics for the synthesis."""
        response = synthesis_result.get('final_response', '')

        return {
            "length": len(response),
            "word_count": len(response.split()),
            "coherence_score": synthesis_result.get('coherence_score', 0.7),
            "source_count": len(synthesis_result.get('source_references', [])),
            # Detect bullet points or numbered steps produced by the templates above
            "has_structured_elements": bool(re.search(r'•|\d+\.', response))
        }

    def _check_intent_alignment(self, synthesis_result: Dict[str, Any], intent_info: Dict[str, Any]) -> Dict[str, Any]:
        """Check whether the synthesis aligns with the detected intent."""
        # Fixed heuristic score; a real check would compare the response against the intent
        alignment_score = 0.8

        return {
            "intent_detected": intent_info.get('primary_intent'),
            "alignment_score": alignment_score,
            "alignment_verified": alignment_score > 0.7
        }

    def _identify_improvements(self, response: str) -> List[str]:
        """Identify opportunities to improve the response."""
        improvements = []

        if len(response) < 50:
            improvements.append("Could be more detailed")

        if "?" not in response and len(response.split()) < 100:
            improvements.append("Consider adding examples")

        return improvements

    def _get_fallback_response(self, user_input: str, agent_outputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Provide a fallback response when synthesis fails."""
        return {
            "final_response": f"I apologize, but I'm having trouble generating a response. Your question was: {user_input[:100]}...",
            "draft_response": "",
            "source_references": [],
            "coherence_score": 0.3,
            "improvement_opportunities": ["System had synthesis error"],
            "synthesis_method": "fallback",
            "agent_id": self.agent_id,
            "synthesis_quality_metrics": {"error": "synthesis_failed"},
            "intent_alignment": {"error": "not_available"},
            "error_handled": True
        }


def create_synthesis_agent(llm_router=None):
    """Factory helper that builds a ResponseSynthesisAgent."""
    return ResponseSynthesisAgent(llm_router)
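

if __name__ == "__main__":
    # Minimal usage sketch. The sample agent outputs below are illustrative only; they
    # just use the keys (primary_intent, result, confidence_scores) that the extraction
    # methods above look for, not the payload of any real upstream agent.
    import asyncio

    sample_outputs = [
        {
            "agent_id": "INTENT_AGENT",  # hypothetical upstream agent id
            "primary_intent": "information_request",
            "confidence_scores": {"information_request": 0.82},
        },
        {
            "agent_id": "RESEARCH_AGENT",  # hypothetical upstream agent id
            "result": "Response synthesis combines several partial answers into one reply.",
            "confidence": 0.9,
        },
    ]

    agent = create_synthesis_agent()  # no llm_router, so template-based synthesis is used
    result = asyncio.run(agent.execute(sample_outputs, "What does response synthesis do?"))
    print(result["final_response"])
    print(result["synthesis_method"])  # expected: "template_based"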