# app.py - Mobile-First Implementation
import gradio as gr
import uuid
import logging
import traceback
from typing import Optional, Tuple, List, Dict, Any
import os
# Configure comprehensive logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log')
]
)
logger = logging.getLogger(__name__)
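# Note: app.log is written to the current working directory; on Hugging Face
# Spaces the container filesystem is typically ephemeral, so the file will not
# survive restarts - the StreamHandler (container log) output is the durable record.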
# Try to import orchestration components
orchestrator = None
orchestrator_available = False
# Process Flow Visualization - DISABLED
# Moving functionality to container logs instead of UI
process_flow_available = False
logger.info("Process Flow Visualization disabled - functionality moved to container logs")
try:
logger.info("Attempting to import orchestration components...")
import sys
sys.path.insert(0, '.')
sys.path.insert(0, 'src')
from src.agents.intent_agent import create_intent_agent
from src.agents.synthesis_agent import create_synthesis_agent
from src.agents.safety_agent import create_safety_agent
from src.agents.skills_identification_agent import create_skills_identification_agent
from src.llm_router import LLMRouter
from src.orchestrator_engine import MVPOrchestrator
from src.context_manager import EfficientContextManager
from src.config import settings
logger.info("βœ“ Successfully imported orchestration components")
orchestrator_available = True
except ImportError as e:
logger.warning(f"Could not import orchestration components: {e}")
# Note: System will gracefully degrade if orchestrator unavailable
# This is handled in process_message_async with proper user-facing messages
try:
from spaces import GPU
SPACES_GPU_AVAILABLE = True
logger.info("HF Spaces GPU available")
except ImportError:
# Not running on HF Spaces or spaces module not available
SPACES_GPU_AVAILABLE = False
GPU = None
logger.info("Running without HF Spaces GPU")
def create_mobile_optimized_interface():
"""Create the mobile-optimized Gradio interface and return demo with components"""
# Store components for wiring
interface_components = {}
with gr.Blocks(
title="AI Research Assistant MVP",
theme=gr.themes.Soft(
primary_hue="blue",
secondary_hue="gray",
font=("Inter", "system-ui", "sans-serif")
),
css="""
/* Mobile-first responsive CSS */
.mobile-container {
max-width: 100vw;
margin: 0 auto;
padding: 0 12px;
}
/* Touch-friendly button sizing */
.gradio-button {
min-height: 44px !important;
min-width: 44px !important;
font-size: 16px !important; /* Prevents zoom on iOS */
}
/* Mobile-optimized chat interface */
.chatbot-container {
height: 60vh !important;
max-height: 60vh !important;
overflow-y: auto !important;
-webkit-overflow-scrolling: touch !important;
}
/* Mobile input enhancements */
.textbox-input {
font-size: 16px !important; /* Prevents zoom */
min-height: 44px !important;
padding: 12px !important;
}
/* Responsive grid adjustments */
@media (max-width: 768px) {
.gradio-row {
flex-direction: column !important;
gap: 8px !important;
}
.gradio-column {
width: 100% !important;
}
.chatbot-container {
height: 50vh !important;
}
}
/* Dark mode support */
@media (prefers-color-scheme: dark) {
body {
background: #1a1a1a;
color: #ffffff;
}
}
/* Hide scrollbars but maintain functionality */
.chatbot-container::-webkit-scrollbar {
width: 4px;
}
/* Loading states */
.loading-indicator {
display: flex;
align-items: center;
justify-content: center;
padding: 20px;
}
/* Mobile menu enhancements */
.accordion-content {
max-height: 200px !important;
overflow-y: auto !important;
}
/* Skills Tags Styling */
#skills_tags_container {
padding: 8px 12px;
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
border-radius: 8px;
border: 1px solid #dee2e6;
margin: 8px 0;
min-height: 40px;
display: flex;
flex-wrap: wrap;
align-items: center;
gap: 6px;
}
.skill-tag {
display: inline-block;
background: linear-gradient(135deg, #007bff 0%, #0056b3 100%);
color: white;
padding: 4px 8px;
border-radius: 12px;
font-size: 12px;
font-weight: 500;
margin: 2px;
box-shadow: 0 2px 4px rgba(0,123,255,0.2);
transition: all 0.2s ease;
cursor: pointer;
}
.skill-tag:hover {
transform: translateY(-1px);
box-shadow: 0 4px 8px rgba(0,123,255,0.3);
}
.skill-tag.high-confidence {
background: linear-gradient(135deg, #28a745 0%, #1e7e34 100%);
}
.skill-tag.medium-confidence {
background: linear-gradient(135deg, #ffc107 0%, #e0a800 100%);
color: #212529;
}
.skill-tag.low-confidence {
background: linear-gradient(135deg, #6c757d 0%, #495057 100%);
}
.skills-header {
font-size: 11px;
color: #6c757d;
margin-right: 8px;
font-weight: 600;
}
/* Dark mode support for skills */
@media (prefers-color-scheme: dark) {
#skills_tags_container {
background: linear-gradient(135deg, #2d3748 0%, #1a202c 100%);
border-color: #4a5568;
}
.skills-header {
color: #a0aec0;
}
}
"""
) as demo:
# Session Management (Mobile-Optimized)
with gr.Column(elem_classes="mobile-container"):
gr.Markdown("""
# 🧠 Research Assistant
*Academic AI with transparent reasoning*
""")
# Session Header Bar (Mobile-Friendly)
with gr.Row():
# User Selection Dropdown
user_dropdown = gr.Dropdown(
choices=["Admin_J", "Dev_K", "Dev_H", "Dev_A", "Test_Any"],
value="Test_Any",
label="User",
show_label=False,
container=False,
scale=1,
min_width=100
)
interface_components['user_dropdown'] = user_dropdown
session_info = gr.Textbox(
label="Session Info",
value=f"Session: {str(uuid.uuid4())[:8]} | User: Test_Any | Interactions: 0",
max_lines=1,
show_label=False,
container=False,
scale=3,
interactive=False
)
interface_components['session_info'] = session_info
new_session_btn = gr.Button(
"πŸ”„ New",
size="sm",
variant="secondary",
scale=1,
min_width=60
)
interface_components['new_session_btn'] = new_session_btn
menu_toggle = gr.Button(
"βš™οΈ",
size="sm",
variant="secondary",
scale=1,
min_width=60
)
interface_components['menu_toggle'] = menu_toggle
# Main Chat Area (Mobile-Optimized)
with gr.Tabs() as main_tabs:
with gr.TabItem("πŸ’¬ Chat", id="chat_tab"):
chatbot = gr.Chatbot(
label="",
show_label=False,
height="60vh",
elem_classes="chatbot-container",
type="messages"
)
interface_components['chatbot'] = chatbot
# Skills Identification Display (between chat and input)
with gr.Row(visible=False, elem_id="skills_display") as skills_display_row:
skills_tags = gr.HTML(
value="",
show_label=False,
elem_id="skills_tags_container"
)
interface_components['skills_tags'] = skills_tags
interface_components['skills_display_row'] = skills_display_row
# Mobile Input Area
with gr.Row():
message_input = gr.Textbox(
placeholder="Ask me anything...",
show_label=False,
max_lines=3,
container=False,
scale=4,
autofocus=True
)
interface_components['message_input'] = message_input
send_btn = gr.Button(
"↑ Send",
variant="primary",
scale=1,
min_width=80
)
interface_components['send_btn'] = send_btn
# Technical Details Tab (Collapsible for Mobile)
with gr.TabItem("πŸ” Details", id="details_tab"):
with gr.Accordion("Reasoning Chain", open=False):
reasoning_display = gr.JSON(
label="",
show_label=False
)
interface_components['reasoning_display'] = reasoning_display
with gr.Accordion("Agent Performance", open=False):
performance_display = gr.JSON(
label="",
show_label=False
)
interface_components['performance_display'] = performance_display
with gr.Accordion("Session Context", open=False):
context_display = gr.JSON(
label="",
show_label=False
)
interface_components['context_display'] = context_display
# Process Flow Tab - DISABLED
# Process flow information is now logged to container logs instead of UI
# Mobile Bottom Navigation
with gr.Row(visible=False, elem_id="mobile_nav") as mobile_navigation:
chat_nav_btn = gr.Button("💬 Chat", variant="secondary", size="sm", min_width=0)
details_nav_btn = gr.Button("🔍 Details", variant="secondary", size="sm", min_width=0)
settings_nav_btn = gr.Button("⚙️ Settings", variant="secondary", size="sm", min_width=0)
# Settings Panel (Modal for Mobile)
with gr.Column(visible=False, elem_id="settings_panel") as settings:
interface_components['settings_panel'] = settings
with gr.Accordion("Display Options", open=True):
show_reasoning = gr.Checkbox(
label="Show reasoning chain",
value=True,
info="Display step-by-step reasoning"
)
interface_components['show_reasoning'] = show_reasoning
show_agent_trace = gr.Checkbox(
label="Show agent execution trace",
value=False,
info="Display which agents processed your request"
)
interface_components['show_agent_trace'] = show_agent_trace
compact_mode = gr.Checkbox(
label="Compact mode",
value=False,
info="Optimize for smaller screens"
)
interface_components['compact_mode'] = compact_mode
with gr.Accordion("Performance Options", open=False):
response_speed = gr.Radio(
choices=["Fast", "Balanced", "Thorough"],
value="Balanced",
label="Response Speed Preference"
)
interface_components['response_speed'] = response_speed
cache_enabled = gr.Checkbox(
label="Enable context caching",
value=True,
info="Faster responses using session memory"
)
interface_components['cache_enabled'] = cache_enabled
save_prefs_btn = gr.Button("Save Preferences", variant="primary")
interface_components['save_prefs_btn'] = save_prefs_btn
# Wire up the submit handler INSIDE the gr.Blocks context
if 'send_btn' in interface_components and 'message_input' in interface_components and 'chatbot' in interface_components:
# Store interface components globally for dynamic return values
global _interface_components
_interface_components = interface_components
# Build outputs list dynamically
outputs = _build_outputs_list(interface_components)
# Include session_info in inputs to pass session ID
inputs = [interface_components['message_input'], interface_components['chatbot']]
if 'user_dropdown' in interface_components:
inputs.append(interface_components['user_dropdown'])
if 'session_info' in interface_components:
inputs.append(interface_components['session_info'])
interface_components['send_btn'].click(
fn=chat_handler_fn,
inputs=inputs,
outputs=outputs
)
# Wire up New Session button
if 'new_session_btn' in interface_components and 'session_info' in interface_components and 'user_dropdown' in interface_components:
def new_session(user_id):
new_session_id = str(uuid.uuid4())[:8]
return f"Session: {new_session_id} | User: {user_id} | Interactions: 0"
interface_components['new_session_btn'].click(
fn=new_session,
inputs=[interface_components['user_dropdown']],
outputs=[interface_components['session_info']]
)
# Wire up User Dropdown to update session info
if 'user_dropdown' in interface_components and 'session_info' in interface_components:
def update_session_info(user_id, session_text):
# Extract session_id from existing text
import re
match = re.search(r'Session: ([a-f0-9]+)', session_text)
session_id = match.group(1) if match else str(uuid.uuid4())[:8]
# Extract interaction count
match = re.search(r'Interactions: (\d+)', session_text)
interaction_count = match.group(1) if match else "0"
return f"Session: {session_id} | User: {user_id} | Interactions: {interaction_count}"
interface_components['user_dropdown'].change(
fn=update_session_info,
inputs=[interface_components['user_dropdown'], interface_components['session_info']],
outputs=[interface_components['session_info']]
)
# Wire up Settings button to toggle settings panel.
# gr.Column is a layout block with no value, so it cannot serve as an event
# input; track the panel's visibility in a per-session gr.State instead.
if 'menu_toggle' in interface_components and 'settings_panel' in interface_components:
settings_visible = gr.State(False)
def toggle_settings(visible):
return gr.update(visible=not visible), not visible
interface_components['menu_toggle'].click(
fn=toggle_settings,
inputs=[settings_visible],
outputs=[interface_components['settings_panel'], settings_visible]
)
# Wire up Save Preferences button
if 'save_prefs_btn' in interface_components:
def save_preferences(*args):
logger.info("Preferences saved")
gr.Info("Preferences saved successfully!")
# Only pass components that actually exist; a None entry in the inputs
# list would raise when the event is wired.
pref_inputs = [
interface_components[key]
for key in ('show_reasoning', 'show_agent_trace', 'response_speed', 'cache_enabled')
if key in interface_components
]
interface_components['save_prefs_btn'].click(
fn=save_preferences,
inputs=pref_inputs
)
# Process Flow event handlers - DISABLED
# Process flow information is now logged to container logs instead of UI
return demo, interface_components
def setup_event_handlers(demo, event_handlers):
"""Setup event handlers for the interface"""
# Find components by their labels or types
components = {}
for block in demo.blocks.values():  # demo.blocks maps block ids to block objects
if hasattr(block, 'label'):
if block.label == 'Session ID':
components['session_info'] = block
elif hasattr(block, 'value') and 'session' in str(block.value).lower():
components['session_id'] = block
# Setup message submission handler
try:
# This is a simplified version - you'll need to adapt based on your actual component structure
if hasattr(demo, 'submit'):
demo.submit(
fn=event_handlers.handle_message_submit,
inputs=[components.get('message_input'), components.get('chatbot')],
outputs=[components.get('message_input'), components.get('chatbot')]
)
except Exception as e:
logger.error(f"Could not setup event handlers: {e}", exc_info=True)
# Event handlers setup failure is logged but won't affect core chat functionality
# Gradio interface will still work with default handlers
return demo
def _generate_skills_html(identified_skills: List[Dict[str, Any]]) -> str:
"""Generate HTML for skills tags display"""
if not identified_skills:
return ""
# Limit to top 8 skills for UI
top_skills = identified_skills[:8]
# Generate HTML with confidence-based styling
skills_html = '<div class="skills-header">🎯 Relevant Skills:</div>'
for skill in top_skills:
skill_name = skill.get('skill', 'Unknown Skill')
probability = skill.get('probability', 0.5)
# Determine confidence class based on probability
if probability >= 0.7:
confidence_class = "high-confidence"
elif probability >= 0.4:
confidence_class = "medium-confidence"
else:
confidence_class = "low-confidence"
# Create skill tag
skills_html += f'<span class="skill-tag {confidence_class}" title="Probability: {probability:.1%}">{skill_name}</span>'
return skills_html
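# Illustrative example (hypothetical input):
#   _generate_skills_html([{"skill": "Python", "probability": 0.85}])
# returns the skills header div followed by
#   '<span class="skill-tag high-confidence" title="Probability: 85.0%">Python</span>'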
def _update_skills_display(skills_html: str) -> Tuple[str, bool]:
"""Update skills display visibility and content"""
if skills_html and len(skills_html.strip()) > 0:
return skills_html, True # Show skills display
else:
return "", False # Hide skills display
async def process_message_async(message: str, history: Optional[List], session_id: str, user_id: str = "Test_Any") -> Tuple[List, str, dict, dict, dict, str, str]:
"""
Process message with full orchestration system
Returns (updated_history, empty_string, reasoning_data, performance_data, context_data, session_id, skills_html)
GUARANTEES:
- Always returns a response (never None or empty)
- Handles all error cases gracefully
- Provides fallback responses at every level
- Returns metadata for Details tab
- Returns session_id to maintain session continuity
- Returns skills HTML for display
"""
global orchestrator
try:
logger.info(f"Processing message: {message[:100]}")
logger.info(f"Session ID: {session_id}")
logger.info(f"User ID: {user_id}")
logger.info(f"Orchestrator available: {orchestrator is not None}")
# Set user_id in orchestrator for context system
if orchestrator is not None:
if hasattr(orchestrator, 'set_user_id'):
orchestrator.set_user_id(session_id, user_id)
if not message or not message.strip():
logger.debug("Empty message received")
return history if history else [], "", {}, {}, {}, session_id, ""
if history is None:
history = []
new_history = list(history) if isinstance(history, list) else []
# Check if this is a safety choice response (BEFORE normal processing)
message_upper = message.strip().upper()
is_safety_choice = message_upper in ['YES', 'NO', 'APPLY', 'KEEP', 'Y', 'N']
# Check if we have a pending safety choice for this session
if is_safety_choice and orchestrator is not None:
# Check both _pending_choices (from app.py) and awaiting_safety_response (from orchestrator)
pending_choice = getattr(orchestrator, '_pending_choices', {}).get(session_id)
awaiting_response = getattr(orchestrator, 'awaiting_safety_response', {}).get(session_id, False)
if pending_choice or awaiting_response:
logger.info(f"Processing safety choice response: {message_upper} (session: {session_id})")
# Determine user decision
user_decision = message_upper in ['YES', 'APPLY', 'Y']
# Process the safety choice directly (bypasses normal safety checks)
if pending_choice:
choice_result = await orchestrator.handle_user_safety_decision(
pending_choice['choice_id'],
user_decision,
session_id
)
# Clean up pending choice
if hasattr(orchestrator, '_pending_choices'):
orchestrator._pending_choices.pop(session_id, None)
else:
# Fallback: if no pending choice but flag is set, skip safety check
logger.warning(f"Safety response flag set but no pending choice found - bypassing safety check")
return new_history, "", {}, {}, {}, session_id, ""
# Add user message
new_history.append({"role": "user", "content": message.strip()})
# Add assistant response
if 'error' in choice_result:
response = f"Error processing safety choice: {choice_result['error']}"
else:
response = choice_result.get('response', choice_result.get('final_response', 'Processing complete.'))
new_history.append({"role": "assistant", "content": response})
# Extract metadata
reasoning_data = {}
performance_data = {
"user_choice": choice_result.get('user_choice', 'unknown'),
"revision_applied": choice_result.get('revision_applied', False)
}
context_data = {
"interaction_id": choice_result.get('interaction_id', 'unknown'),
"session_id": session_id
}
# Ensure flags are cleared
if hasattr(orchestrator, 'awaiting_safety_response'):
orchestrator.awaiting_safety_response.pop(session_id, None)
return new_history, "", reasoning_data, performance_data, context_data, session_id, ""
# Add user message (normal flow)
new_history.append({"role": "user", "content": message.strip()})
# Initialize Details tab data
reasoning_data = {}
performance_data = {}
context_data = {}
skills_html = "" # Initialize skills_html
# GUARANTEE: Always get a response
response = "Hello! I'm processing your request..."
# Try to use orchestrator if available
if orchestrator is not None:
try:
logger.info("Attempting full orchestration...")
# Use normal processing (user choice feature is PAUSED)
# Safety warnings are automatically appended to responses when thresholds exceeded
result = await orchestrator.process_request(
session_id=session_id,
user_input=message.strip()
)
# Check if result indicates this was a safety response (should have been handled above)
if result.get('is_safety_response', False):
logger.warning("Safety response detected in normal processing - should have been handled earlier")
# Skip further processing
return new_history, "", {}, {}, {}, session_id, ""
# USER CHOICE FEATURE PAUSED - Warnings automatically appended to responses
# No reiteration/revision happens - responses are returned with warnings when thresholds exceeded
logger.info("Processing response - safety warnings appended automatically if needed (no revisions)")
# Extract response from result with multiple fallback checks
if isinstance(result, dict):
# Extract the text response (not the dict)
response = (
result.get('response') or
result.get('final_response') or
result.get('safety_checked_response') or
result.get('original_response') or
str(result.get('result', ''))
)
# Extract metadata for Details tab with enhanced reasoning chain
reasoning_data = result.get('metadata', {}).get('reasoning_chain', {
"chain_of_thought": {},
"alternative_paths": [],
"uncertainty_areas": [],
"evidence_sources": [],
"confidence_calibration": {}
})
# Ensure we have the enhanced structure even if orchestrator didn't provide it
if not reasoning_data.get('chain_of_thought'):
reasoning_data = {
"chain_of_thought": {
"step_1": {
"hypothesis": "Processing user request",
"evidence": [f"User input: {message[:50]}..."],
"confidence": 0.7,
"reasoning": "Basic request processing"
}
},
"alternative_paths": [],
"uncertainty_areas": [],
"evidence_sources": [],
"confidence_calibration": {"overall_confidence": 0.7}
}
performance_data = {
"agent_trace": result.get('agent_trace', []),
"processing_time": result.get('metadata', {}).get('processing_time', 0),
"token_count": result.get('metadata', {}).get('token_count', 0),
"confidence_score": result.get('confidence_score', 0.7),
"agents_used": result.get('metadata', {}).get('agents_used', [])
}
context_data = {
"interaction_id": result.get('interaction_id', 'unknown'),
"session_id": session_id,
"timestamp": result.get('timestamp', ''),
"warnings": result.get('metadata', {}).get('warnings', [])
}
# Extract skills data for UI display
skills_html = ""
skills_result = result.get('metadata', {}).get('skills_result', {})
if skills_result and skills_result.get('identified_skills'):
skills_html = _generate_skills_html(skills_result['identified_skills'])
else:
response = str(result) if result else "Processing complete."
# Final safety check - ensure response is not empty (only for actual errors)
# Handle both string and dict types
if isinstance(response, dict):
response = str(response.get('content', response))
if not response or (isinstance(response, str) and len(response.strip()) == 0):
# This should only happen if LLM API completely fails - log it
logger.warning(f"Empty response received from orchestrator for message: {message[:50]}...")
response = (
f"I received your message about '{message[:50]}...'. "
f"I'm processing your request and working on providing you with a comprehensive answer. "
f"Please wait a moment and try again if needed."
)
logger.info(f"Orchestrator returned response (length: {len(response)})")
except Exception as orch_error:
logger.error(f"Orchestrator error with safety revision: {orch_error}", exc_info=True)
try:
# Graceful degradation to original orchestrator method
logger.info("Falling back to original orchestrator method...")
result = await orchestrator.process_request(
session_id=session_id,
user_input=message.strip()
)
result['fallback_used'] = True
result['revision_attempts'] = 0
logger.info("βœ“ Fallback to original orchestrator successful")
# Extract response from fallback result
response = (
result.get('response') or
result.get('final_response') or
result.get('safety_checked_response') or
result.get('original_response') or
str(result.get('result', ''))
)
# Extract metadata from fallback result
reasoning_data = result.get('metadata', {}).get('reasoning_chain', {
"chain_of_thought": {},
"alternative_paths": [],
"uncertainty_areas": [],
"evidence_sources": [],
"confidence_calibration": {}
})
performance_data = {
"agent_trace": result.get('agent_trace', []),
"processing_time": result.get('metadata', {}).get('processing_time', 0),
"token_count": result.get('metadata', {}).get('token_count', 0),
"confidence_score": result.get('confidence_score', 0.7),
"agents_used": result.get('metadata', {}).get('agents_used', [])
}
context_data = {
"interaction_id": result.get('interaction_id', 'unknown'),
"session_id": session_id,
"timestamp": result.get('timestamp', ''),
"warnings": result.get('metadata', {}).get('warnings', [])
}
# Extract skills data from fallback
skills_html = ""
skills_result = result.get('metadata', {}).get('skills_result', {})
if skills_result and skills_result.get('identified_skills'):
skills_html = _generate_skills_html(skills_result['identified_skills'])
except Exception as fallback_error:
logger.error(f"Fallback orchestrator also failed: {fallback_error}", exc_info=True)
# Fallback response with error info and enhanced reasoning
response = f"I'm experiencing some technical difficulties. Your message was: '{message[:100]}...' Please try again or rephrase your question."
reasoning_data = {
"chain_of_thought": {
"step_1": {
"hypothesis": "System encountered an error during processing",
"evidence": [f"Error: {str(orch_error)[:100]}..."],
"confidence": 0.3,
"reasoning": "Orchestrator failure - fallback mode activated"
}
},
"alternative_paths": [],
"uncertainty_areas": [
{
"aspect": "System reliability",
"confidence": 0.3,
"mitigation": "Error handling and graceful degradation"
}
],
"evidence_sources": [],
"confidence_calibration": {"overall_confidence": 0.3, "error_mode": True}
}
performance_data = {}
context_data = {}
skills_html = ""
else:
# System initialization message with enhanced reasoning
logger.info("Orchestrator not yet available")
response = f"Hello! I received your message about: '{message}'.\n\nThe orchestration system is initializing. Your question is important and I'll provide a comprehensive answer shortly."
reasoning_data = {
"chain_of_thought": {
"step_1": {
"hypothesis": "System is initializing and not yet ready",
"evidence": ["Orchestrator not available", f"User input: {message[:50]}..."],
"confidence": 0.5,
"reasoning": "System startup phase - components loading"
}
},
"alternative_paths": [],
"uncertainty_areas": [
{
"aspect": "System readiness",
"confidence": 0.5,
"mitigation": "Graceful initialization message"
}
],
"evidence_sources": [],
"confidence_calibration": {"overall_confidence": 0.5, "initialization_mode": True}
}
performance_data = {}
context_data = {}
skills_html = "" # Initialize skills_html for orchestrator not available case
# Add assistant response
new_history.append({"role": "assistant", "content": response})
logger.info("βœ“ Message processing complete")
return new_history, "", reasoning_data, performance_data, context_data, session_id, skills_html
except Exception as e:
# FINAL FALLBACK: Always return something to user with enhanced reasoning
logger.error(f"Error in process_message_async: {e}", exc_info=True)
# Create error history with helpful message
error_history = list(history) if history else []
error_history.append({"role": "user", "content": message})
# User-friendly error message
error_message = (
f"I encountered a technical issue processing your message: '{message[:50]}...'. "
f"Please try rephrasing your question or contact support if this persists."
)
error_history.append({"role": "assistant", "content": error_message})
# Enhanced reasoning for error case
reasoning_data = {
"chain_of_thought": {
"step_1": {
"hypothesis": "Critical system error occurred",
"evidence": [f"Exception: {str(e)[:100]}...", f"User input: {message[:50]}..."],
"confidence": 0.2,
"reasoning": "System error handling - final fallback activated"
}
},
"alternative_paths": [],
"uncertainty_areas": [
{
"aspect": "System stability",
"confidence": 0.2,
"mitigation": "Error logging and user notification"
}
],
"evidence_sources": [],
"confidence_calibration": {"overall_confidence": 0.2, "critical_error": True}
}
return error_history, "", reasoning_data, {}, {}, session_id, ""
# Global variable to store interface components for dynamic return values
_interface_components = {}
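# Contract: _build_outputs_list and _build_dynamic_return_values must append
# components and values in the same order, driven by the same membership checks
# on the components dict. Gradio maps handler return values to outputs purely by
# position, so any divergence silently routes data to the wrong widget.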
def _build_outputs_list(interface_components: dict) -> list:
"""
Build outputs list dynamically based on available interface components
"""
outputs = [interface_components['chatbot'], interface_components['message_input']]
# Add Details tab components
if 'reasoning_display' in interface_components:
outputs.append(interface_components['reasoning_display'])
if 'performance_display' in interface_components:
outputs.append(interface_components['performance_display'])
if 'context_display' in interface_components:
outputs.append(interface_components['context_display'])
if 'session_info' in interface_components:
outputs.append(interface_components['session_info'])
if 'skills_tags' in interface_components:
outputs.append(interface_components['skills_tags'])
if 'skills_display_row' in interface_components:
outputs.append(interface_components['skills_display_row'])
# Process Flow outputs - DISABLED
# Process flow information is now logged to container logs instead of UI
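# With every component registered, the resulting order is:
# [chatbot, message_input, reasoning_display, performance_display,
#  context_display, session_info, skills_tags, skills_display_row]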
return outputs
def _build_dynamic_return_values(result: tuple, skills_content: str, interface_components: dict) -> tuple:
"""
Build return values dynamically based on available interface components
This ensures the return values match the outputs list exactly
"""
return_values = []
# Base components (always present)
return_values.extend([
result[0], # chatbot (history)
result[1], # message_input (empty_string)
])
# Add Details tab components
if 'reasoning_display' in interface_components:
return_values.append(result[2]) # reasoning_data
if 'performance_display' in interface_components:
return_values.append(result[3]) # performance_data
if 'context_display' in interface_components:
return_values.append(result[4]) # context_data
if 'session_info' in interface_components:
# Emit a no-op update rather than the raw session_id: writing the bare id into
# the textbox would break the "Session: <id> | User: ... | Interactions: ..."
# format the session-parsing regexes rely on; chat_handler_wrapper overwrites
# this entry with the fully formatted string when it runs.
return_values.append(gr.update())
if 'skills_tags' in interface_components:
return_values.append(skills_content)  # skills_content
if 'skills_display_row' in interface_components:
# Toggle the wrapping row so the skills strip only appears when populated
return_values.append(gr.update(visible=bool(skills_content)))
# Process Flow outputs - DISABLED
# Process flow information is now logged to container logs instead of UI
return tuple(return_values)
def process_message(message: str, history: Optional[List], session_id: Optional[str] = None, user_id: str = "Test_Any") -> tuple:
"""
Synchronous wrapper for async processing
Returns dynamic tuple based on available interface components
"""
import asyncio
# Use provided session_id or generate a new one
if not session_id:
session_id = str(uuid.uuid4())[:8]
try:
# Run async processing; asyncio.run creates a fresh event loop and closes it when done
result = asyncio.run(process_message_async(message, history, session_id, user_id))
# Extract skills_html from result and determine visibility
skills_html = result[6]
skills_content, skills_visible = _update_skills_display(skills_html)
# Return dynamic values based on available components
return _build_dynamic_return_values(result, skills_content, _interface_components)
except Exception as e:
logger.error(f"Error in process_message: {e}", exc_info=True)
error_history = list(history) if history else []
error_history.append({"role": "user", "content": message})
error_history.append({"role": "assistant", "content": f"Error: {str(e)}"})
# Enhanced reasoning for sync error case
reasoning_data = {
"chain_of_thought": {
"step_1": {
"hypothesis": "Synchronous processing error",
"evidence": [f"Sync error: {str(e)[:100]}...", f"User input: {message[:50]}..."],
"confidence": 0.2,
"reasoning": "Synchronous wrapper error handling"
}
},
"alternative_paths": [],
"uncertainty_areas": [
{
"aspect": "Processing reliability",
"confidence": 0.2,
"mitigation": "Error logging and fallback response"
}
],
"evidence_sources": [],
"confidence_calibration": {"overall_confidence": 0.2, "sync_error": True}
}
# Return dynamic values for error case
error_result = (error_history, "", reasoning_data, {}, {}, session_id, "")
return _build_dynamic_return_values(error_result, "", _interface_components)
# Decorate the chat handler with GPU if available
if SPACES_GPU_AVAILABLE and GPU is not None:
@GPU # This decorator is detected by HF Spaces for ZeroGPU allocation
def gpu_chat_handler(message, history, user_id="Test_Any", session_text=""):
"""Handle chat messages with GPU support"""
# Extract session_id from session_text or generate new one
import re
match = re.search(r'Session: ([a-f0-9]+)', session_text) if session_text else None
session_id = match.group(1) if match else str(uuid.uuid4())[:8]
result = process_message(message, history, session_id, user_id)
# Return the dynamically built outputs tuple (its length matches _build_outputs_list)
return result
def safe_gpu_chat_handler(message, history, user_id="Test_Any", session_text=""):
"""
Wrapper to catch any exceptions from GPU decorator cleanup phase.
This prevents exceptions during device release from propagating to Gradio UI.
"""
try:
# Call the GPU-decorated handler
return gpu_chat_handler(message, history, user_id, session_text)
except Exception as e:
# If decorator cleanup raises an exception, catch it and recompute result
# This is safe because the actual processing already completed successfully
logger.warning(
f"GPU decorator cleanup error caught (non-fatal): {e}. "
f"Recomputing result to avoid UI error propagation."
)
# Extract session_id from session_text or generate new one
import re
match = re.search(r'Session: ([a-f0-9]+)', session_text) if session_text else None
session_id = match.group(1) if match else str(uuid.uuid4())[:8]
# Recompute result without GPU decorator (safe fallback)
result = process_message(message, history, session_id, user_id)
return result
chat_handler_fn = safe_gpu_chat_handler
else:
def chat_handler_wrapper(message, history, user_id="Test_Any", session_text=""):
"""Wrapper to handle session ID - Process Flow functionality moved to logs"""
# Extract session_id from session_text or generate new one
import re
match = re.search(r'Session: ([a-f0-9]+)', session_text) if session_text else None
session_id = match.group(1) if match else str(uuid.uuid4())[:8]
result = process_message(message, history, session_id, user_id)
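# NOTE: process_message returns the dynamically built outputs tuple, so the
# index-based access below (result[2]..result[6]) assumes all display components
# were registered - which holds for the interface built above.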
# Extract skills_html from result and determine visibility
skills_html = result[6]
skills_content, skills_visible = _update_skills_display(skills_html)
# Update session info with interaction count
try:
context_data = result[4]
# Get interaction count from context or increment
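# Assumption: the context manager persists each interaction to sessions.db in an
# interaction_contexts table; if the schema differs, the except below falls back to 0.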
import sqlite3
conn = sqlite3.connect("sessions.db")
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*) FROM interaction_contexts WHERE session_id = ?
""", (session_id,))
interaction_count = cursor.fetchone()[0]
conn.close()
except Exception:
interaction_count = 0
# Update session_info if available
updated_session_info = f"Session: {session_id} | User: {user_id} | Interactions: {interaction_count}"
# Log process flow information to container logs instead of UI
try:
# Extract data for process flow logging
reasoning_data = result[2]
performance_data = result[3]
context_data = result[4]
# Log comprehensive process flow information
logger.info("=" * 60)
logger.info("PROCESS FLOW LOGGING")
logger.info("=" * 60)
logger.info(f"Session ID: {session_id}")
logger.info(f"User ID: {user_id}")
logger.info(f"User Input: {message[:100]}...")
logger.info(f"Processing Time: {performance_data.get('processing_time', 0):.2f}s")
# Log intent recognition details
if reasoning_data.get("chain_of_thought"):
logger.info("Intent Recognition:")
logger.info(f" - Primary Intent: {reasoning_data.get('chain_of_thought', {}).get('step_1', {}).get('hypothesis', 'unknown')}")
logger.info(f" - Confidence: {reasoning_data.get('confidence_calibration', {}).get('overall_confidence', 0.7):.2f}")
# Log performance metrics
logger.info("Performance Metrics:")
logger.info(f" - Agent Trace: {performance_data.get('agent_trace', [])}")
logger.info(f" - Token Count: {performance_data.get('token_count', 0)}")
logger.info(f" - Confidence Score: {performance_data.get('confidence_score', 0.7):.2f}")
logger.info(f" - Agents Used: {performance_data.get('agents_used', [])}")
# Log context information
logger.info("Context Information:")
logger.info(f" - User ID: {user_id}")
logger.info(f" - Session ID: {session_id}")
logger.info(f" - Interaction ID: {context_data.get('interaction_id', 'unknown')}")
logger.info(f" - Interaction Count: {interaction_count}")
logger.info(f" - Timestamp: {context_data.get('timestamp', '')}")
logger.info(f" - Warnings: {context_data.get('warnings', [])}")
# Log skills identification if available
if skills_html and len(skills_html.strip()) > 0:
logger.info("Skills Identification:")
logger.info(f" - Skills HTML: {skills_html}")
logger.info("=" * 60)
logger.info("END PROCESS FLOW LOGGING")
logger.info("=" * 60)
except Exception as e:
logger.error(f"Error logging process flow: {e}")
# Build return values with updated session info
return_values = list(_build_dynamic_return_values(result, skills_content, _interface_components))
# Overwrite the session_info entry with the freshly formatted string
if 'session_info' in _interface_components:
outputs_list = _build_outputs_list(_interface_components)
try:
session_info_idx = outputs_list.index(_interface_components['session_info'])
return_values[session_info_idx] = updated_session_info
except (ValueError, IndexError):
pass
return tuple(return_values)
chat_handler_fn = chat_handler_wrapper
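# chat_handler_fn is only read when create_mobile_optimized_interface() wires the
# send button's click event; that call happens in the __main__ block below, after
# one of the two assignments above has run.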
# Initialize orchestrator on module load
def initialize_orchestrator():
"""Initialize the orchestration system with logging"""
global orchestrator
if not orchestrator_available:
logger.info("Orchestrator components not available, skipping initialization")
return
try:
logger.info("=" * 60)
logger.info("INITIALIZING ORCHESTRATION SYSTEM")
logger.info("=" * 60)
# Get HF token
hf_token = os.getenv('HF_TOKEN', '')
if not hf_token:
logger.warning("HF_TOKEN not found in environment")
# Initialize LLM Router
logger.info("Step 1/6: Initializing LLM Router...")
llm_router = LLMRouter(hf_token)
logger.info("βœ“ LLM Router initialized")
# Initialize Agents
logger.info("Step 2/6: Initializing Agents...")
agents = {
'intent_recognition': create_intent_agent(llm_router),
'response_synthesis': create_synthesis_agent(llm_router),
'safety_check': create_safety_agent(llm_router),
}
# Add skills identification agent
skills_agent = create_skills_identification_agent(llm_router)
agents['skills_identification'] = skills_agent
logger.info("βœ“ Skills identification agent initialized")
logger.info(f"βœ“ Initialized {len(agents)} agents")
# Initialize Context Manager (with LLM router for context generation)
logger.info("Step 3/6: Initializing Context Manager...")
context_manager = EfficientContextManager(llm_router=llm_router)
logger.info("βœ“ Context Manager initialized")
# Initialize Orchestrator
logger.info("Step 4/6: Initializing Orchestrator...")
orchestrator = MVPOrchestrator(llm_router, context_manager, agents)
logger.info("βœ“ Orchestrator initialized")
logger.info("=" * 60)
logger.info("ORCHESTRATION SYSTEM READY")
logger.info("=" * 60)
except Exception as e:
logger.error(f"Failed to initialize orchestrator: {e}", exc_info=True)
orchestrator = None
# Try to initialize orchestrator
initialize_orchestrator()
if __name__ == "__main__":
logger.info("=" * 60)
logger.info("STARTING APP")
logger.info("=" * 60)
demo, components = create_mobile_optimized_interface()
logger.info("βœ“ Interface created")
logger.info(f"Orchestrator available: {orchestrator is not None}")
# Launch the app
logger.info("=" * 60)
logger.info("LAUNCHING GRADIO APP")
logger.info("=" * 60)
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)