# HonestAI / flask_api_standalone.py
# Commit 0747201 (JatsTheAIGen): "Replace Novita AI with ZeroGPU Chat API (RunPod)"
#!/usr/bin/env python3
"""
Pure Flask API for Hugging Face Spaces
No Gradio - Just Flask REST API
Uses the remote ZeroGPU Chat API (RunPod) for inference
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import logging
import sys
import os
import asyncio
from pathlib import Path
from logging.handlers import RotatingFileHandler
# Validate and set OMP_NUM_THREADS (must be a positive integer); mirror the
# resolved value into MKL_NUM_THREADS so both math libraries agree.
_omp_raw = os.getenv('OMP_NUM_THREADS', '4')
try:
    _omp_value = int(_omp_raw)
except (ValueError, TypeError):
    # Non-numeric value: fall back to the default of 4 threads.
    os.environ['OMP_NUM_THREADS'] = '4'
    os.environ['MKL_NUM_THREADS'] = '4'
    logger_basic = logging.getLogger(__name__)
    logger_basic.warning("Invalid OMP_NUM_THREADS, defaulting to 4")
else:
    if _omp_value <= 0:
        _omp_value = 4
        logger_basic = logging.getLogger(__name__)
        logger_basic.warning("OMP_NUM_THREADS must be positive, defaulting to 4")
    # Writing str(int) normalizes any surrounding whitespace in the input.
    os.environ['OMP_NUM_THREADS'] = str(_omp_value)
    os.environ['MKL_NUM_THREADS'] = str(_omp_value)
# Setup secure logging: pick a log directory, preferring $LOG_DIR.
log_dir = os.getenv('LOG_DIR', '/tmp/logs')
try:
    # 0o700 keeps the directory readable by the owner only.
    os.makedirs(log_dir, exist_ok=True, mode=0o700)
except OSError:
    # Fallback if /tmp/logs not writable: use ~/.logs, or /tmp as last resort.
    _home = os.path.expanduser('~')
    log_dir = os.path.join(_home, '.logs') if _home else '/tmp'
    os.makedirs(log_dir, exist_ok=True)
# Configure logging: console always, plus a size-rotated file when possible.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)  # Console output
    ],
)
logger = logging.getLogger(__name__)

# Attach a rotating file handler if the log directory is writable; failure
# here is non-fatal (console logging keeps working).
try:
    log_file = os.path.join(log_dir, 'app.log')
    _file_handler = RotatingFileHandler(
        log_file,
        maxBytes=10 * 1024 * 1024,  # 10MB per file
        backupCount=5,
    )
    _file_handler.setLevel(logging.INFO)
    _file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    ))
    logger.addHandler(_file_handler)
    if os.name != 'nt':  # POSIX only: tighten file permissions to owner-only
        try:
            os.chmod(log_file, 0o600)
        except OSError:
            pass  # Best-effort; ignore permission errors
    logger.info(f"Logging to file: {log_file}")
except (OSError, PermissionError) as e:
    logger.warning(f"Could not create log file: {e}. Using console logging only.")
# Sanitize sensitive data in logs
def sanitize_log_data(data):
    """Remove sensitive information from log data.

    Recursively walks dicts and lists; any dict key whose lowercased name
    contains a sensitive marker (token/password/secret/key/auth/api_key)
    has its value replaced with '***REDACTED***'. Non-container values are
    returned unchanged.
    """
    sensitive_markers = ('token', 'password', 'secret', 'key', 'auth', 'api_key')
    if isinstance(data, list):
        return [sanitize_log_data(entry) for entry in data]
    if not isinstance(data, dict):
        return data
    cleaned = {}
    for field, value in data.items():
        lowered = field.lower()
        if any(marker in lowered for marker in sensitive_markers):
            cleaned[field] = '***REDACTED***'
        elif isinstance(value, (dict, list)):
            cleaned[field] = sanitize_log_data(value)
        else:
            cleaned[field] = value
    return cleaned
# Add project root to sys.path so the lazy `from src...` imports inside
# initialize_orchestrator() resolve regardless of the working directory.
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
# Create Flask app
app = Flask(__name__)
CORS(app)  # Enable CORS for all origins

# Initialize rate limiter (use Redis in production for distributed systems).
# Limits are per client IP; disabled only when RATE_LIMIT_ENABLED != 'true'.
rate_limit_enabled = os.getenv('RATE_LIMIT_ENABLED', 'true').lower() == 'true'
if not rate_limit_enabled:
    limiter = None
    logger.warning("Rate limiting disabled - NOT recommended for production")
else:
    limiter = Limiter(
        app=app,
        key_func=get_remote_address,
        default_limits=["200 per day", "50 per hour", "10 per minute"],
        storage_uri="memory://",  # Use Redis in production: "redis://localhost:6379"
        headers_enabled=True,
    )
    logger.info("Rate limiting enabled")
# Add security headers middleware
@app.after_request
def set_security_headers(response):
    """
    Attach OWASP-recommended security headers to every outgoing response.

    Covers content-type sniffing, clickjacking, legacy XSS filtering,
    transport security, CSP, referrer policy, and cross-origin isolation.
    """
    hardening = {
        # Essential security headers
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Content-Security-Policy': "default-src 'self'",
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        # Additional security headers (Phase 1 enhancement)
        'Permissions-Policy': 'geolocation=(), microphone=(), camera=()',
        'Cross-Origin-Resource-Policy': 'same-origin',
        'Cross-Origin-Opener-Policy': 'same-origin',
        'X-Permitted-Cross-Domain-Policies': 'none',
    }
    for header_name, header_value in hardening.items():
        response.headers[header_name] = header_value
    return response
# Global orchestrator state, shared by the Flask handlers and the background
# initialization thread started at module import time.
orchestrator = None  # MVPOrchestrator instance once initialization succeeds
orchestrator_available = False  # True only after a fully successful init
initialization_attempted = False  # Set as soon as initialize_orchestrator() starts
initialization_error = None  # Human-readable failure reason, or None
def initialize_orchestrator() -> bool:
    """Initialize the AI orchestrator with ZeroGPU Chat API (RunPod).

    Builds the inference stack in order: LLM router -> agents -> context
    manager -> orchestrator, storing the result in the module-level
    ``orchestrator`` global and flipping ``orchestrator_available``.

    Returns:
        bool: True when the orchestrator is ready; False when initialization
        failed (the reason is recorded in ``initialization_error``).
    """
    global orchestrator, orchestrator_available, initialization_attempted, initialization_error
    initialization_attempted = True
    initialization_error = None
    try:
        logger.info("=" * 60)
        logger.info("INITIALIZING AI ORCHESTRATOR (ZeroGPU Chat API - RunPod)")
        logger.info("=" * 60)
        # Imports are deferred to call time so a broken dependency surfaces
        # as an initialization error (visible via /api/health) instead of
        # crashing the whole module import.
        from src.agents.intent_agent import create_intent_agent
        from src.agents.synthesis_agent import create_synthesis_agent
        from src.agents.safety_agent import create_safety_agent
        from src.agents.skills_identification_agent import create_skills_identification_agent
        from src.llm_router import LLMRouter
        from src.orchestrator_engine import MVPOrchestrator
        from src.context_manager import EfficientContextManager
        logger.info("✓ Imports successful")
        # Initialize LLM Router - ZeroGPU Chat API
        logger.info("Initializing LLM Router (ZeroGPU Chat API)...")
        try:
            # Always use ZeroGPU Chat API (local models disabled)
            llm_router = LLMRouter(hf_token=None, use_local_models=False)
            logger.info("✓ LLM Router initialized (ZeroGPU Chat API)")
        except Exception as e:
            logger.error(f"❌ Failed to initialize LLM Router: {e}", exc_info=True)
            logger.error("This is a critical error - ZeroGPU Chat API is required")
            logger.error("Please ensure ZEROGPU_BASE_URL, ZEROGPU_EMAIL, and ZEROGPU_PASSWORD are set in environment variables")
            raise
        logger.info("Initializing Agents...")
        try:
            # All four agents share the single router instance.
            agents = {
                'intent_recognition': create_intent_agent(llm_router),
                'response_synthesis': create_synthesis_agent(llm_router),
                'safety_check': create_safety_agent(llm_router),
                'skills_identification': create_skills_identification_agent(llm_router)
            }
            logger.info("✓ All agents initialized")
        except Exception as e:
            logger.error(f"❌ Failed to initialize agents: {e}", exc_info=True)
            raise
        logger.info("Initializing Context Manager...")
        try:
            context_manager = EfficientContextManager(llm_router=llm_router)
            logger.info("✓ Context Manager initialized")
        except Exception as e:
            logger.error(f"❌ Failed to initialize Context Manager: {e}", exc_info=True)
            raise
        logger.info("Initializing Orchestrator...")
        try:
            orchestrator = MVPOrchestrator(llm_router, context_manager, agents)
            logger.info("✓ Orchestrator initialized")
        except Exception as e:
            logger.error(f"❌ Failed to initialize Orchestrator: {e}", exc_info=True)
            raise
        orchestrator_available = True
        logger.info("=" * 60)
        logger.info("✓ AI ORCHESTRATOR READY")
        logger.info(" - ZeroGPU Chat API enabled")
        logger.info(" - MAX_WORKERS: 4")
        logger.info("=" * 60)
        return True
    except ValueError as e:
        # Handle configuration errors (e.g., missing ZeroGPU credentials)
        # with an actionable message instead of a raw traceback.
        if "ZEROGPU" in str(e) or "required" in str(e).lower():
            logger.error("=" * 60)
            logger.error("❌ CONFIGURATION ERROR")
            logger.error("=" * 60)
            logger.error(f"Error: {e}")
            logger.error("")
            logger.error("SOLUTION:")
            logger.error("1. Set ZEROGPU_BASE_URL in environment variables (e.g., http://your-pod-ip:8000)")
            logger.error("2. Set ZEROGPU_EMAIL in environment variables")
            logger.error("3. Set ZEROGPU_PASSWORD in environment variables")
            logger.error("4. Register your account first via the /register endpoint if needed")
            logger.error("=" * 60)
            orchestrator_available = False
            initialization_error = f"Configuration Error: {str(e)}"
        else:
            # Unrecognized ValueError: re-raise to the caller (a re-raise
            # inside this handler is NOT caught by the except Exception below).
            raise
        return False
    except Exception as e:
        logger.error("=" * 60)
        logger.error("❌ FAILED TO INITIALIZE ORCHESTRATOR")
        logger.error("=" * 60)
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Error message: {str(e)}")
        logger.error("=" * 60)
        logger.error("Full traceback:", exc_info=True)
        orchestrator_available = False
        initialization_error = f"{type(e).__name__}: {str(e)}"
        return False
# Root endpoint
@app.route('/', methods=['GET'])
def root():
    """API information endpoint.

    Returns service metadata, orchestrator readiness, and the directory of
    available endpoints.
    """
    return jsonify({
        'name': 'AI Assistant Flask API',
        'version': '1.0',
        'status': 'running',
        'orchestrator_ready': orchestrator_available,
        'features': {
            # FIX: this block previously advertised local GPU models on an
            # "NVIDIA T4 Medium", but inference always goes through the
            # remote ZeroGPU Chat API (LLMRouter is constructed with
            # use_local_models=False in initialize_orchestrator).
            'local_gpu_models': False,
            'backend': 'ZeroGPU Chat API (RunPod)',
            'max_workers': 4,
            'hardware': 'Remote (RunPod)'
        },
        'endpoints': {
            'health': 'GET /api/health',
            'chat': 'POST /api/chat',
            'initialize': 'POST /api/initialize',
            'context_mode_get': 'GET /api/context/mode',
            'context_mode_set': 'POST /api/context/mode'
        }
    })
# Health check
@app.route('/api/health', methods=['GET'])
def health_check():
    """Health check endpoint with detailed diagnostics.

    Reports orchestrator readiness; when unhealthy, includes the recorded
    initialization error or a hint to trigger initialization manually.
    """
    payload = {
        'status': 'healthy' if orchestrator_available else 'unhealthy',
        'orchestrator_ready': orchestrator_available,
        'initialization_attempted': initialization_attempted,
    }
    if orchestrator_available:
        return jsonify(payload)
    if initialization_error:
        payload['error'] = initialization_error
        payload['message'] = 'Initialization failed. Check logs for details.'
    elif initialization_attempted:
        payload['message'] = 'Initialization completed but orchestrator not available'
    else:
        payload['message'] = 'Initialization not yet attempted'
        payload['help'] = 'Try POST /api/initialize to trigger initialization'
    return jsonify(payload)
# Chat endpoint
MAX_MESSAGE_LENGTH = 100000  # 100KB limit (increased from 10KB), ~25k tokens

def _validate_chat_message(raw):
    """Validate the raw 'message' field from the request body.

    Returns:
        tuple: (clean_message, None) on success, or (None, error_text) when
        validation fails (caller should respond 400 with error_text).
    """
    if not isinstance(raw, str):
        return None, 'Message must be a string'
    message = raw.strip()
    if not message:
        return None, 'Message cannot be empty'
    if len(message) > MAX_MESSAGE_LENGTH:
        return None, (
            f'Message too long. Maximum length is {MAX_MESSAGE_LENGTH} characters '
            f'(approximately {MAX_MESSAGE_LENGTH // 4} tokens)'
        )
    return message, None

def _resolve_performance_metrics(result, performance):
    """Log performance metrics when present; otherwise try the fallback
    location result['metadata']['performance_metrics'].

    Returns the performance dict to report to the client (possibly the
    fallback, possibly the original empty dict).
    """
    if performance:
        logger.info("=" * 60)
        logger.info("PERFORMANCE METRICS")
        logger.info("=" * 60)
        logger.info(f"Processing Time: {performance.get('processing_time', 0)}ms")
        logger.info(f"Tokens Used: {performance.get('tokens_used', 0)}")
        logger.info(f"Agents Used: {performance.get('agents_used', 0)}")
        logger.info(f"Confidence Score: {performance.get('confidence_score', 0)}%")
        agent_contribs = performance.get('agent_contributions', [])
        if agent_contribs:
            logger.info("Agent Contributions:")
            for contrib in agent_contribs:
                logger.info(f" - {contrib.get('agent', 'Unknown')}: {contrib.get('percentage', 0)}%")
        logger.info(f"Safety Score: {performance.get('safety_score', 0)}%")
        logger.info("=" * 60)
        return performance
    logger.warning("⚠️ No performance metrics in response!")
    logger.debug(f"Result keys: {list(result.keys())}")
    logger.debug(f"Result metadata keys: {list(result.get('metadata', {}).keys())}")
    # Try to extract from metadata as fallback
    metadata = result.get('metadata', {})
    if 'performance_metrics' in metadata:
        performance = metadata['performance_metrics']
        logger.info("✓ Found performance metrics in metadata")
    return performance

@app.route('/api/chat', methods=['POST'])
@limiter.limit("10 per minute") if limiter else lambda f: f  # Rate limit: 10 requests per minute per IP
def chat():
    """
    Process chat message.

    POST /api/chat
    {
        "message": "user message",            (required, non-empty string)
        "history": [[user, assistant], ...],  (optional)
        "session_id": "session-123",          (optional)
        "user_id": "user-456",                (optional, default 'anonymous')
        "context_mode": "fresh" | "relevant"  (optional)
    }

    Returns:
    {
        "success": true,
        "message": "AI response",
        "history": [...],
        "reasoning": {...},
        "performance": {...}
    }
    Responds 400 on validation errors, 503 while the orchestrator is still
    initializing, 500 on processing failures.
    """
    try:
        data = request.get_json()
        if not data or 'message' not in data:
            return jsonify({
                'success': False,
                'error': 'Message is required'
            }), 400
        message, validation_error = _validate_chat_message(data['message'])
        if validation_error:
            return jsonify({
                'success': False,
                'error': validation_error
            }), 400
        history = data.get('history', [])
        session_id = data.get('session_id')
        user_id = data.get('user_id', 'anonymous')
        context_mode = data.get('context_mode')  # Optional: 'fresh' or 'relevant'
        logger.info(f"Chat request - User: {user_id}, Session: {session_id}")
        logger.info(f"Message length: {len(message)} chars, preview: {message[:100]}...")
        if not orchestrator_available or orchestrator is None:
            logger.warning("Chat request received but orchestrator not ready")
            return jsonify({
                'success': False,
                'error': 'Orchestrator not ready',
                'message': 'AI system is initializing. Please try again in a moment.',
                'help': 'If this persists, check logs for initialization errors or try POST /api/initialize'
            }), 503
        # Associate the session with the user for session tracking
        if session_id:
            orchestrator.set_user_id(session_id, user_id)
        # Apply the optional per-request context mode if supported
        if context_mode and hasattr(orchestrator.context_manager, 'set_context_mode'):
            if context_mode in ['fresh', 'relevant']:
                orchestrator.context_manager.set_context_mode(session_id, context_mode, user_id)
                logger.info(f"Context mode set to '{context_mode}' for session {session_id}")
            else:
                logger.warning(f"Invalid context_mode '{context_mode}', ignoring. Use 'fresh' or 'relevant'")
        # FIX: use asyncio.run() instead of the previous manual
        # new_event_loop()/set_event_loop()/close() sequence. The old code
        # left a *closed* loop registered as the worker thread's current
        # event loop, which breaks any later asyncio.get_event_loop() call
        # on that thread; asyncio.run() also cancels leftover tasks and
        # shuts down async generators before returning.
        result = asyncio.run(
            orchestrator.process_request(
                session_id=session_id or f"session-{user_id}",
                user_input=message
            )
        )
        # Extract response fields, tolerating a non-dict result
        if isinstance(result, dict):
            response_text = result.get('response', '') or result.get('final_response', '')
            reasoning = result.get('reasoning', {})
            performance = _resolve_performance_metrics(result, result.get('performance', {}))
        else:
            response_text = str(result)
            reasoning = {}
            performance = {
                "processing_time": 0,
                "tokens_used": 0,
                "agents_used": 0,
                "confidence_score": 0,
                "agent_contributions": [],
                "safety_score": 80,
                "error": "Response format error"
            }
        updated_history = history + [[message, response_text]]
        logger.info(f"✓ Response generated (length: {len(response_text)})")
        return jsonify({
            'success': True,
            'message': response_text,
            'history': updated_history,
            'reasoning': reasoning,
            'performance': performance
        })
    except Exception as e:
        logger.error(f"Chat error: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': str(e),
            'message': 'Error processing your request. Please try again.'
        }), 500
# Manual initialization endpoint
@app.route('/api/initialize', methods=['POST'])
@limiter.limit("5 per minute") if limiter else lambda f: f  # Rate limit: 5 requests per minute per IP
def initialize():
    """Manually trigger initialization"""
    if initialize_orchestrator():
        return jsonify({
            'success': True,
            'message': 'Orchestrator initialized successfully'
        })
    return jsonify({
        'success': False,
        'message': 'Initialization failed. Check logs for details.'
    }), 500
# Context mode management endpoints
@app.route('/api/context/mode', methods=['GET'])
def get_context_mode():
    """
    Get the current context mode for a session.

    GET /api/context/mode?session_id=session-123

    Responds 400 when session_id is missing, 503 when the orchestrator (or
    the context-mode feature) is unavailable, otherwise 200 with the mode
    ('fresh' or 'relevant') and a description of both modes.
    """
    try:
        session_id = request.args.get('session_id')
        if not session_id:
            return jsonify({
                'success': False,
                'error': 'session_id query parameter is required'
            }), 400
        if not (orchestrator_available and orchestrator is not None):
            return jsonify({
                'success': False,
                'error': 'Orchestrator not ready'
            }), 503
        if not hasattr(orchestrator.context_manager, 'get_context_mode'):
            return jsonify({
                'success': False,
                'error': 'Context mode not available'
            }), 503
        current_mode = orchestrator.context_manager.get_context_mode(session_id)
        return jsonify({
            'success': True,
            'session_id': session_id,
            'context_mode': current_mode,
            'description': {
                'fresh': 'No user context included - starts fresh each time',
                'relevant': 'Only relevant user context included based on relevance classification'
            }
        })
    except Exception as e:
        logger.error(f"Get context mode error: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@app.route('/api/context/mode', methods=['POST'])
def set_context_mode():
    """
    Set the context mode for a session.

    POST /api/context/mode with JSON body:
        session_id (str, required)
        mode ('fresh' | 'relevant', required)
        user_id (str, optional; defaults to 'anonymous')

    Responds 400 on validation errors, 503 when the orchestrator (or the
    context-mode feature) is unavailable, 500 when the update fails,
    otherwise 200 with the applied mode.
    """
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({
                'success': False,
                'error': 'Request body is required'
            }), 400
        session_id = payload.get('session_id')
        mode = payload.get('mode')
        user_id = payload.get('user_id', 'anonymous')
        # Validate one field at a time so the caller gets a specific error.
        if not session_id:
            return jsonify({
                'success': False,
                'error': 'session_id is required'
            }), 400
        if not mode:
            return jsonify({
                'success': False,
                'error': 'mode is required'
            }), 400
        if mode not in ('fresh', 'relevant'):
            return jsonify({
                'success': False,
                'error': "mode must be 'fresh' or 'relevant'"
            }), 400
        if not (orchestrator_available and orchestrator is not None):
            return jsonify({
                'success': False,
                'error': 'Orchestrator not ready'
            }), 503
        if not hasattr(orchestrator.context_manager, 'set_context_mode'):
            return jsonify({
                'success': False,
                'error': 'Context mode not available'
            }), 503
        if orchestrator.context_manager.set_context_mode(session_id, mode, user_id):
            return jsonify({
                'success': True,
                'session_id': session_id,
                'context_mode': mode,
                'message': 'Context mode set successfully'
            })
        return jsonify({
            'success': False,
            'error': 'Failed to set context mode'
        }), 500
    except Exception as e:
        logger.error(f"Set context mode error: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
# Initialize orchestrator on module import (for Gunicorn compatibility)
# This ensures initialization happens even when running via Gunicorn
logger.info("=" * 60)
logger.info("FLASK API MODULE LOADED")
logger.info("=" * 60)
logger.info("Initializing orchestrator on module import...")

import threading

# Lock + flag guard against multiple simultaneous initializations when the
# module is imported by several Gunicorn workers/threads.
_init_lock = threading.Lock()
_init_started = False

def _start_initialization():
    """Start initialization in background thread to avoid blocking"""
    global _init_started
    with _init_lock:
        if _init_started:
            return  # Another caller already kicked off initialization
        _init_started = True

        def _init_worker():
            try:
                logger.info("Background initialization thread started")
                initialize_orchestrator()
            except Exception as e:
                logger.error(f"Background initialization failed: {e}", exc_info=True)

        # Daemon thread: never blocks interpreter shutdown.
        worker = threading.Thread(target=_init_worker, daemon=True, name="OrchInit")
        worker.start()
        logger.info("Initialization thread started (non-blocking)")

# Start initialization when module is imported
_start_initialization()
# Initialize on startup (for direct execution)
if __name__ == '__main__':
    logger.info("=" * 60)
    logger.info("STARTING PURE FLASK API (Direct Execution)")
    logger.info("=" * 60)
    # Give the background init thread (started at import time) up to 30s to
    # finish before falling back to a blocking in-process initialization.
    import time
    if not orchestrator_available:
        logger.info("Waiting for background initialization to complete...")
        for waited in range(30):
            if orchestrator_available:
                break
            time.sleep(1)
            if waited % 5 == 0:
                logger.info(f"Still waiting... ({waited}s)")
        if not orchestrator_available:
            logger.warning("Orchestrator not ready after wait, attempting direct initialization...")
            initialize_orchestrator()
    port = int(os.getenv('PORT', 7860))
    logger.info(f"Starting Flask on port {port}")
    logger.info("Endpoints available:")
    for route_summary in (
        " GET /",
        " GET /api/health",
        " POST /api/chat",
        " POST /api/initialize",
        " GET /api/context/mode",
        " POST /api/context/mode",
    ):
        logger.info(route_summary)
    logger.info("=" * 60)
    # Development mode only - Use Gunicorn for production
    logger.warning("⚠️ Using Flask development server - NOT for production!")
    logger.warning("⚠️ Use Gunicorn for production: gunicorn flask_api_standalone:app")
    logger.info("=" * 60)
    app.run(
        host='0.0.0.0',
        port=port,
        debug=False,
        threaded=True  # Enable threading for concurrent requests
    )