|
|
|
|
|
import uuid
import logging
import time
import asyncio
from datetime import datetime
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor
import sys
import os

logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# Make sibling modules importable regardless of the working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
sys.path.insert(0, current_dir)
|
|
|
|
|
try:
    from safety_threshold_matrix import should_trigger_user_choice
    from safety_user_choice import create_safety_choice_prompt, process_safety_choice
    from safety_choice_orchestrator import SafetyChoiceOrchestrator
    SAFETY_CHOICE_AVAILABLE = True
    logger.info("Safety choice modules loaded successfully")
except ImportError as e:
    logger.warning(f"Safety choice modules not available: {e}")
    SAFETY_CHOICE_AVAILABLE = False
|
|
|
|
|
class MVPOrchestrator:
    """Coordinates context management, intent recognition, skills identification,
    agent execution, response synthesis, and safety checking for each request."""

    def __init__(self, llm_router, context_manager, agents):
        self.llm_router = llm_router
        self.context_manager = context_manager
        self.agents = agents
        self.execution_trace = []

        # Cache of extracted topics keyed by a hash of the user input.
        self._topic_cache = {}
        self._topic_cache_max_size = 100

        # Thresholds above which a safety category is treated as a concern.
        self.safety_thresholds = {
            "toxicity_or_harmful_language": 0.3,
            "potential_biases_or_stereotypes": 0.05,
            "privacy_or_security_concerns": 0.2,
            "controversial_or_sensitive_topics": 0.3
        }
        self.max_revision_attempts = 2
        self.revision_timeout = 30

        # Per-session state for the safety user-choice flow.
        self.awaiting_safety_response = {}
        self._pending_choices = {}

        # Per-session user identifiers and a short-lived context cache.
        self._current_user_id = {}
        self._context_cache = {}

        # Recent queries retained for duplicate/similarity detection.
        self.recent_queries = []
        self.max_recent_queries = 50

        # Lightweight performance tracking.
        self.agent_call_count = 0
        self.response_metrics_history = []

        # Context relevance classifier is initialized lazily on first use.
        self.context_classifier = None
        self._classifier_initialized = False

        logger.info("MVPOrchestrator initialized with safety revision thresholds")
|
|
|
|
|
    def set_user_id(self, session_id: str, user_id: str):
        """Set the user_id for a session, clearing stale cached context on change."""
        old_user_id = self._current_user_id.get(session_id)

        if old_user_id != user_id:
            self._current_user_id[session_id] = user_id
            logger.info(f"Set user_id={user_id} for session {session_id} (was: {old_user_id})")

            # Invalidate any cached context built for the previous user.
            cache_key = f"context_{session_id}"
            if cache_key in self._context_cache:
                del self._context_cache[cache_key]
                logger.info(f"Cleared context cache for session {session_id} due to user change")
        else:
            self._current_user_id[session_id] = user_id
|
|
|
|
|
    def _get_user_id_for_session(self, session_id: str) -> str:
        """Return the user_id recorded for a session without triggering context lookups."""
        if hasattr(self, '_current_user_id') and session_id in self._current_user_id:
            return self._current_user_id[session_id]

        # Default identifier used when no user has been registered for the session.
        return "Test_Any"
|
|
|
|
|
    async def _get_or_create_context(self, session_id: str, user_input: str, user_id: str) -> dict:
        """Get context with loop prevention and short-lived caching."""
        cache_key = f"context_{session_id}"
        current_time = time.time()

        # Reuse a context fetched within the last 5 seconds to avoid re-entrant lookups.
        if hasattr(self, '_context_cache'):
            cached = self._context_cache.get(cache_key)
            if cached and (current_time - cached['timestamp']) < 5:
                logger.info(f"Using cached context for session {session_id}")
                return cached['context']

        context = await self.context_manager.manage_context(session_id, user_input, user_id=user_id)

        if not hasattr(self, '_context_cache'):
            self._context_cache = {}

        self._context_cache[cache_key] = {
            'context': context,
            'timestamp': current_time
        }

        # Bound the cache size by keeping only the 50 most recent entries.
        if len(self._context_cache) > 100:
            sorted_items = sorted(self._context_cache.items(), key=lambda x: x[1]['timestamp'])
            self._context_cache = dict(sorted_items[-50:])

        return context
|
|
|
|
|
async def process_request(self, session_id: str, user_input: str) -> dict: |
|
|
""" |
|
|
Main orchestration flow with loop prevention |
|
|
""" |
|
|
logger.info(f"Processing request for session {session_id}") |
|
|
logger.info(f"User input: {user_input[:100]}") |
|
|
|
|
|
|
|
|
user_input_upper = user_input.strip().upper() |
|
|
is_binary_response = user_input_upper in ['YES', 'NO', 'APPLY', 'KEEP', 'Y', 'N'] |
|
|
|
|
|
|
|
|
if is_binary_response and self.awaiting_safety_response.get(session_id, False): |
|
|
logger.info(f"Binary safety response detected ({user_input_upper}) - bypassing recursive safety check") |
|
|
|
|
|
|
|
|
self.awaiting_safety_response[session_id] = False |
|
|
|
|
|
|
|
|
if hasattr(self, '_pending_choices'): |
|
|
self._pending_choices.pop(session_id, None) |
|
|
|
|
|
|
|
|
return { |
|
|
'is_safety_response': True, |
|
|
'response': user_input_upper, |
|
|
'requires_user_choice': False, |
|
|
'skip_safety_check': True, |
|
|
'final_response': f"Choice '{user_input_upper}' has been applied.", |
|
|
'bypass_reason': 'binary_safety_response' |
|
|
} |
|
|
|
|
|
|
|
|
self.execution_trace = [] |
|
|
start_time = time.time() |
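        # Reasoning-chain scaffold: each pipeline step records a hypothesis,
        # its supporting evidence, and a confidence score so the final output
        # can expose how the answer was produced.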
|
|
|
|
|
|
|
|
reasoning_chain = { |
|
|
"chain_of_thought": {}, |
|
|
"alternative_paths": [], |
|
|
"uncertainty_areas": [], |
|
|
"evidence_sources": [], |
|
|
"confidence_calibration": {} |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
similar_response = self.check_query_similarity(user_input, threshold=0.95) |
|
|
if similar_response: |
|
|
logger.info(f"Similar/duplicate query detected, using cached response") |
|
|
|
|
|
metrics_start = time.time() |
|
|
self.track_response_metrics(metrics_start, similar_response) |
|
|
return similar_response |
|
|
|
|
|
|
|
|
interaction_id = self._generate_interaction_id(session_id) |
|
|
logger.info(f"Generated interaction ID: {interaction_id}") |
|
|
|
|
|
|
|
|
logger.info("Step 2: Managing context with loop prevention...") |
|
|
|
|
|
|
|
|
user_id = self._get_user_id_for_session(session_id) |
|
|
|
|
|
|
|
|
base_context = await self._get_or_create_context(session_id, user_input, user_id) |
|
|
|
|
|
|
|
|
context_mode = 'fresh' |
|
|
try: |
|
|
if hasattr(self.context_manager, 'get_context_mode'): |
|
|
context_mode = self.context_manager.get_context_mode(session_id) |
|
|
except Exception as e: |
|
|
logger.warning(f"Error getting context mode: {e}, using default 'fresh'") |
|
|
|
|
|
|
|
|
relevance_classification = None |
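            # In 'relevant' context mode, prior sessions are classified and
            # summarized so only material related to the current query is
            # folded into the working context.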
|
|
if context_mode == 'relevant': |
|
|
try: |
|
|
logger.info("Relevant context mode: Classifying and summarizing relevant sessions...") |
|
|
|
|
|
|
|
|
if not self._classifier_initialized: |
|
|
try: |
|
|
from src.context_relevance_classifier import ContextRelevanceClassifier |
|
|
self.context_classifier = ContextRelevanceClassifier(self.llm_router) |
|
|
self._classifier_initialized = True |
|
|
logger.info("Context relevance classifier initialized") |
|
|
except ImportError as e: |
|
|
logger.warning(f"Context relevance classifier not available: {e}") |
|
|
self._classifier_initialized = True |
|
|
|
|
|
|
|
|
if self.context_classifier: |
|
|
all_session_contexts = [] |
|
|
try: |
|
|
if hasattr(self.context_manager, 'get_all_user_sessions'): |
|
|
all_session_contexts = await self.context_manager.get_all_user_sessions(user_id) |
|
|
else: |
|
|
|
|
|
all_session_contexts = await self._get_all_user_sessions(user_id) |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching user sessions: {e}", exc_info=True) |
|
|
all_session_contexts = [] |
|
|
|
|
|
if all_session_contexts: |
|
|
|
|
|
relevance_classification = await self.context_classifier.classify_and_summarize_relevant_contexts( |
|
|
current_input=user_input, |
|
|
session_contexts=all_session_contexts, |
|
|
user_id=user_id |
|
|
) |
|
|
|
|
|
logger.info( |
|
|
f"Relevance classification complete: " |
|
|
f"{len(relevance_classification.get('relevant_summaries', []))} sessions summarized, " |
|
|
f"topic: '{relevance_classification.get('topic', 'unknown')}', " |
|
|
f"time: {relevance_classification.get('processing_time', 0):.2f}s" |
|
|
) |
|
|
else: |
|
|
logger.info("No session contexts available for relevance classification") |
|
|
else: |
|
|
logger.debug("Context classifier not available, skipping relevance classification") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in relevance classification: {e}", exc_info=True) |
|
|
|
|
|
relevance_classification = None |
|
|
|
|
|
|
|
|
try: |
|
|
context = self.context_manager._optimize_context( |
|
|
base_context, |
|
|
relevance_classification=relevance_classification |
|
|
) |
|
|
except Exception as e: |
|
|
logger.error(f"Error optimizing context: {e}", exc_info=True) |
|
|
|
|
|
context = base_context |
|
|
|
|
|
interaction_contexts_count = len(context.get('interaction_contexts', [])) |
|
|
logger.info(f"Context retrieved: {interaction_contexts_count} interaction contexts, mode: {context_mode}") |
|
|
|
|
|
|
|
|
user_context = context.get('user_context', '') |
|
|
has_user_context = bool(user_context) |
|
|
|
|
|
|
|
|
main_topic = await self._extract_main_topic(user_input, context) |
|
|
topic_continuity = await self._analyze_topic_continuity(context, user_input) |
|
|
query_keywords = await self._extract_keywords(user_input) |
|
|
|
|
|
reasoning_chain["chain_of_thought"]["step_1"] = { |
|
|
"hypothesis": f"User is asking about: '{main_topic}'", |
|
|
"evidence": [ |
|
|
f"Previous interaction contexts: {interaction_contexts_count}", |
|
|
f"User context available: {has_user_context}", |
|
|
f"Session duration: {self._calculate_session_duration(context)}", |
|
|
f"Topic continuity: {topic_continuity}", |
|
|
f"Query keywords: {query_keywords}" |
|
|
], |
|
|
"confidence": 0.85, |
|
|
"reasoning": f"Context analysis shows user is focused on {main_topic} with {interaction_contexts_count} previous interaction contexts and {'existing' if has_user_context else 'new'} user context" |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
use_parallel = getattr(self, '_parallel_processing_enabled', True) |
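            # Intent recognition, skills identification, and a safety pre-check
            # can run concurrently; the sequential branch is retained as a
            # fallback and for easier debugging.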
|
|
|
|
|
if use_parallel: |
|
|
logger.info("Step 3: Processing intent, skills, and safety in parallel...") |
|
|
parallel_results = await self.process_request_parallel(session_id, user_input, context) |
|
|
intent_result = parallel_results.get('intent', {}) |
|
|
skills_result = parallel_results.get('skills', {}) |
|
|
|
|
|
else: |
|
|
|
|
|
logger.info("Step 3: Recognizing intent...") |
|
|
self.execution_trace.append({ |
|
|
"step": "intent_recognition", |
|
|
"agent": "intent_recognition", |
|
|
"status": "executing" |
|
|
}) |
|
|
intent_result = await self.agents['intent_recognition'].execute( |
|
|
user_input=user_input, |
|
|
context=context |
|
|
) |
|
|
self.execution_trace[-1].update({ |
|
|
"status": "completed", |
|
|
"result": {"primary_intent": intent_result.get('primary_intent', 'unknown')} |
|
|
}) |
|
|
logger.info(f"Intent detected: {intent_result.get('primary_intent', 'unknown')}") |
|
|
|
|
|
|
|
|
logger.info("Step 3.5: Identifying relevant skills...") |
|
|
self.execution_trace.append({ |
|
|
"step": "skills_identification", |
|
|
"agent": "skills_identification", |
|
|
"status": "executing" |
|
|
}) |
|
|
skills_result = await self.agents['skills_identification'].execute( |
|
|
user_input=user_input, |
|
|
context=context |
|
|
) |
|
|
self.execution_trace[-1].update({ |
|
|
"status": "completed", |
|
|
"result": {"skills_count": len(skills_result.get('identified_skills', []))} |
|
|
}) |
|
|
logger.info(f"Skills identified: {len(skills_result.get('identified_skills', []))} skills") |
|
|
|
|
|
|
|
|
reasoning_chain["chain_of_thought"]["step_2_5"] = { |
|
|
"hypothesis": f"User input relates to {len(skills_result.get('identified_skills', []))} expert skills", |
|
|
"evidence": [ |
|
|
f"Market analysis: {skills_result.get('market_analysis', {}).get('overall_analysis', 'N/A')}", |
|
|
f"Skill classification: {skills_result.get('skill_classification', {}).get('classification_reasoning', 'N/A')}", |
|
|
f"High-probability skills: {[s.get('skill', '') for s in skills_result.get('identified_skills', [])[:3]]}", |
|
|
f"Confidence score: {skills_result.get('confidence_score', 0.5)}" |
|
|
], |
|
|
"confidence": skills_result.get('confidence_score', 0.5), |
|
|
"reasoning": f"Skills identification completed for topic '{main_topic}' with {len(skills_result.get('identified_skills', []))} relevant skills" |
|
|
} |
|
|
|
|
|
|
|
|
reasoning_chain["chain_of_thought"]["step_2"] = { |
|
|
"hypothesis": f"User intent is '{intent_result.get('primary_intent', 'unknown')}' for topic '{main_topic}'", |
|
|
"evidence": [ |
|
|
f"Pattern analysis: {self._extract_pattern_evidence(user_input)}", |
|
|
f"Confidence scores: {intent_result.get('confidence_scores', {})}", |
|
|
f"Secondary intents: {intent_result.get('secondary_intents', [])}", |
|
|
f"Query complexity: {self._assess_query_complexity(user_input)}" |
|
|
], |
|
|
"confidence": intent_result.get('confidence_scores', {}).get(intent_result.get('primary_intent', 'unknown'), 0.7), |
|
|
"reasoning": f"Intent '{intent_result.get('primary_intent', 'unknown')}' detected for {main_topic} based on linguistic patterns and context" |
|
|
} |
|
|
|
|
|
|
|
|
logger.info("Step 4: Creating execution plan...") |
|
|
execution_plan = await self._create_execution_plan(intent_result, context) |
|
|
|
|
|
|
|
|
reasoning_chain["chain_of_thought"]["step_3"] = { |
|
|
"hypothesis": f"Optimal approach for '{intent_result.get('primary_intent', 'unknown')}' intent on '{main_topic}'", |
|
|
"evidence": [ |
|
|
f"Intent complexity: {self._assess_intent_complexity(intent_result)}", |
|
|
f"Required agents: {execution_plan.get('agents_to_execute', [])}", |
|
|
f"Execution strategy: {execution_plan.get('execution_order', 'sequential')}", |
|
|
f"Response scope: {self._determine_response_scope(user_input)}" |
|
|
], |
|
|
"confidence": 0.80, |
|
|
"reasoning": f"Agent selection optimized for {intent_result.get('primary_intent', 'unknown')} intent regarding {main_topic}" |
|
|
} |
|
|
|
|
|
|
|
|
logger.info("Step 5: Executing agents...") |
|
|
agent_results = await self._execute_agents(execution_plan, user_input, context) |
|
|
logger.info(f"Agent execution complete: {len(agent_results)} results") |
|
|
|
|
|
|
|
|
logger.info("Step 6: Synthesizing response...") |
|
|
self.execution_trace.append({ |
|
|
"step": "response_synthesis", |
|
|
"agent": "response_synthesis", |
|
|
"status": "executing" |
|
|
}) |
|
|
final_response = await self.agents['response_synthesis'].execute( |
|
|
agent_outputs=agent_results, |
|
|
user_input=user_input, |
|
|
context=context, |
|
|
skills_result=skills_result |
|
|
) |
|
|
self.execution_trace[-1].update({ |
|
|
"status": "completed", |
|
|
"result": {"synthesis_method": final_response.get('synthesis_method', 'unknown')} |
|
|
}) |
|
|
|
|
|
|
|
|
reasoning_chain["chain_of_thought"]["step_4"] = { |
|
|
"hypothesis": f"Response synthesis for '{main_topic}' using '{final_response.get('synthesis_method', 'unknown')}' method", |
|
|
"evidence": [ |
|
|
f"Synthesis quality: {final_response.get('coherence_score', 0.7)}", |
|
|
f"Source integration: {len(final_response.get('source_references', []))} sources", |
|
|
f"Response length: {len(str(final_response.get('final_response', '')))} characters", |
|
|
f"Content relevance: {self._assess_content_relevance(user_input, final_response)}" |
|
|
], |
|
|
"confidence": final_response.get('coherence_score', 0.7), |
|
|
"reasoning": f"Multi-source synthesis for {main_topic} using {final_response.get('synthesis_method', 'unknown')} approach" |
|
|
} |
|
|
|
|
|
|
|
|
logger.info("Step 7: Safety check...") |
|
|
self.execution_trace.append({ |
|
|
"step": "safety_check", |
|
|
"agent": "safety_check", |
|
|
"status": "executing" |
|
|
}) |
|
|
safety_checked = await self.agents['safety_check'].execute( |
|
|
response=final_response, |
|
|
context=context |
|
|
) |
|
|
self.execution_trace[-1].update({ |
|
|
"status": "completed", |
|
|
"result": {"warnings": safety_checked.get('warnings', [])} |
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
intent_class = intent_result.get('primary_intent', 'casual_conversation') |
|
|
response_content = final_response.get('final_response', '') or str(final_response.get('response', '')) |
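            # When safety concerns exceed the configured thresholds, an advisory
            # section is appended to the response instead of prompting the user
            # for a choice (the interactive choice flow is currently paused).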
|
|
|
|
|
|
|
|
if SAFETY_CHOICE_AVAILABLE: |
|
|
safety_analysis = safety_checked.get('safety_analysis', {}) |
|
|
|
|
|
|
|
|
if should_trigger_user_choice(safety_analysis, intent_class): |
|
|
logger.info(f"Safety concerns detected for intent '{intent_class}' - appending warnings to response") |
|
|
|
|
|
|
|
|
from safety_threshold_matrix import format_safety_concerns |
|
|
concerns_text = format_safety_concerns(safety_analysis, intent_class) |
|
|
|
|
|
if concerns_text: |
|
|
|
|
|
warning_section = f""" |
|
|
|
|
|
--- |
|
|
|
|
|
## ⚠️ Safety Advisory |
|
|
|
|
|
This response has been flagged for potential safety concerns: |
|
|
|
|
|
{concerns_text} |
|
|
|
|
|
**Please review this content carefully and consider:** |
|
|
- The potential impact on yourself and others |
|
|
- Whether this content aligns with your intended use |
|
|
- If additional verification or expert consultation is needed |
|
|
|
|
|
*This advisory is provided for transparency and user awareness. The response has not been modified.* |
|
|
""" |
|
|
|
|
|
response_content = response_content + warning_section |
|
|
|
|
|
|
|
|
final_response['final_response'] = response_content |
|
|
if 'response' in final_response: |
|
|
final_response['response'] = response_content |
|
|
|
|
|
|
|
|
|
|
|
safety_checked['safety_checked_response'] = response_content |
|
|
safety_checked['original_response'] = response_content |
|
|
|
|
|
logger.info("Safety warnings appended to response - no user choice prompted (feature paused)") |
|
|
|
|
|
|
|
|
reasoning_chain["chain_of_thought"]["step_5"] = { |
|
|
"hypothesis": f"Safety validation for response about '{main_topic}'", |
|
|
"evidence": [ |
|
|
f"Safety score: {safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)}", |
|
|
f"Warnings generated: {len(safety_checked.get('warnings', []))}", |
|
|
f"Analysis method: {safety_checked.get('safety_analysis', {}).get('analysis_method', 'unknown')}", |
|
|
f"Content appropriateness: {self._assess_content_appropriateness(user_input, safety_checked)}" |
|
|
], |
|
|
"confidence": safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8), |
|
|
"reasoning": f"Safety analysis for {main_topic} content with non-blocking warning system" |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if 'final_response' in final_response: |
|
|
final_response['final_response'] = response_content |
|
|
if 'response' in final_response: |
|
|
final_response['response'] = response_content |
|
|
|
|
|
|
|
|
reasoning_chain["alternative_paths"] = self._generate_alternative_paths(intent_result, user_input, main_topic) |
|
|
reasoning_chain["uncertainty_areas"] = self._identify_uncertainty_areas(intent_result, final_response, safety_checked) |
|
|
reasoning_chain["evidence_sources"] = self._extract_evidence_sources(intent_result, final_response, context) |
|
|
reasoning_chain["confidence_calibration"] = self._calibrate_confidence_scores(reasoning_chain) |
|
|
|
|
|
processing_time = time.time() - start_time |
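            # Collapse synthesis and safety outputs into a single payload so the
            # final formatting step only needs to read one structure.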
|
|
|
|
|
|
|
|
|
|
|
merged_response = { |
|
|
'final_response': response_content, |
|
|
'response': response_content, |
|
|
'safety_checked_response': response_content, |
|
|
'original_response': response_content, |
|
|
'warnings': safety_checked.get('warnings', []) |
|
|
} |
|
|
|
|
|
|
|
|
result = self._format_final_output(merged_response, interaction_id, { |
|
|
'intent': intent_result.get('primary_intent', 'unknown'), |
|
|
'execution_plan': execution_plan, |
|
|
'processing_steps': [ |
|
|
'Context management', |
|
|
'Intent recognition', |
|
|
'Skills identification', |
|
|
'Execution planning', |
|
|
'Agent execution', |
|
|
'Response synthesis', |
|
|
'Safety check' |
|
|
], |
|
|
'processing_time': processing_time, |
|
|
'agents_used': list(self.agents.keys()), |
|
|
'intent_result': intent_result, |
|
|
'skills_result': skills_result, |
|
|
'synthesis_result': final_response, |
|
|
'reasoning_chain': reasoning_chain |
|
|
}) |
|
|
|
|
|
|
|
|
response_text = str(result.get('response', '')) |
|
|
user_id = getattr(self, '_current_user_id', {}).get(session_id, "Test_Any") |
|
|
if response_text: |
|
|
self.context_manager._update_context(context, user_input, response_text, user_id=user_id) |
|
|
|
|
|
|
|
|
interaction_id = result.get('interaction_id', f"{session_id}_{int(time.time())}") |
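            # Persist interaction and session summaries for future requests, then
            # invalidate this session's cached context so the next request picks
            # up the refreshed summaries.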
|
|
try: |
|
|
await self.context_manager.generate_interaction_context( |
|
|
interaction_id=interaction_id, |
|
|
session_id=session_id, |
|
|
user_input=user_input, |
|
|
system_response=response_text, |
|
|
user_id=user_id |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
await self.context_manager.generate_session_context(session_id, user_id) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error generating session context: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
if hasattr(self, '_context_cache'): |
|
|
orchestrator_cache_key = f"context_{session_id}" |
|
|
if orchestrator_cache_key in self._context_cache: |
|
|
del self._context_cache[orchestrator_cache_key] |
|
|
logger.debug(f"Orchestrator cache cleared for session {session_id} to refresh with updated contexts") |
|
|
except Exception as e: |
|
|
logger.error(f"Error generating interaction context: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
self.track_response_metrics(start_time, result) |
|
|
|
|
|
|
|
|
self.recent_queries.append({ |
|
|
'query': user_input, |
|
|
'response': result, |
|
|
'timestamp': time.time() |
|
|
}) |
|
|
|
|
|
if len(self.recent_queries) > self.max_recent_queries: |
|
|
self.recent_queries = self.recent_queries[-self.max_recent_queries:] |
|
|
|
|
|
logger.info(f"Request processing complete. Response length: {len(response_text)}") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in process_request: {e}", exc_info=True) |
|
|
processing_time = time.time() - start_time |
|
|
return { |
|
|
"response": f"Error processing request: {str(e)}", |
|
|
"error": str(e), |
|
|
"interaction_id": str(uuid.uuid4())[:8], |
|
|
"agent_trace": [], |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"metadata": { |
|
|
"agents_used": [], |
|
|
"processing_time": processing_time, |
|
|
"token_count": 0, |
|
|
"warnings": [] |
|
|
} |
|
|
} |
|
|
|
|
|
    def _generate_interaction_id(self, session_id: str) -> str:
        """
        Generate a unique interaction identifier
        """
        unique_id = str(uuid.uuid4())[:8]
        return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}"
|
|
|
|
|
async def _get_all_user_sessions(self, user_id: str) -> List[Dict]: |
|
|
""" |
|
|
Fetch all session contexts for relevance classification |
|
|
Fallback method if context_manager doesn't have it |
|
|
|
|
|
Args: |
|
|
user_id: User identifier |
|
|
|
|
|
Returns: |
|
|
List of session context dictionaries |
|
|
""" |
|
|
try: |
|
|
|
|
|
if hasattr(self.context_manager, 'get_all_user_sessions'): |
|
|
return await self.context_manager.get_all_user_sessions(user_id) |
|
|
|
|
|
|
|
|
import sqlite3 |
|
|
db_path = getattr(self.context_manager, 'db_path', 'sessions.db') |
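            # Fallback path: read session and interaction summaries directly from
            # the SQLite store. Note that sqlite3 calls block the event loop, so
            # this is only intended as a last resort.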
|
|
|
|
|
conn = sqlite3.connect(db_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT DISTINCT |
|
|
sc.session_id, |
|
|
sc.session_summary, |
|
|
sc.created_at, |
|
|
(SELECT GROUP_CONCAT(ic.interaction_summary, ' ||| ') |
|
|
FROM interaction_contexts ic |
|
|
WHERE ic.session_id = sc.session_id |
|
|
ORDER BY ic.created_at DESC |
|
|
LIMIT 10) as recent_interactions |
|
|
FROM session_contexts sc |
|
|
JOIN sessions s ON sc.session_id = s.session_id |
|
|
WHERE s.user_id = ? |
|
|
ORDER BY sc.created_at DESC |
|
|
LIMIT 50 |
|
|
""", (user_id,)) |
|
|
|
|
|
sessions = [] |
|
|
for row in cursor.fetchall(): |
|
|
session_id, session_summary, created_at, interactions_str = row |
|
|
|
|
|
interaction_list = [] |
|
|
if interactions_str: |
|
|
for summary in interactions_str.split(' ||| '): |
|
|
if summary.strip(): |
|
|
interaction_list.append({ |
|
|
'summary': summary.strip(), |
|
|
'timestamp': created_at |
|
|
}) |
|
|
|
|
|
sessions.append({ |
|
|
'session_id': session_id, |
|
|
'summary': session_summary or '', |
|
|
'created_at': created_at, |
|
|
'interaction_contexts': interaction_list |
|
|
}) |
|
|
|
|
|
conn.close() |
|
|
return sessions |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching user sessions: {e}", exc_info=True) |
|
|
return [] |
|
|
|
|
|
async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict: |
|
|
""" |
|
|
Create execution plan based on intent recognition |
|
|
Maps intent types to specific execution tasks |
|
|
""" |
|
|
primary_intent = intent_result.get('primary_intent', 'casual_conversation') |
|
|
secondary_intents = intent_result.get('secondary_intents', []) |
|
|
confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.7) |
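        # Each recognized intent maps to a fixed task list plus an execution
        # order; unrecognized intents fall back to a single general_research task.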
|
|
|
|
|
|
|
|
intent_task_mapping = { |
|
|
"information_request": { |
|
|
"tasks": ["information_gathering", "content_research"], |
|
|
"execution_order": "sequential", |
|
|
"priority": "high" |
|
|
}, |
|
|
"task_execution": { |
|
|
"tasks": ["task_planning", "execution_strategy"], |
|
|
"execution_order": "sequential", |
|
|
"priority": "high" |
|
|
}, |
|
|
"creative_generation": { |
|
|
"tasks": ["creative_brainstorming", "content_ideation"], |
|
|
"execution_order": "parallel", |
|
|
"priority": "normal" |
|
|
}, |
|
|
"analysis_research": { |
|
|
"tasks": ["research_analysis", "data_collection", "pattern_identification"], |
|
|
"execution_order": "sequential", |
|
|
"priority": "high" |
|
|
}, |
|
|
"troubleshooting": { |
|
|
"tasks": ["problem_analysis", "solution_research"], |
|
|
"execution_order": "sequential", |
|
|
"priority": "high" |
|
|
}, |
|
|
"education_learning": { |
|
|
"tasks": ["curriculum_planning", "educational_content"], |
|
|
"execution_order": "sequential", |
|
|
"priority": "normal" |
|
|
}, |
|
|
"technical_support": { |
|
|
"tasks": ["technical_research", "guidance_generation"], |
|
|
"execution_order": "sequential", |
|
|
"priority": "high" |
|
|
}, |
|
|
"casual_conversation": { |
|
|
"tasks": ["context_enrichment"], |
|
|
"execution_order": "parallel", |
|
|
"priority": "low" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
plan = intent_task_mapping.get(primary_intent, { |
|
|
"tasks": ["general_research"], |
|
|
"execution_order": "parallel", |
|
|
"priority": "normal" |
|
|
}) |
|
|
|
|
|
|
|
|
if confidence > 0.7 and secondary_intents: |
|
|
for secondary_intent in secondary_intents[:2]: |
|
|
secondary_plan = intent_task_mapping.get(secondary_intent) |
|
|
if secondary_plan: |
|
|
|
|
|
existing_tasks = set(plan["tasks"]) |
|
|
for task in secondary_plan["tasks"]: |
|
|
if task not in existing_tasks: |
|
|
plan["tasks"].append(task) |
|
|
existing_tasks.add(task) |
|
|
|
|
|
logger.info(f"Execution plan created for intent '{primary_intent}': {len(plan['tasks'])} tasks, order={plan['execution_order']}") |
|
|
|
|
|
return { |
|
|
"agents_to_execute": plan["tasks"], |
|
|
"execution_order": plan["execution_order"], |
|
|
"priority": plan["priority"], |
|
|
"primary_intent": primary_intent, |
|
|
"secondary_intents": secondary_intents |
|
|
} |
|
|
|
|
|
async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict: |
|
|
""" |
|
|
Execute agents in parallel or sequential order based on plan |
|
|
Actually executes task-specific LLM calls based on intent |
|
|
""" |
|
|
tasks = execution_plan.get("agents_to_execute", []) |
|
|
execution_order = execution_plan.get("execution_order", "parallel") |
|
|
primary_intent = execution_plan.get("primary_intent", "casual_conversation") |
|
|
|
|
|
if not tasks: |
|
|
logger.warning("No tasks to execute in execution plan") |
|
|
return {} |
|
|
|
|
|
logger.info(f"Executing {len(tasks)} tasks in {execution_order} order for intent '{primary_intent}'") |
|
|
|
|
|
results = {} |
|
|
|
|
|
|
|
|
context_summary = self._build_context_summary(context) |
|
|
|
|
|
|
|
|
task_prompts = self._build_task_prompts(user_input, context_summary, primary_intent) |
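        # Parallel plans fan tasks out with asyncio.gather; sequential plans feed
        # each task's result into the prompt of the next task.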
|
|
|
|
|
if execution_order == "parallel": |
|
|
|
|
|
task_coroutines = [] |
|
|
for task in tasks: |
|
|
if task in task_prompts: |
|
|
coro = self._execute_single_task(task, task_prompts[task]) |
|
|
task_coroutines.append((task, coro)) |
|
|
else: |
|
|
logger.warning(f"No prompt template for task: {task}") |
|
|
|
|
|
|
|
|
if task_coroutines: |
|
|
task_results = await asyncio.gather( |
|
|
*[coro for _, coro in task_coroutines], |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
|
|
|
for (task, _), result in zip(task_coroutines, task_results): |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"Task {task} failed: {result}") |
|
|
results[task] = {"error": str(result), "status": "failed"} |
|
|
else: |
|
|
results[task] = result |
|
|
logger.info(f"Task {task} completed: {len(str(result))} chars") |
|
|
else: |
|
|
|
|
|
previous_results = {} |
|
|
for task in tasks: |
|
|
if task in task_prompts: |
|
|
|
|
|
enhanced_prompt = task_prompts[task] |
|
|
if previous_results: |
|
|
enhanced_prompt += f"\n\nPrevious task results: {str(previous_results)}" |
|
|
|
|
|
try: |
|
|
result = await self._execute_single_task(task, enhanced_prompt) |
|
|
results[task] = result |
|
|
previous_results[task] = result |
|
|
logger.info(f"Task {task} completed: {len(str(result))} chars") |
|
|
except Exception as e: |
|
|
logger.error(f"Task {task} failed: {e}") |
|
|
results[task] = {"error": str(e), "status": "failed"} |
|
|
previous_results[task] = results[task] |
|
|
else: |
|
|
logger.warning(f"No prompt template for task: {task}") |
|
|
|
|
|
logger.info(f"Agent execution complete: {len(results)} results collected") |
|
|
return results |
|
|
|
|
|
def _build_context_summary(self, context: dict) -> str: |
|
|
"""Build a concise summary of context for task execution (all from cache)""" |
|
|
summary_parts = [] |
|
|
|
|
|
|
|
|
session_context = context.get('session_context', {}) |
|
|
session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
|
|
if session_summary: |
|
|
summary_parts.append(f"Session summary: {session_summary[:1500]}") |
|
|
|
|
|
|
|
|
interaction_contexts = context.get('interaction_contexts', []) |
|
|
if interaction_contexts: |
|
|
recent_summaries = [ic.get('summary', '') for ic in interaction_contexts[-3:]] |
|
|
if recent_summaries: |
|
|
summary_parts.append(f"Recent conversation topics: {', '.join(recent_summaries)}") |
|
|
|
|
|
|
|
|
user_context = context.get('user_context', '') |
|
|
if user_context: |
|
|
summary_parts.append(f"User background: {user_context[:200]}") |
|
|
|
|
|
return " | ".join(summary_parts) if summary_parts else "No prior context" |
|
|
|
|
|
async def process_agents_parallel(self, request: Dict) -> List: |
|
|
""" |
|
|
Step 1: Optimize Agent Chain - Process multiple agents in parallel |
|
|
|
|
|
Args: |
|
|
request: Dictionary containing request data with 'user_input' and 'context' |
|
|
|
|
|
Returns: |
|
|
List of agent results in order [intent_result, skills_result] |
|
|
""" |
|
|
user_input = request.get('user_input', '') |
|
|
context = request.get('context', {}) |
|
|
|
|
|
|
|
|
self.agent_call_count += 2 |
|
|
|
|
|
tasks = [ |
|
|
self.agents['intent_recognition'].execute( |
|
|
user_input=user_input, |
|
|
context=context |
|
|
), |
|
|
self.agents['skills_identification'].execute( |
|
|
user_input=user_input, |
|
|
context=context |
|
|
), |
|
|
] |
|
|
|
|
|
try: |
|
|
results = await asyncio.gather(*tasks, return_exceptions=True) |
|
|
|
|
|
processed_results = [] |
|
|
for idx, result in enumerate(results): |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"Agent task {idx} failed: {result}") |
|
|
processed_results.append({}) |
|
|
else: |
|
|
processed_results.append(result) |
|
|
return processed_results |
|
|
except Exception as e: |
|
|
logger.error(f"Error in parallel agent processing: {e}", exc_info=True) |
|
|
return [{}, {}] |
|
|
|
|
|
async def process_request_parallel(self, session_id: str, user_input: str, context: Dict) -> Dict: |
|
|
"""Process intent, skills, and safety in parallel""" |
|
|
|
|
|
|
|
|
try: |
|
|
intent_task = self.agents['intent_recognition'].execute( |
|
|
user_input=user_input, |
|
|
context=context |
|
|
) |
|
|
|
|
|
skills_task = self.agents['skills_identification'].execute( |
|
|
user_input=user_input, |
|
|
context=context |
|
|
) |
|
|
|
|
|
|
|
|
safety_task = self.agents['safety_check'].execute( |
|
|
response=user_input, |
|
|
context=context |
|
|
) |
|
|
|
|
|
|
|
|
self.agent_call_count += 3 |
|
|
|
|
|
|
|
|
results = await asyncio.gather( |
|
|
intent_task, |
|
|
skills_task, |
|
|
safety_task, |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
|
|
|
intent_result = results[0] if not isinstance(results[0], Exception) else {} |
|
|
skills_result = results[1] if not isinstance(results[1], Exception) else {} |
|
|
safety_result = results[2] if not isinstance(results[2], Exception) else {} |
|
|
|
|
|
|
|
|
if isinstance(results[0], Exception): |
|
|
logger.error(f"Intent recognition error: {results[0]}") |
|
|
if isinstance(results[1], Exception): |
|
|
logger.error(f"Skills identification error: {results[1]}") |
|
|
if isinstance(results[2], Exception): |
|
|
logger.error(f"Safety check error: {results[2]}") |
|
|
|
|
|
return { |
|
|
'intent': intent_result, |
|
|
'skills': skills_result, |
|
|
'safety_precheck': safety_result |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in parallel processing: {e}", exc_info=True) |
|
|
|
|
|
return { |
|
|
'intent': await self.agents['intent_recognition'].execute(user_input=user_input, context=context), |
|
|
'skills': await self.agents['skills_identification'].execute(user_input=user_input, context=context), |
|
|
'safety_precheck': {} |
|
|
} |
|
|
|
|
|
def _build_enhanced_context(self, session_id: str, prior_interactions: List[Dict]) -> Dict: |
|
|
"""Build enhanced context with memory accumulation""" |
|
|
|
|
|
|
|
|
context = { |
|
|
'session_memory': [], |
|
|
'user_preferences': {}, |
|
|
'interaction_patterns': {}, |
|
|
'skills_used': set() |
|
|
} |
|
|
|
|
|
|
|
|
for idx, interaction in enumerate(prior_interactions): |
|
|
weight = 1.0 / (idx + 1) |
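            # Recency weighting: assumes prior_interactions is ordered newest
            # first, so earlier entries contribute more to the accumulated patterns.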
|
|
|
|
|
|
|
|
if 'skills' in interaction: |
|
|
for skill in interaction['skills']: |
|
|
if isinstance(skill, dict): |
|
|
context['skills_used'].add(skill.get('name', skill.get('skill', ''))) |
|
|
elif isinstance(skill, str): |
|
|
context['skills_used'].add(skill) |
|
|
|
|
|
|
|
|
if 'intent' in interaction: |
|
|
intent = interaction['intent'] |
|
|
if intent not in context['interaction_patterns']: |
|
|
context['interaction_patterns'][intent] = 0 |
|
|
context['interaction_patterns'][intent] += weight |
|
|
|
|
|
|
|
|
if idx < 3: |
|
|
context['session_memory'].append({ |
|
|
'summary': interaction.get('summary', ''), |
|
|
'timestamp': interaction.get('timestamp'), |
|
|
'relevance': weight |
|
|
}) |
|
|
|
|
|
|
|
|
context['skills_used'] = list(context['skills_used']) |
|
|
|
|
|
return context |
|
|
|
|
|
def _build_task_prompts(self, user_input: str, context_summary: str, primary_intent: str) -> dict: |
|
|
"""Build task-specific prompts for execution""" |
|
|
|
|
|
base_context = f"User Query: {user_input}\nContext: {context_summary}" |
|
|
|
|
|
prompts = { |
|
|
"information_gathering": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Gather comprehensive, accurate information relevant to the user's query. |
|
|
Focus on facts, definitions, explanations, and verified information. |
|
|
Structure the information clearly and cite key points. |
|
|
""", |
|
|
|
|
|
"content_research": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Research and compile detailed content about the topic. |
|
|
Include multiple perspectives, current information, and relevant examples. |
|
|
Organize findings logically with clear sections. |
|
|
""", |
|
|
|
|
|
"task_planning": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Create a detailed execution plan for the requested task. |
|
|
Break down into clear steps, identify requirements, and outline expected outcomes. |
|
|
Consider potential challenges and solutions. |
|
|
""", |
|
|
|
|
|
"execution_strategy": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Develop a strategic approach for task execution. |
|
|
Define methodology, best practices, and implementation considerations. |
|
|
Provide actionable guidance with clear priorities. |
|
|
""", |
|
|
|
|
|
"creative_brainstorming": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Generate creative ideas and approaches for content creation. |
|
|
Explore different angles, styles, and formats. |
|
|
Provide diverse creative options with implementation suggestions. |
|
|
""", |
|
|
|
|
|
"content_ideation": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Develop content concepts and detailed ideation. |
|
|
Create outlines, themes, and structural frameworks. |
|
|
Suggest variations and refinement paths. |
|
|
""", |
|
|
|
|
|
"research_analysis": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Conduct thorough research analysis on the topic. |
|
|
Identify key findings, trends, patterns, and insights. |
|
|
Analyze different perspectives and methodologies. |
|
|
""", |
|
|
|
|
|
"data_collection": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Collect and organize relevant data points and evidence. |
|
|
Gather statistics, examples, case studies, and supporting information. |
|
|
Structure data for easy analysis and reference. |
|
|
""", |
|
|
|
|
|
"pattern_identification": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Identify patterns, correlations, and significant relationships. |
|
|
Analyze trends, cause-effect relationships, and underlying structures. |
|
|
Provide insights based on pattern recognition. |
|
|
""", |
|
|
|
|
|
"problem_analysis": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Analyze the problem in detail. |
|
|
Identify root causes, contributing factors, and constraints. |
|
|
Break down the problem into components for systematic resolution. |
|
|
""", |
|
|
|
|
|
"solution_research": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Research and evaluate potential solutions. |
|
|
Compare approaches, assess pros/cons, and recommend best practices. |
|
|
Consider implementation feasibility and effectiveness. |
|
|
""", |
|
|
|
|
|
"curriculum_planning": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Design educational curriculum and learning path. |
|
|
Structure content progressively, define learning objectives, and suggest resources. |
|
|
Create a comprehensive learning framework. |
|
|
""", |
|
|
|
|
|
"educational_content": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Generate educational content with clear explanations. |
|
|
Use teaching methods, examples, analogies, and progressive complexity. |
|
|
Make content accessible and engaging for learning. |
|
|
""", |
|
|
|
|
|
"technical_research": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Research technical aspects and solutions. |
|
|
Gather technical documentation, best practices, and implementation details. |
|
|
Structure technical information clearly with practical guidance. |
|
|
""", |
|
|
|
|
|
"guidance_generation": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Generate step-by-step guidance and instructions. |
|
|
Create clear, actionable steps with explanations and troubleshooting tips. |
|
|
Ensure guidance is comprehensive and easy to follow. |
|
|
""", |
|
|
|
|
|
"context_enrichment": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Enrich the conversation with relevant context and insights. |
|
|
Add helpful background information, connections to previous topics, and engaging details. |
|
|
Enhance understanding and engagement. |
|
|
""", |
|
|
|
|
|
"general_research": f""" |
|
|
{base_context} |
|
|
|
|
|
Task: Conduct general research and information gathering. |
|
|
Compile relevant information, insights, and useful details about the topic. |
|
|
Organize findings for clear presentation. |
|
|
""" |
|
|
} |
|
|
|
|
|
return prompts |
|
|
|
|
|
async def _execute_single_task(self, task_name: str, prompt: str) -> dict: |
|
|
"""Execute a single task using the LLM router""" |
|
|
try: |
|
|
logger.debug(f"Executing task: {task_name}") |
|
|
logger.debug(f"Task prompt length: {len(prompt)}") |
|
|
|
|
|
|
|
|
result = await self.llm_router.route_inference( |
|
|
task_type="general_reasoning", |
|
|
prompt=prompt, |
|
|
max_tokens=2000, |
|
|
temperature=0.7 |
|
|
) |
|
|
|
|
|
if result: |
|
|
return { |
|
|
"task": task_name, |
|
|
"status": "completed", |
|
|
"content": result, |
|
|
"content_length": len(str(result)) |
|
|
} |
|
|
else: |
|
|
logger.warning(f"Task {task_name} returned empty result") |
|
|
return { |
|
|
"task": task_name, |
|
|
"status": "empty", |
|
|
"content": "", |
|
|
"content_length": 0 |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error executing task {task_name}: {e}", exc_info=True) |
|
|
return { |
|
|
"task": task_name, |
|
|
"status": "error", |
|
|
"error": str(e), |
|
|
"content": "" |
|
|
} |
|
|
|
|
|
def _format_final_output(self, response: dict, interaction_id: str, additional_metadata: dict = None) -> dict: |
|
|
""" |
|
|
Format final output with tracing and metadata |
|
|
""" |
|
|
|
|
|
response_text = ( |
|
|
response.get("final_response") or |
|
|
response.get("safety_checked_response") or |
|
|
response.get("original_response") or |
|
|
response.get("response") or |
|
|
str(response.get("result", "")) |
|
|
) |
|
|
|
|
|
if not response_text: |
|
|
response_text = "I apologize, but I'm having trouble generating a response right now. Please try again." |
|
|
|
|
|
|
|
|
warnings = [] |
|
|
if "warnings" in response: |
|
|
warnings = response["warnings"] if isinstance(response["warnings"], list) else [] |
|
|
|
|
|
|
|
|
metadata = { |
|
|
"agents_used": response.get("agents_used", []), |
|
|
"processing_time": response.get("processing_time", 0), |
|
|
"token_count": response.get("token_count", 0), |
|
|
"warnings": warnings |
|
|
} |
|
|
|
|
|
|
|
|
if additional_metadata: |
|
|
metadata.update(additional_metadata) |
|
|
|
|
|
return { |
|
|
"interaction_id": interaction_id, |
|
|
"response": response_text, |
|
|
"final_response": response_text, |
|
|
"confidence_score": response.get("confidence_score", 0.7), |
|
|
"agent_trace": self.execution_trace if self.execution_trace else [ |
|
|
{"step": "complete", "agent": "orchestrator", "status": "completed"} |
|
|
], |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"metadata": metadata |
|
|
} |
|
|
|
|
|
async def handle_user_safety_decision(self, choice_id: str, user_decision: bool, session_id: str = None) -> dict: |
|
|
""" |
|
|
Handle user's safety decision and complete processing |
|
|
|
|
|
Args: |
|
|
choice_id: The choice identifier from the prompt |
|
|
user_decision: True for revision, False for original with warnings |
|
|
session_id: Session identifier |
|
|
|
|
|
Returns: |
|
|
dict: Final response based on user choice |
|
|
""" |
|
|
try: |
|
|
|
|
|
if session_id: |
|
|
self.awaiting_safety_response[session_id] = False |
|
|
|
|
|
if not SAFETY_CHOICE_AVAILABLE: |
|
|
logger.warning("Safety choice modules not available") |
|
|
return {'error': 'Safety choice system not available'} |
|
|
|
|
|
choice_result = process_safety_choice(choice_id, user_decision) |
|
|
|
|
|
if 'error' in choice_result: |
|
|
logger.error(f"Error processing safety choice: {choice_result['error']}") |
|
|
return choice_result |
|
|
|
|
|
if choice_result['action'] == 'proceed_with_revision': |
|
|
logger.info("User chose revision - applying safety revisions") |
|
|
|
|
|
safety_issues = choice_result['safety_analysis'].get('detected_issues', []) |
|
|
safety_scores = choice_result['safety_analysis'].get('safety_scores', {}) |
|
|
|
|
|
if not safety_scores: |
|
|
confidence_scores = choice_result['safety_analysis'].get('confidence_scores', {}) |
|
|
if confidence_scores: |
|
|
exceeded_categories = [] |
|
|
                        # Compare against the orchestrator's configured thresholds
                        # rather than duplicating the numeric values here.
                        if confidence_scores.get('toxicity', 0) > self.safety_thresholds["toxicity_or_harmful_language"]:
                            exceeded_categories.append('toxicity_or_harmful_language')
                        if confidence_scores.get('bias', 0) > self.safety_thresholds["potential_biases_or_stereotypes"]:
                            exceeded_categories.append('potential_biases_or_stereotypes')
                        if confidence_scores.get('privacy', 0) > self.safety_thresholds["privacy_or_security_concerns"]:
                            exceeded_categories.append('privacy_or_security_concerns')
|
|
else: |
|
|
exceeded_categories = [k for k, v in safety_scores.items() if isinstance(v, (int, float)) and v > 0.3] |
|
|
|
|
|
revision_prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. |
|
|
|
|
|
Original Response: {choice_result['original_response']} |
|
|
|
|
|
Safety Issues Detected: {', '.join(exceeded_categories) if exceeded_categories else 'General safety concerns'} |
|
|
Specific Warnings: {'; '.join(safety_issues) if safety_issues else 'General safety concerns detected'} |
|
|
|
|
|
Please revise the response to address these concerns while maintaining helpfulness and accuracy. |
|
|
""" |
|
|
|
|
|
revised_result = await self.agents['response_synthesis'].execute( |
|
|
agent_outputs={}, |
|
|
user_input=revision_prompt, |
|
|
context={} |
|
|
) |
|
|
|
|
|
revised_response = revised_result.get('final_response', choice_result['original_response']) |
|
|
|
|
|
return { |
|
|
'response': revised_response, |
|
|
'final_response': revised_response, |
|
|
'safety_analysis': choice_result['safety_analysis'], |
|
|
'user_choice': 'revision', |
|
|
'revision_applied': True, |
|
|
'interaction_id': str(uuid.uuid4())[:8], |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
elif choice_result['action'] == 'use_original_with_warnings': |
|
|
logger.info("User chose original response with safety warnings") |
|
|
|
|
|
return { |
|
|
'response': choice_result['response_content'], |
|
|
'final_response': choice_result['response_content'], |
|
|
'safety_analysis': choice_result['safety_analysis'], |
|
|
'user_choice': 'original_with_warnings', |
|
|
'revision_applied': False, |
|
|
'interaction_id': str(uuid.uuid4())[:8], |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
else: |
|
|
logger.error(f"Unknown action: {choice_result['action']}") |
|
|
return {'error': f"Unknown action: {choice_result['action']}"} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error handling user safety decision: {e}", exc_info=True) |
|
|
return {'error': str(e)} |
|
|
|
|
|
def get_execution_trace(self) -> list: |
|
|
""" |
|
|
Return execution trace for debugging and analysis |
|
|
""" |
|
|
return self.execution_trace |
|
|
|
|
|
def clear_execution_trace(self): |
|
|
""" |
|
|
Clear the execution trace |
|
|
""" |
|
|
self.execution_trace = [] |
|
|
|
|
|
def _calculate_session_duration(self, context: dict) -> str: |
|
|
"""Calculate session duration for reasoning context""" |
|
|
interaction_contexts = context.get('interaction_contexts', []) |
|
|
if not interaction_contexts: |
|
|
return "New session" |
|
|
|
|
|
|
|
|
interaction_count = len(interaction_contexts) |
|
|
if interaction_count < 5: |
|
|
return "Short session (< 5 interactions)" |
|
|
elif interaction_count < 20: |
|
|
return "Medium session (5-20 interactions)" |
|
|
else: |
|
|
return "Long session (> 20 interactions)" |
|
|
|
|
|
async def _analyze_topic_continuity(self, context: dict, user_input: str) -> str: |
|
|
"""Analyze topic continuity using LLM zero-shot classification (uses session context and interaction contexts from cache)""" |
|
|
try: |
|
|
|
|
|
session_context = context.get('session_context', {}) |
|
|
session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
|
|
|
|
|
interaction_contexts = context.get('interaction_contexts', []) |
|
|
if not interaction_contexts and not session_summary: |
|
|
return "No previous context" |
|
|
|
|
|
|
|
|
recent_interactions_summary = "\n".join([ |
|
|
f"- {ic.get('summary', '')}" |
|
|
for ic in interaction_contexts[:3] |
|
|
if ic.get('summary') |
|
|
]) |
|
|
|
|
|
|
|
|
if self.llm_router: |
|
|
prompt = f"""Determine if the current query continues the previous conversation topic or introduces a new topic. |
|
|
|
|
|
Session Summary: {session_summary[:300] if session_summary else 'No session summary available'} |
|
|
|
|
|
Recent Interactions: |
|
|
{recent_interactions_summary if recent_interactions_summary else 'No recent interactions'} |
|
|
|
|
|
Current Query: "{user_input}" |
|
|
|
|
|
Analyze whether the current query: |
|
|
1. Continues the same topic from previous interactions |
|
|
2. Introduces a new topic |
|
|
|
|
|
Respond with EXACTLY one of these formats: |
|
|
- "Continuing [topic name] discussion" if same topic |
|
|
- "New topic: [topic name]" if different topic |
|
|
|
|
|
Keep topic name to 2-5 words. Example responses: |
|
|
- "Continuing machine learning discussion" |
|
|
- "New topic: financial analysis" |
|
|
- "Continuing software development discussion" |
|
|
""" |
|
|
|
|
|
continuity_result = await self.llm_router.route_inference( |
|
|
task_type="general_reasoning", |
|
|
prompt=prompt, |
|
|
max_tokens=50, |
|
|
temperature=0.3 |
|
|
) |
|
|
|
|
|
if continuity_result and isinstance(continuity_result, str) and continuity_result.strip(): |
|
|
result = continuity_result.strip() |
|
|
|
|
|
if "Continuing" in result or "New topic:" in result: |
|
|
logger.debug(f"Topic continuity analysis: {result}") |
|
|
return result |
|
|
|
|
|
|
|
|
if not session_summary and not recent_interactions_summary: |
|
|
return "No previous context" |
|
|
return "Topic continuity analysis unavailable" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in LLM-based topic continuity analysis: {e}", exc_info=True) |
|
|
|
|
|
return "Topic continuity analysis failed" |
|
|
|
|
|
def _extract_pattern_evidence(self, user_input: str) -> str: |
|
|
"""Extract pattern evidence for intent reasoning""" |
|
|
input_lower = user_input.lower() |
|
|
|
|
|
|
|
|
if any(word in input_lower for word in ['what', 'how', 'why', 'when', 'where', 'which']): |
|
|
return "Question pattern detected" |
|
|
|
|
|
|
|
|
if any(word in input_lower for word in ['please', 'can you', 'could you', 'help me']): |
|
|
return "Request pattern detected" |
|
|
|
|
|
|
|
|
if any(word in input_lower for word in ['explain', 'describe', 'tell me about']): |
|
|
return "Explanation pattern detected" |
|
|
|
|
|
|
|
|
if any(word in input_lower for word in ['analyze', 'compare', 'evaluate', 'assess']): |
|
|
return "Analysis pattern detected" |
|
|
|
|
|
return "General conversational pattern" |
|
|
|
|
|
def _assess_intent_complexity(self, intent_result: dict) -> str: |
|
|
"""Assess intent complexity for reasoning""" |
|
|
primary_intent = intent_result.get('primary_intent', 'unknown') |
|
|
confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) |
|
|
secondary_intents = intent_result.get('secondary_intents', []) |
|
|
|
|
|
if confidence > 0.8 and len(secondary_intents) == 0: |
|
|
return "Simple, clear intent" |
|
|
elif confidence > 0.7 and len(secondary_intents) <= 1: |
|
|
return "Moderate complexity" |
|
|
else: |
|
|
return "Complex, multi-faceted intent" |
|
|
|
|
|
def _generate_alternative_paths(self, intent_result: dict, user_input: str, main_topic: str) -> list: |
|
|
"""Generate alternative reasoning paths based on actual content""" |
|
|
primary_intent = intent_result.get('primary_intent', 'unknown') |
|
|
secondary_intents = intent_result.get('secondary_intents', []) |
|
|
|
|
|
alternative_paths = [] |
|
|
|
|
|
|
|
|
for secondary_intent in secondary_intents: |
|
|
alternative_paths.append({ |
|
|
"path": f"Alternative intent: {secondary_intent} for {main_topic}", |
|
|
"reasoning": f"Could interpret as {secondary_intent} based on linguistic patterns in the query about {main_topic}", |
|
|
"confidence": intent_result.get('confidence_scores', {}).get(secondary_intent, 0.3), |
|
|
"rejected_reason": f"Primary intent '{primary_intent}' has higher confidence for {main_topic} topic" |
|
|
}) |
|
|
|
|
|
|
|
|
if 'curriculum' in user_input.lower() or 'course' in user_input.lower(): |
|
|
alternative_paths.append({ |
|
|
"path": "Structured educational framework approach", |
|
|
"reasoning": f"Could provide a more structured educational framework for {main_topic}", |
|
|
"confidence": 0.6, |
|
|
"rejected_reason": f"Current approach better matches user's specific request for {main_topic}" |
|
|
}) |
|
|
|
|
|
if 'detailed' in user_input.lower() or 'comprehensive' in user_input.lower(): |
|
|
alternative_paths.append({ |
|
|
"path": "High-level overview approach", |
|
|
"reasoning": f"Could provide a high-level overview instead of detailed content for {main_topic}", |
|
|
"confidence": 0.4, |
|
|
"rejected_reason": f"User specifically requested detailed information about {main_topic}" |
|
|
}) |
|
|
|
|
|
return alternative_paths |
|
|
|
|
|
def _identify_uncertainty_areas(self, intent_result: dict, final_response: dict, safety_checked: dict) -> list: |
|
|
"""Identify areas of uncertainty in the reasoning based on actual content""" |
|
|
uncertainty_areas = [] |
|
|
|
|
|
|
|
|
primary_intent = intent_result.get('primary_intent', 'unknown') |
|
|
confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) |
|
|
if confidence < 0.8: |
|
|
uncertainty_areas.append({ |
|
|
"aspect": f"Intent classification ({primary_intent}) for user's specific request", |
|
|
"confidence": confidence, |
|
|
"mitigation": "Provided multiple interpretation options and context-aware analysis" |
|
|
}) |
|
|
|
|
|
|
|
|
coherence_score = final_response.get('coherence_score', 0.7) |
|
|
if coherence_score < 0.8: |
|
|
uncertainty_areas.append({ |
|
|
"aspect": "Response coherence and structure for the specific topic", |
|
|
"confidence": coherence_score, |
|
|
"mitigation": "Applied quality enhancement techniques and content relevance checks" |
|
|
}) |
|
|
|
|
|
|
|
|
safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) |
|
|
if safety_score < 0.9: |
|
|
uncertainty_areas.append({ |
|
|
"aspect": "Content safety and bias assessment for educational content", |
|
|
"confidence": safety_score, |
|
|
"mitigation": "Generated advisory warnings for user awareness and content appropriateness" |
|
|
}) |
|
|
|
|
|
|
|
|
response_text = str(final_response.get('final_response', '')) |
|
|
if len(response_text) < 100: |
|
|
uncertainty_areas.append({ |
|
|
"aspect": "Response completeness for user's detailed request", |
|
|
"confidence": 0.6, |
|
|
"mitigation": "Enhanced response generation with topic-specific content" |
|
|
}) |
|
|
|
|
|
return uncertainty_areas |
|
|
|
|
|
def _extract_evidence_sources(self, intent_result: dict, final_response: dict, context: dict) -> list: |
|
|
"""Extract evidence sources for reasoning based on actual content""" |
|
|
evidence_sources = [] |
|
|
|
|
|
|
|
|
evidence_sources.append({ |
|
|
"type": "linguistic_analysis", |
|
|
"source": "Pattern matching and NLP analysis", |
|
|
"relevance": 0.9, |
|
|
"description": f"Intent classification based on linguistic patterns for '{intent_result.get('primary_intent', 'unknown')}' intent" |
|
|
}) |
|
|
|
|
|
|
|
|
interactions = context.get('interactions', []) |
|
|
if interactions: |
|
|
evidence_sources.append({ |
|
|
"type": "conversation_history", |
|
|
"source": f"Previous {len(interactions)} interactions", |
|
|
"relevance": 0.7, |
|
|
"description": f"Conversation context and topic continuity analysis" |
|
|
}) |
|
|
|
|
|
|
|
|
synthesis_method = final_response.get('synthesis_method', 'unknown') |
|
|
evidence_sources.append({ |
|
|
"type": "synthesis_method", |
|
|
"source": f"{synthesis_method} approach", |
|
|
"relevance": 0.8, |
|
|
"description": f"Response generated using {synthesis_method} methodology with quality optimization" |
|
|
}) |
|
|
|
|
|
|
|
|
response_text = str(final_response.get('final_response', '')) |
|
|
if len(response_text) > 1000: |
|
|
evidence_sources.append({ |
|
|
"type": "content_analysis", |
|
|
"source": "Comprehensive content generation", |
|
|
"relevance": 0.85, |
|
|
"description": "Detailed response generation based on user's specific requirements" |
|
|
}) |
|
|
|
|
|
return evidence_sources |
|
|
|
|
|
    def _calibrate_confidence_scores(self, reasoning_chain: dict) -> dict:
        """Calibrate confidence scores across the reasoning chain"""
        chain_of_thought = reasoning_chain.get('chain_of_thought', {})

        # Collect per-step confidences where the step reports one
        step_confidences = []
        for step_data in chain_of_thought.values():
            if isinstance(step_data, dict) and 'confidence' in step_data:
                step_confidences.append(step_data['confidence'])

        overall_confidence = sum(step_confidences) / len(step_confidences) if step_confidences else 0.7

        return {
            "overall_confidence": overall_confidence,
            "step_count": len(chain_of_thought),
            "confidence_distribution": {
                "high_confidence": len([c for c in step_confidences if c > 0.8]),
                "medium_confidence": len([c for c in step_confidences if 0.6 <= c <= 0.8]),
                "low_confidence": len([c for c in step_confidences if c < 0.6])
            },
            "calibration_method": "Unweighted mean of step confidences"
        }
|
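    # Illustrative sketch of the calibration above (comments only; step names and
    # scores are hypothetical, and `orchestrator` stands for an MVPOrchestrator
    # instance):
    #
    #   chain = {"chain_of_thought": {
    #       "intent":    {"confidence": 0.9},
    #       "synthesis": {"confidence": 0.7},
    #       "safety":    {"confidence": 0.5},
    #   }}
    #   calibrated = orchestrator._calibrate_confidence_scores(chain)
    #   # overall_confidence = (0.9 + 0.7 + 0.5) / 3 = 0.70
    #   # distribution: 1 high (> 0.8), 1 medium (0.6-0.8), 1 low (< 0.6)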
|
|
|
|
async def _extract_main_topic(self, user_input: str, context: dict = None) -> str: |
|
|
"""Extract the main topic using LLM zero-shot classification with caching""" |
|
|
try: |
|
|
|
|
|
import hashlib |
|
|
cache_key = hashlib.md5(user_input.encode()).hexdigest() |
|
|
if cache_key in self._topic_cache: |
|
|
logger.debug(f"Topic cache hit for: {user_input[:50]}...") |
|
|
return self._topic_cache[cache_key] |
|
|
|
|
|
|
|
|
if self.llm_router: |
|
|
|
|
|
context_info = "" |
|
|
if context: |
|
|
session_context = context.get('session_context', {}) |
|
|
session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
|
|
interaction_count = len(context.get('interaction_contexts', [])) |
|
|
|
|
|
if session_summary: |
|
|
context_info = f"\n\nSession context: {session_summary[:200]}" |
|
|
if interaction_count > 0: |
|
|
context_info += f"\nPrevious interactions in session: {interaction_count}" |
|
|
|
|
|
prompt = f"""Classify the main topic of this query in 2-5 words. Be specific and concise. |
|
|
|
|
|
Query: "{user_input}"{context_info} |
|
|
|
|
|
Respond with ONLY the topic name (e.g., "Machine Learning", "Healthcare Analytics", "Financial Modeling", "Software Development", "Educational Curriculum"). |
|
|
|
|
|
Do not include explanations, just the topic name. Maximum 5 words.""" |
|
|
|
|
|
topic_result = await self.llm_router.route_inference( |
|
|
task_type="classification", |
|
|
prompt=prompt, |
|
|
max_tokens=20, |
|
|
temperature=0.3 |
|
|
) |
|
|
|
|
|
if topic_result and isinstance(topic_result, str) and topic_result.strip(): |
|
|
topic = topic_result.strip() |
|
|
|
|
|
|
|
|
topic = topic.split('\n')[0].strip() |
|
|
words = topic.split()[:5] |
|
|
topic = " ".join(words) |
|
|
|
|
|
|
|
|
if len(self._topic_cache) >= self._topic_cache_max_size: |
|
|
|
|
|
oldest_key = next(iter(self._topic_cache)) |
|
|
del self._topic_cache[oldest_key] |
|
|
|
|
|
self._topic_cache[cache_key] = topic |
|
|
logger.debug(f"Topic extracted: {topic}") |
|
|
return topic |
|
|
|
|
|
|
|
|
words = user_input.split()[:4] |
|
|
fallback_topic = " ".join(words) if words else "General inquiry" |
|
|
logger.warning(f"Using fallback topic extraction: {fallback_topic}") |
|
|
return fallback_topic |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in LLM-based topic extraction: {e}", exc_info=True) |
|
|
|
|
|
words = user_input.split()[:4] |
|
|
return " ".join(words) if words else "General inquiry" |
|
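    # Note on the topic cache used above: eviction removes the oldest *inserted*
    # key (dicts preserve insertion order), so it behaves as a FIFO cache rather
    # than an LRU cache; cache hits do not refresh an entry's position.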
|
|
|
|
    async def _extract_keywords(self, user_input: str) -> str:
        """Extract key terms with a lightweight stop-word filter"""
        try:
            import re

            # Common English stop words to exclude from the keyword list
            stop_words = {'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'her', 'was', 'one',
                          'our', 'out', 'day', 'get', 'has', 'him', 'his', 'how', 'its', 'may', 'new', 'now',
                          'old', 'see', 'two', 'way', 'who', 'boy', 'did', 'she', 'use', 'many', 'some',
                          'time', 'very', 'when', 'come', 'here', 'just', 'like', 'long', 'make', 'over',
                          'such', 'take', 'than', 'them', 'well', 'were'}

            # Keep alphabetic words of three or more letters that are not stop words
            words = re.findall(r'\b[a-zA-Z]{3,}\b', user_input.lower())
            keywords = [w for w in words if w not in stop_words][:5]

            return ", ".join(keywords) if keywords else "General terms"

        except Exception as e:
            logger.error(f"Error in keyword extraction: {e}", exc_info=True)
            return "General terms"
|
|
|
|
|
def _assess_query_complexity(self, user_input: str) -> str: |
|
|
"""Assess the complexity of the user query""" |
|
|
word_count = len(user_input.split()) |
|
|
question_count = user_input.count('?') |
|
|
|
|
|
if word_count > 50 and question_count > 2: |
|
|
return "Highly complex multi-part query" |
|
|
elif word_count > 30 and question_count > 1: |
|
|
return "Moderately complex query" |
|
|
elif word_count > 15: |
|
|
return "Standard complexity query" |
|
|
else: |
|
|
return "Simple query" |
|
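    # Example (hypothetical input): a 35-word query containing two question marks
    # is classified as "Moderately complex query" (word_count > 30 and
    # question_count > 1); the same query with a single question mark falls through
    # to "Standard complexity query".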
|
|
|
|
def _determine_response_scope(self, user_input: str) -> str: |
|
|
"""Determine the scope of response needed""" |
|
|
input_lower = user_input.lower() |
|
|
|
|
|
if any(word in input_lower for word in ['detailed', 'comprehensive', 'complete', 'full']): |
|
|
return "Comprehensive detailed response" |
|
|
elif any(word in input_lower for word in ['brief', 'short', 'summary', 'overview']): |
|
|
return "Brief summary response" |
|
|
elif any(word in input_lower for word in ['step by step', 'tutorial', 'guide', 'how to']): |
|
|
return "Step-by-step instructional response" |
|
|
else: |
|
|
return "Standard informative response" |
|
|
|
|
|
    def _assess_content_relevance(self, user_input: str, final_response: dict) -> str:
        """Assess how relevant the response content is to the user input"""
        response_text = str(final_response.get('final_response', ''))

        # Simple lexical-overlap heuristic between query words and response words
        input_words = set(user_input.lower().split())
        response_words = set(response_text.lower().split())

        total_input_words = len(input_words)
        if total_input_words == 0:
            # Guard against empty input to avoid division by zero
            return "Low relevance to user query"

        overlap_ratio = len(input_words.intersection(response_words)) / total_input_words

        if overlap_ratio > 0.3:
            return "High relevance to user query"
        elif overlap_ratio > 0.15:
            return "Moderate relevance to user query"
        else:
            return "Low relevance to user query"
|
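    # Example of the overlap heuristic above (hypothetical counts): if the query
    # contains 10 distinct lowercase words and 4 of them also appear in the
    # response, the ratio is 4 / 10 = 0.4 > 0.3, so the method reports
    # "High relevance to user query".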
|
|
|
|
def _assess_content_appropriateness(self, user_input: str, safety_checked: dict) -> str: |
|
|
"""Assess content appropriateness for the topic""" |
|
|
warnings = safety_checked.get('warnings', []) |
|
|
safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) |
|
|
|
|
|
if safety_score > 0.9 and len(warnings) == 0: |
|
|
return "Highly appropriate content" |
|
|
elif safety_score > 0.8 and len(warnings) <= 1: |
|
|
return "Appropriate content with minor notes" |
|
|
else: |
|
|
return "Content requires review" |
|
|
|
|
|
def check_safety_thresholds(self, safety_result: dict) -> tuple: |
|
|
"""Check if safety scores exceed thresholds""" |
|
|
safety_analysis = safety_result.get('safety_analysis', {}) |
|
|
|
|
|
|
|
|
safety_scores = safety_analysis.get('safety_scores', {}) |
|
|
if not safety_scores: |
|
|
|
|
|
confidence_scores = safety_analysis.get('confidence_scores', {}) |
|
|
safety_scores = { |
|
|
"toxicity_or_harmful_language": confidence_scores.get('toxicity', 0.0), |
|
|
"potential_biases_or_stereotypes": confidence_scores.get('bias', 0.0), |
|
|
"privacy_or_security_concerns": confidence_scores.get('privacy', 0.0), |
|
|
"controversial_or_sensitive_topics": 1.0 - confidence_scores.get('safety', 1.0) |
|
|
} |
|
|
|
|
|
exceeded_categories = [] |
|
|
|
|
|
for category, score in safety_scores.items(): |
|
|
threshold = self.safety_thresholds.get(category, 1.0) |
|
|
if isinstance(score, (int, float)) and score > threshold: |
|
|
exceeded_categories.append(category) |
|
|
logger.info(f"Safety threshold exceeded: {category} = {score} > {threshold}") |
|
|
|
|
|
return len(exceeded_categories) > 0, exceeded_categories |
|
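    # Illustrative call (hypothetical scores): given
    #   safety_result = {"safety_analysis": {"safety_scores": {
    #       "potential_biases_or_stereotypes": 0.4,
    #       "toxicity_or_harmful_language": 0.0}}}
    # and assuming the configured bias threshold is below 0.4, the method returns
    # (True, ["potential_biases_or_stereotypes"]).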
|
|
|
|
def generate_revision_prompt(self, original_response: str, exceeded_categories: list, |
|
|
safety_warnings: list) -> str: |
|
|
"""Generate revision prompt for synthesis agent""" |
|
|
|
|
|
category_instructions = { |
|
|
"toxicity_or_harmful_language": "Remove any language that could be harmful or offensive", |
|
|
"potential_biases_or_stereotypes": "Ensure response provides balanced alternatives and avoids stereotypes. Include diverse perspectives and specific qualifications needed for each career path", |
|
|
"privacy_or_security_concerns": "Remove any content that could compromise privacy or security", |
|
|
"controversial_or_sensitive_topics": "Present balanced viewpoints and acknowledge different perspectives" |
|
|
} |
|
|
|
|
|
revision_instructions = [] |
|
|
for category in exceeded_categories: |
|
|
if category in category_instructions: |
|
|
revision_instructions.append(category_instructions[category]) |
|
|
|
|
|
prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. |
|
|
|
|
|
Original Response: |
|
|
{original_response} |
|
|
|
|
|
Safety Issues Detected: |
|
|
{', '.join(exceeded_categories)} |
|
|
|
|
|
Specific Warnings: |
|
|
{'; '.join(safety_warnings) if safety_warnings else 'General safety concerns detected'} |
|
|
|
|
|
REVISION INSTRUCTIONS: |
|
|
{' | '.join(revision_instructions)} |
|
|
|
|
|
Please revise the response to address these concerns while maintaining helpfulness and accuracy. Ensure the response: |
|
|
1. Addresses the user's original question completely |
|
|
2. Provides specific, actionable alternatives with clear qualifications needed |
|
|
3. Avoids generalizations and stereotypes about career transitions |
|
|
4. Includes necessary skills, education, and experience requirements |
|
|
5. Maintains a balanced, inclusive perspective that acknowledges different paths |
|
|
|
|
|
Revised Response:""" |
|
|
|
|
|
return prompt |
|
|
|
|
|
async def process_request_with_revision(self, session_id: str, user_input: str) -> dict: |
|
|
"""Enhanced process_request with safety revision loop and timeout protection""" |
|
|
try: |
|
|
return await asyncio.wait_for( |
|
|
self._process_request_with_revision_internal(session_id, user_input), |
|
|
timeout=self.revision_timeout |
|
|
) |
|
|
except asyncio.TimeoutError: |
|
|
logger.error(f"Safety revision timed out after {self.revision_timeout}s") |
|
|
|
|
|
return { |
|
|
'final_response': 'Request processing took longer than expected. Please try again.', |
|
|
'response': 'Request processing took longer than expected. Please try again.', |
|
|
'revision_attempts': 0, |
|
|
'timeout_error': True, |
|
|
'safety_revision_applied': False |
|
|
} |
|
|
|
|
|
async def _process_request_with_revision_internal(self, session_id: str, user_input: str) -> dict: |
|
|
"""Internal revision loop with comprehensive error handling""" |
|
|
|
|
|
revision_attempt = 0 |
|
|
current_response = None |
|
|
final_result = None |
|
|
exceeded_categories = [] |
|
|
safety_warnings = [] |
|
|
|
|
|
while revision_attempt <= self.max_revision_attempts: |
|
|
try: |
|
|
|
|
|
processing_input = user_input |
|
|
if revision_attempt > 0: |
|
|
processing_input = self.generate_revision_prompt( |
|
|
current_response, |
|
|
exceeded_categories, |
|
|
safety_warnings |
|
|
) |
|
|
logger.info(f"Revision attempt {revision_attempt}: regenerating response with safety improvements") |
|
|
|
|
|
|
|
|
result = await self.process_request(session_id, processing_input) |
|
|
|
|
|
|
|
|
current_response = result.get('final_response') or result.get('response', '') |
|
|
|
|
|
if not current_response: |
|
|
|
|
|
metadata = result.get('metadata', {}) |
|
|
current_response = metadata.get('synthesis_result', {}).get('final_response', '') |
|
|
|
|
|
if not current_response: |
|
|
logger.warning("Could not extract response text for safety check") |
|
|
return result |
|
|
|
|
|
|
|
|
safety_checked = await self.agents['safety_check'].execute( |
|
|
response=current_response, |
|
|
context=result.get('context', {}) |
|
|
) |
|
|
|
|
|
|
|
|
needs_revision, exceeded_categories = self.check_safety_thresholds(safety_checked) |
|
|
safety_warnings = safety_checked.get('warnings', []) |
|
|
|
|
|
if not needs_revision: |
|
|
|
|
|
logger.info(f"Safety check passed on attempt {revision_attempt + 1}") |
|
|
result['safety_result'] = safety_checked |
|
|
result['revision_attempts'] = revision_attempt |
|
|
result['safety_revision_applied'] = revision_attempt > 0 |
|
|
|
|
|
|
|
|
if 'metadata' not in result: |
|
|
result['metadata'] = {} |
|
|
result['metadata']['safety_result'] = safety_checked |
|
|
result['metadata']['revision_attempts'] = revision_attempt |
|
|
|
|
|
return result |
|
|
|
|
|
if revision_attempt >= self.max_revision_attempts: |
|
|
|
|
|
logger.warning(f"Max revision attempts reached. Categories still exceeded: {exceeded_categories}") |
|
|
|
|
|
input_complexity = self._assess_input_complexity(user_input) |
|
|
|
|
|
|
|
|
if input_complexity["is_complex"] and input_complexity["complexity_score"] > 25: |
|
|
logger.info("Complex input detected - attempting intelligent re-prompt") |
|
|
try: |
|
|
|
|
|
improved_prompt = self._generate_improved_prompt(user_input, exceeded_categories) |
|
|
|
|
|
|
|
|
improved_result = await self.process_request(session_id, improved_prompt) |
|
|
improved_response = improved_result.get('final_response', '') |
|
|
|
|
|
|
|
|
final_safety_check = await self.agents['safety_check'].execute( |
|
|
response=improved_response, |
|
|
context=improved_result.get('context', {}) |
|
|
) |
|
|
|
|
|
improved_needs_revision, improved_exceeded = self.check_safety_thresholds(final_safety_check) |
|
|
|
|
|
if not improved_needs_revision: |
|
|
|
|
|
logger.info("Intelligent re-prompt resolved safety concerns") |
|
|
improved_result['safety_result'] = final_safety_check |
|
|
improved_result['revision_attempts'] = revision_attempt + 1 |
|
|
improved_result['intelligent_reprompt_success'] = True |
|
|
if 'metadata' not in improved_result: |
|
|
improved_result['metadata'] = {} |
|
|
improved_result['metadata']['safety_result'] = final_safety_check |
|
|
improved_result['metadata']['revision_attempts'] = revision_attempt + 1 |
|
|
improved_result['metadata']['intelligent_reprompt_success'] = True |
|
|
return improved_result |
|
|
else: |
|
|
|
|
|
logger.info("Intelligent re-prompt did not fully resolve concerns") |
|
|
current_response = improved_response |
|
|
safety_checked = final_safety_check |
|
|
exceeded_categories = improved_exceeded |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Intelligent re-prompt failed: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
|
|
|
warning_summary = self._generate_warning_summary(exceeded_categories, safety_checked.get('warnings', [])) |
|
|
user_guidance = self._generate_user_guidance(exceeded_categories, user_input) |
|
|
|
|
|
|
|
|
# Fall back to the response text extracted earlier (which may have come from
# metadata) if the top-level field is empty
original_response = result.get('final_response') or current_response or ''
|
|
enhanced_response = f"{original_response}\n\n{warning_summary}\n\n{user_guidance}" |
|
|
|
|
|
result['final_response'] = enhanced_response |
|
|
result['response'] = enhanced_response |
|
|
result['safety_result'] = safety_checked |
|
|
result['revision_attempts'] = revision_attempt |
|
|
result['safety_exceeded'] = exceeded_categories |
|
|
result['safety_revision_applied'] = revision_attempt > 0 |
|
|
result['warning_summary_added'] = True |
|
|
result['input_complexity'] = input_complexity |
|
|
|
|
|
|
|
|
if 'metadata' not in result: |
|
|
result['metadata'] = {} |
|
|
result['metadata']['safety_result'] = safety_checked |
|
|
result['metadata']['revision_attempts'] = revision_attempt |
|
|
result['metadata']['safety_exceeded'] = exceeded_categories |
|
|
result['metadata']['input_complexity'] = input_complexity |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
final_result = result |
|
|
revision_attempt += 1 |
|
|
logger.info(f"Generating revision attempt {revision_attempt} for: {exceeded_categories}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in safety revision attempt {revision_attempt}: {e}", exc_info=True) |
|
|
if final_result: |
|
|
final_result['revision_error'] = str(e) |
|
|
if 'metadata' not in final_result: |
|
|
final_result['metadata'] = {} |
|
|
final_result['metadata']['revision_error'] = str(e) |
|
|
return final_result |
|
|
|
|
|
return { |
|
|
'response': 'Error in processing with safety revision', |
|
|
'final_response': 'Error in processing with safety revision', |
|
|
'revision_attempts': revision_attempt, |
|
|
'revision_error': str(e), |
|
|
'error': str(e) |
|
|
} |
|
|
|
|
|
|
|
|
return final_result or { |
|
|
'response': 'Error in safety revision processing', |
|
|
'final_response': 'Error in safety revision processing', |
|
|
'revision_attempts': revision_attempt, |
|
|
'safety_revision_applied': False |
|
|
} |
|
|
|
|
|
def _generate_warning_summary(self, exceeded_categories: list, safety_warnings: list) -> str: |
|
|
"""Generate user-friendly warning summary""" |
|
|
category_explanations = { |
|
|
"potential_biases_or_stereotypes": "may contain assumptions about career transitions that don't account for individual circumstances", |
|
|
"toxicity_or_harmful_language": "contains language that could be harmful or inappropriate", |
|
|
"privacy_or_security_concerns": "includes content that could raise privacy or security considerations", |
|
|
"controversial_or_sensitive_topics": "touches on topics that may benefit from additional perspective" |
|
|
} |
|
|
|
|
|
if not exceeded_categories: |
|
|
return "" |
|
|
|
|
|
warning_text = "**Note**: This response " + "; ".join([
category_explanations.get(cat, f"has concerns related to {cat}")
for cat in exceeded_categories
]) + "."
|
|
|
|
|
return warning_text |
|
|
|
|
|
def _generate_user_guidance(self, exceeded_categories: list, original_user_input: str) -> str: |
|
|
"""Generate proactive user guidance with UX-friendly options for complex prompts""" |
|
|
if not exceeded_categories: |
|
|
return "" |
|
|
|
|
|
input_complexity = self._assess_input_complexity(original_user_input) |
|
|
|
|
|
guidance_templates = { |
|
|
"potential_biases_or_stereotypes": { |
|
|
"issue": "avoid assumptions about career paths", |
|
|
"simple_suggestion": "ask for advice tailored to specific qualifications or industry interests", |
|
|
"complex_refinement": "add details like your specific skills, target industry, or education level" |
|
|
}, |
|
|
"toxicity_or_harmful_language": { |
|
|
"issue": "ensure respectful communication", |
|
|
"simple_suggestion": "rephrase using more neutral language", |
|
|
"complex_refinement": "adjust the tone while keeping your detailed context" |
|
|
}, |
|
|
"privacy_or_security_concerns": { |
|
|
"issue": "protect sensitive information", |
|
|
"simple_suggestion": "ask for general guidance instead", |
|
|
"complex_refinement": "remove specific personal details while keeping the scenario structure" |
|
|
}, |
|
|
"controversial_or_sensitive_topics": { |
|
|
"issue": "get balanced perspectives", |
|
|
"simple_suggestion": "ask for multiple viewpoints or balanced analysis", |
|
|
"complex_refinement": "specify you'd like pros/cons or different perspectives included" |
|
|
} |
|
|
} |
|
|
|
|
|
primary_category = exceeded_categories[0] |
|
|
guidance = guidance_templates.get(primary_category, { |
|
|
"issue": "improve response quality", |
|
|
"simple_suggestion": "try rephrasing with more specific details", |
|
|
"complex_refinement": "add clarifying details to your existing question" |
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
topic = "Error recovery context" |
|
|
|
|
|
|
|
|
if input_complexity["is_complex"]: |
|
|
return f"""**Want a better response?** To {guidance['issue']} in responses about {topic}, you could {guidance['complex_refinement']} rather than rewriting your detailed question. Or simply ask again as-is and I'll focus on providing more balanced information.""" |
|
|
else: |
|
|
return f"""**Want a better response?** To {guidance['issue']} in future responses about {topic}, you could {guidance['simple_suggestion']}. Feel free to ask again with any adjustments!""" |
|
|
|
|
|
def _assess_input_complexity(self, user_input: str) -> dict: |
|
|
"""Assess input complexity to determine appropriate UX guidance""" |
|
|
word_count = len(user_input.split()) |
|
|
sentence_count = user_input.count('.') + user_input.count('!') + user_input.count('?') |
|
|
has_context = any(phrase in user_input.lower() for phrase in [ |
|
|
'i am currently', 'my situation', 'my background', 'i have been', |
|
|
'my experience', 'i work', 'my company', 'specific to my' |
|
|
]) |
|
|
has_constraints = any(phrase in user_input.lower() for phrase in [ |
|
|
'must', 'need to', 'required', 'limited by', 'constraint', 'budget', |
|
|
'timeline', 'deadline', 'specific requirements' |
|
|
]) |
|
|
|
|
|
is_complex = ( |
|
|
word_count > 30 or |
|
|
sentence_count > 2 or |
|
|
has_context or |
|
|
has_constraints |
|
|
) |
|
|
|
|
|
return { |
|
|
"is_complex": is_complex, |
|
|
"word_count": word_count, |
|
|
"has_personal_context": has_context, |
|
|
"has_constraints": has_constraints, |
|
|
"complexity_score": word_count * 0.1 + sentence_count * 5 + (has_context * 10) + (has_constraints * 10) |
|
|
} |
|
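    # Worked example (hypothetical input): a 40-word, two-sentence prompt that
    # mentions "my background" and a "deadline" scores
    #   40 * 0.1 + 2 * 5 + 10 + 10 = 34
    # with is_complex = True, which clears the score of 25 used to trigger the
    # intelligent re-prompt path in _process_request_with_revision_internal.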
|
|
|
|
def _generate_improved_prompt(self, original_input: str, exceeded_categories: list) -> str: |
|
|
"""Generate improved prompt for complex inputs to resolve safety concerns automatically""" |
|
|
|
|
|
improvements = [] |
|
|
|
|
|
if "potential_biases_or_stereotypes" in exceeded_categories: |
|
|
improvements.append("Please provide specific qualifications, skills, and requirements for each option") |
|
|
improvements.append("Include diverse pathways and acknowledge individual circumstances vary") |
|
|
|
|
|
if "toxicity_or_harmful_language" in exceeded_categories: |
|
|
improvements.append("Use respectful, professional language throughout") |
|
|
|
|
|
if "privacy_or_security_concerns" in exceeded_categories: |
|
|
improvements.append("Focus on general guidance without personal specifics") |
|
|
|
|
|
if "controversial_or_sensitive_topics" in exceeded_categories: |
|
|
improvements.append("Present balanced perspectives and multiple viewpoints") |
|
|
|
|
|
improvement_instructions = ". ".join(improvements) |
|
|
|
|
|
improved_prompt = f"""{original_input} |
|
|
|
|
|
Additional guidance for response: {improvement_instructions}. Ensure all advice is specific, actionable, and acknowledges different backgrounds and circumstances.""" |
|
|
|
|
|
return improved_prompt |
|
|
|
|
|
def check_query_similarity(self, new_query: str, threshold: float = 0.85) -> Optional[Dict]: |
|
|
""" |
|
|
Check whether the new query is similar to any recent query above the threshold.
|
|
Uses simple string similarity (can be enhanced with embeddings later). |
|
|
|
|
|
Args: |
|
|
new_query: The new query to check |
|
|
threshold: Similarity threshold (default 0.85) |
|
|
|
|
|
Returns: |
|
|
Cached response dict if similar query found, None otherwise |
|
|
""" |
|
|
if not self.recent_queries: |
|
|
return None |
|
|
|
|
|
new_query_lower = new_query.lower().strip() |
|
|
|
|
|
for cached_query_data in reversed(self.recent_queries): |
|
|
cached_query = cached_query_data.get('query', '') |
|
|
if not cached_query: |
|
|
continue |
|
|
|
|
|
cached_query_lower = cached_query.lower().strip() |
|
|
|
|
|
|
|
|
similarity = self._calculate_similarity(new_query_lower, cached_query_lower) |
|
|
|
|
|
if similarity > threshold: |
|
|
logger.info(f"Similar query detected (similarity: {similarity:.2f}): '{new_query[:50]}...' similar to '{cached_query[:50]}...'") |
|
|
return cached_query_data.get('response') |
|
|
|
|
|
return None |
|
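    # Usage sketch (illustrative; assumes callers append entries of this shape to
    # self.recent_queries, which is what the method above reads):
    #
    #   self.recent_queries.append({"query": user_input, "response": result})
    #   cached = self.check_query_similarity(new_query)
    #   if cached is not None:
    #       return cached  # reuse the earlier response instead of re-processing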
|
|
|
|
def _calculate_similarity(self, query1: str, query2: str) -> float: |
|
|
""" |
|
|
Calculate similarity between two queries using Jaccard similarity on words. |
|
|
Can be enhanced with embeddings for semantic similarity. |
|
|
""" |
|
|
if not query1 or not query2: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
words1 = set(query1.split()) |
|
|
words2 = set(query2.split()) |
|
|
|
|
|
if not words1 or not words2: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
intersection = len(words1.intersection(words2)) |
|
|
union = len(words1.union(words2)) |
|
|
|
|
|
if union == 0: |
|
|
return 0.0 |
|
|
|
|
|
jaccard = intersection / union |
|
|
|
|
|
|
|
|
if query1 in query2 or query2 in query1: |
|
|
jaccard = max(jaccard, 0.9) |
|
|
|
|
|
return jaccard |
|
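    # Worked example: "how do i change careers" vs "how do i change careers quickly"
    # shares 5 words out of 6 unique words, so Jaccard = 5/6 ~= 0.83; because the
    # first string is contained in the second, the containment bump lifts the score
    # to 0.9, which clears the default 0.85 threshold in check_query_similarity.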
|
|
|
|
def track_response_metrics(self, start_time: float, response: Dict): |
|
|
""" |
|
|
Track performance metrics (latency, token count, agent calls, safety score) for a response.
|
|
|
|
|
Args: |
|
|
start_time: Start time from time.time() |
|
|
response: Response dictionary containing response data |
|
|
""" |
|
|
try: |
|
|
latency = time.time() - start_time |
|
|
|
|
|
|
|
|
response_text = ( |
|
|
response.get('response') or |
|
|
response.get('final_response') or |
|
|
str(response.get('result', '')) |
|
|
) |
|
|
|
|
|
|
|
|
token_count = len(response_text.split()) if response_text else 0 |
|
|
|
|
|
|
|
|
safety_score = 0.8 |
|
|
if 'metadata' in response: |
|
|
synthesis_result = response['metadata'].get('synthesis_result', {}) |
|
|
safety_result = response['metadata'].get('safety_result', {}) |
|
|
if safety_result: |
|
|
safety_analysis = safety_result.get('safety_analysis', {}) |
|
|
safety_score = safety_analysis.get('overall_safety_score', 0.8) |
|
|
|
|
|
metrics = { |
|
|
'latency': latency, |
|
|
'token_count': token_count, |
|
|
'agent_calls': self.agent_call_count, |
|
|
'safety_score': safety_score, |
|
|
'timestamp': datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
|
|
|
self.response_metrics_history.append(metrics) |
|
|
if len(self.response_metrics_history) > 100: |
|
|
self.response_metrics_history = self.response_metrics_history[-100:] |
|
|
|
|
|
|
|
|
logger.info(f"Response Metrics - Latency: {latency:.3f}s, Tokens: {token_count}, " |
|
|
f"Agent Calls: {self.agent_call_count}, Safety Score: {safety_score:.2f}") |
|
|
|
|
|
|
|
|
self.agent_call_count = 0 |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error tracking response metrics: {e}", exc_info=True) |
|
|
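    # Usage sketch (illustrative; `session_id` and `text` are placeholders):
    #
    #   start = time.time()
    #   result = await orchestrator.process_request_with_revision(session_id, text)
    #   orchestrator.track_response_metrics(start, result)
    #   # appends to response_metrics_history and logs latency / tokens / safety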
|