# context_manager.py import sqlite3 import json import logging import uuid import hashlib import threading import time import os from pathlib import Path from contextlib import contextmanager from datetime import datetime, timedelta from typing import Dict, Optional, List logger = logging.getLogger(__name__) class TransactionManager: """Manage database transactions with proper locking""" def __init__(self, db_path): self.db_path = db_path self._lock = threading.RLock() self._connections = {} @contextmanager def transaction(self, session_id=None): """Context manager for database transactions with automatic rollback""" conn = None cursor = None try: with self._lock: conn = sqlite3.connect(self.db_path, isolation_level='IMMEDIATE') conn.execute('PRAGMA journal_mode=WAL') # Write-Ahead Logging for better concurrency conn.execute('PRAGMA busy_timeout=5000') # 5 second timeout for locks cursor = conn.cursor() yield cursor conn.commit() logger.debug(f"Transaction committed for session {session_id}") except Exception as e: if conn: conn.rollback() logger.error(f"Transaction rolled back for session {session_id}: {e}") raise finally: if conn: conn.close() class EfficientContextManager: def __init__(self, llm_router=None, db_path=None): self.session_cache = {} # In-memory for active sessions self._session_cache = {} # Enhanced in-memory cache with timestamps self.cache_config = { "max_session_size": 10, # MB per session "ttl": 3600, # 1 hour "compression": "gzip", "eviction_policy": "LRU" } # Use provided db_path or get from config/env, default to /tmp for Docker if db_path is None: try: from config import settings db_path = settings.db_path except (ImportError, AttributeError): # Fallback: check environment variable or use /tmp # os is already imported at top of file db_path = os.getenv("DB_PATH", "/tmp/sessions.db") self.db_path = db_path # Ensure directory exists db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) logger.info(f"Created database directory: {db_dir}") self.llm_router = llm_router # For generating context summaries logger.info(f"Initializing ContextManager with DB path: {self.db_path}") self.transaction_manager = TransactionManager(self.db_path) self._init_database() self.optimize_database_indexes() def _init_database(self): """Initialize database and create tables""" try: logger.info("Initializing database...") conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create sessions table if not exists cursor.execute(""" CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT PRIMARY KEY, user_id TEXT DEFAULT 'Test_Any', created_at TIMESTAMP, last_activity TIMESTAMP, context_data TEXT, user_metadata TEXT ) """) # Add user_id column to existing sessions table if it doesn't exist try: cursor.execute("ALTER TABLE sessions ADD COLUMN user_id TEXT DEFAULT 'Test_Any'") logger.info("✓ Added user_id column to sessions table") except sqlite3.OperationalError: # Column already exists pass logger.info("✓ Sessions table ready") # Create interactions table cursor.execute(""" CREATE TABLE IF NOT EXISTS interactions ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT REFERENCES sessions(session_id), user_input TEXT, context_snapshot TEXT, created_at TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(session_id) ) """) logger.info("✓ Interactions table ready") # Create user_contexts table (persistent user persona summaries) cursor.execute(""" CREATE TABLE IF NOT EXISTS user_contexts ( user_id TEXT PRIMARY KEY, persona_summary TEXT, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) logger.info("✓ User contexts table ready") # Create session_contexts table (session summaries) cursor.execute(""" CREATE TABLE IF NOT EXISTS session_contexts ( session_id TEXT PRIMARY KEY, user_id TEXT, session_summary TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(session_id), FOREIGN KEY(user_id) REFERENCES user_contexts(user_id) ) """) logger.info("✓ Session contexts table ready") # Create interaction_contexts table (individual interaction summaries) cursor.execute(""" CREATE TABLE IF NOT EXISTS interaction_contexts ( interaction_id TEXT PRIMARY KEY, session_id TEXT, user_input TEXT, system_response TEXT, interaction_summary TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(session_id) ) """) logger.info("✓ Interaction contexts table ready") conn.commit() conn.close() # Update schema with new columns and tables for user change tracking self._update_database_schema() logger.info("Database initialization complete") except Exception as e: logger.error(f"Database initialization error: {e}", exc_info=True) def _update_database_schema(self): """Add missing columns and tables for user change tracking""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Add needs_refresh column to interaction_contexts try: cursor.execute(""" ALTER TABLE interaction_contexts ADD COLUMN needs_refresh INTEGER DEFAULT 0 """) logger.info("✓ Added needs_refresh column to interaction_contexts") except sqlite3.OperationalError: pass # Column already exists # Create user change log table cursor.execute(""" CREATE TABLE IF NOT EXISTS user_change_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, old_user_id TEXT, new_user_id TEXT, timestamp TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(session_id) ) """) conn.commit() conn.close() logger.info("✓ Database schema updated successfully for user change tracking") # Update interactions table for deduplication self._update_interactions_table() except Exception as e: logger.error(f"Schema update error: {e}", exc_info=True) def _update_interactions_table(self): """Add interaction_hash column for deduplication""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Check if column already exists cursor.execute("PRAGMA table_info(interactions)") columns = [row[1] for row in cursor.fetchall()] # Add interaction_hash column if it doesn't exist if 'interaction_hash' not in columns: try: cursor.execute(""" ALTER TABLE interactions ADD COLUMN interaction_hash TEXT """) logger.info("✓ Added interaction_hash column to interactions table") except sqlite3.OperationalError: pass # Column already exists # Create unique index for deduplication (this enforces uniqueness) try: cursor.execute(""" CREATE UNIQUE INDEX IF NOT EXISTS idx_interaction_hash_unique ON interactions(interaction_hash) """) logger.info("✓ Created unique index on interaction_hash") except sqlite3.OperationalError: # Index might already exist, try non-unique index as fallback cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_interaction_hash ON interactions(interaction_hash) """) conn.commit() conn.close() logger.info("✓ Interactions table updated for deduplication") except Exception as e: logger.error(f"Error updating interactions table: {e}", exc_info=True) async def manage_context(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict: """ Efficient context management with separated session/user caching STEP 1: Fetch User Context (if available) STEP 2: Get Previous Interaction Contexts STEP 3: Combine for workflow use """ # Use session-only cache key to prevent user_id conflicts session_cache_key = f"session_{session_id}" user_cache_key = f"user_{user_id}" # Get session context from cache session_context = self._get_from_memory_cache(session_cache_key) # Check if cached session context matches current user_id # Handle both old and new cache formats cached_entry = self.session_cache.get(session_cache_key) if cached_entry: # Extract actual context from cache entry if isinstance(cached_entry, dict) and 'value' in cached_entry: actual_context = cached_entry.get('value', {}) else: actual_context = cached_entry if actual_context and actual_context.get("user_id") != user_id: # User changed, invalidate session cache logger.info(f"User mismatch in cache for session {session_id}, invalidating cache") session_context = None if session_cache_key in self.session_cache: del self.session_cache[session_cache_key] else: session_context = actual_context # Get user context separately user_context = self._get_from_memory_cache(user_cache_key) if not session_context: # Retrieve from database with user context session_context = await self._retrieve_from_db(session_id, user_input, user_id) # Step 2: Cache session context with TTL self.add_context_cache(session_cache_key, session_context, ttl=self.cache_config.get("ttl", 3600)) # Handle user context separately - load only once and cache thereafter # Cache does not refer to database after initial load if not user_context or not user_context.get("user_context_loaded"): user_context_data = await self.get_user_context(user_id) user_context = { "user_context": user_context_data, "user_context_loaded": True, "user_id": user_id } # Cache user context separately - this is the only database query for user context self._warm_memory_cache(user_cache_key, user_context) logger.debug(f"User context loaded once for {user_id} and cached") else: # User context already cached, use it without database query logger.debug(f"Using cached user context for {user_id}") # Merge contexts without duplication merged_context = { **session_context, "user_context": user_context.get("user_context", ""), "user_context_loaded": True, "user_id": user_id # Ensure current user_id is used } # Update context with new interaction updated_context = self._update_context(merged_context, user_input, user_id=user_id) return self._optimize_context(updated_context) async def get_user_context(self, user_id: str) -> str: """ STEP 1: Fetch or generate User Context (500-token persona summary) Available for all interactions except first time per user """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Check if user context exists cursor.execute(""" SELECT persona_summary FROM user_contexts WHERE user_id = ? """, (user_id,)) row = cursor.fetchone() if row and row[0]: # Existing user context found conn.close() logger.info(f"✓ User context loaded for {user_id}") return row[0] # Generate new user context from all historical data logger.info(f"Generating new user context for {user_id}") # Fetch all historical Session and Interaction contexts for this user all_session_summaries = [] all_interaction_summaries = [] # Get all session contexts cursor.execute(""" SELECT session_summary FROM session_contexts WHERE user_id = ? ORDER BY created_at DESC LIMIT 50 """, (user_id,)) for row in cursor.fetchall(): if row[0]: all_session_summaries.append(row[0]) # Get all interaction contexts cursor.execute(""" SELECT ic.interaction_summary FROM interaction_contexts ic JOIN sessions s ON ic.session_id = s.session_id WHERE s.user_id = ? ORDER BY ic.created_at DESC LIMIT 100 """, (user_id,)) for row in cursor.fetchall(): if row[0]: all_interaction_summaries.append(row[0]) conn.close() if not all_session_summaries and not all_interaction_summaries: # First time user - no context to generate logger.info(f"No historical data for {user_id} - first time user") return "" # Generate persona summary using LLM (500 tokens) historical_data = "\n\n".join(all_session_summaries + all_interaction_summaries[:20]) if self.llm_router: prompt = f"""Generate a concise 500-token persona summary for user {user_id} based on their interaction history: Historical Context: {historical_data} Create a persona summary that captures: - Communication style and preferences - Common topics and interests - Interaction patterns - Key information shared across sessions Keep the summary concise and focused (approximately 500 tokens).""" try: persona_summary = await self.llm_router.route_inference( task_type="general_reasoning", prompt=prompt, max_tokens=500, temperature=0.7 ) if persona_summary and isinstance(persona_summary, str) and persona_summary.strip(): # Store in database conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO user_contexts (user_id, persona_summary, updated_at) VALUES (?, ?, ?) """, (user_id, persona_summary.strip(), datetime.now().isoformat())) conn.commit() conn.close() logger.info(f"✓ Generated and stored user context for {user_id}") return persona_summary.strip() except Exception as e: logger.error(f"Error generating user context: {e}", exc_info=True) # Fallback: Return empty if LLM fails logger.warning(f"Could not generate user context for {user_id} - using empty") return "" except Exception as e: logger.error(f"Error getting user context: {e}", exc_info=True) return "" async def generate_interaction_context(self, interaction_id: str, session_id: str, user_input: str, system_response: str, user_id: str = "Test_Any") -> str: """ STEP 2: Generate Interaction Context (50-token summary) Called after each response """ try: if not self.llm_router: return "" # Use full user input for context generation (not truncated in prompt) # Only truncate for display in prompt if extremely long user_input_preview = user_input[:500] if len(user_input) > 500 else user_input prompt = f"""Summarize this interaction in approximately 50 tokens: User Input: {user_input_preview} System Response: {system_response[:500]} Provide a brief summary capturing the key exchange.""" try: summary = await self.llm_router.route_inference( task_type="general_reasoning", prompt=prompt, max_tokens=50, temperature=0.7 ) if summary and isinstance(summary, str) and summary.strip(): # Store in database conn = sqlite3.connect(self.db_path) cursor = conn.cursor() created_at = datetime.now().isoformat() cursor.execute(""" INSERT OR REPLACE INTO interaction_contexts (interaction_id, session_id, user_input, system_response, interaction_summary, created_at) VALUES (?, ?, ?, ?, ?, ?) """, ( interaction_id, session_id, user_input[:5000], # Increased from 500 to 5000 characters system_response[:2000], # Increased from 1000 to 2000 summary.strip(), created_at )) conn.commit() conn.close() # Update cache immediately with new interaction context # This ensures cache is synchronized with database at the same time self._update_cache_with_interaction_context(session_id, summary.strip(), created_at) logger.info(f"✓ Generated interaction context for {interaction_id} and updated cache") return summary.strip() except Exception as e: logger.error(f"Error generating interaction context: {e}", exc_info=True) # Fallback on LLM failure return "" except Exception as e: logger.error(f"Error in generate_interaction_context: {e}", exc_info=True) return "" async def generate_session_context(self, session_id: str, user_id: str = "Test_Any") -> str: """ Generate Session Context (100-token summary) at every turn Uses cached interaction contexts instead of querying database Updates both database and cache immediately """ try: # Get interaction contexts from cache (no database query) session_cache_key = f"session_{session_id}" cached_context = self.session_cache.get(session_cache_key) if not cached_context: logger.warning(f"No cached context found for session {session_id}, cannot generate session context") return "" interaction_contexts = cached_context.get('interaction_contexts', []) if not interaction_contexts: logger.info(f"No interaction contexts available for session {session_id} to summarize") return "" # Use cached interaction contexts (from cache, not database) interaction_summaries = [ic.get('summary', '') for ic in interaction_contexts if ic.get('summary')] if not interaction_summaries: logger.info(f"No interaction summaries available for session {session_id}") return "" # Generate session summary using LLM (100 tokens) if self.llm_router: combined_context = "\n".join(interaction_summaries) prompt = f"""Summarize this session's interactions in approximately 100 tokens: Interaction Summaries: {combined_context} Create a concise session summary capturing: - Main topics discussed - Key outcomes or information shared - User's focus areas Keep the summary concise (approximately 100 tokens).""" try: session_summary = await self.llm_router.route_inference( task_type="general_reasoning", prompt=prompt, max_tokens=100, temperature=0.7 ) if session_summary and isinstance(session_summary, str) and session_summary.strip(): # Store in database created_at = datetime.now().isoformat() conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO session_contexts (session_id, user_id, session_summary, created_at) VALUES (?, ?, ?, ?) """, (session_id, user_id, session_summary.strip(), created_at)) conn.commit() conn.close() # Update cache immediately with new session context # This ensures cache is synchronized with database at the same time self._update_cache_with_session_context(session_id, session_summary.strip(), created_at) logger.info(f"✓ Generated session context for {session_id} and updated cache") return session_summary.strip() except Exception as e: logger.error(f"Error generating session context: {e}", exc_info=True) # Fallback on LLM failure return "" except Exception as e: logger.error(f"Error in generate_session_context: {e}", exc_info=True) return "" async def end_session(self, session_id: str, user_id: str = "Test_Any"): """ End session and clear cache Note: Session context is already generated at every turn, so this just clears cache """ try: # Session context is already generated at every turn (no need to regenerate) # Clear in-memory cache for this session (session-only key) session_cache_key = f"session_{session_id}" if session_cache_key in self.session_cache: del self.session_cache[session_cache_key] logger.info(f"✓ Cleared cache for session {session_id}") except Exception as e: logger.error(f"Error ending session: {e}", exc_info=True) def _clear_user_cache_on_change(self, session_id: str, new_user_id: str, old_user_id: str): """Clear cache entries when user changes""" if new_user_id != old_user_id: # Clear old composite cache keys old_cache_key = f"{session_id}_{old_user_id}" if old_cache_key in self.session_cache: del self.session_cache[old_cache_key] logger.info(f"Cleared old cache for user {old_user_id} on session {session_id}") def _optimize_context(self, context: dict, relevance_classification: Optional[Dict] = None) -> dict: """ Optimize context for LLM consumption with relevance filtering support Format: [Session Context] + [User Context (conditional)] + [Interaction Context #N, #N-1, ...] Args: context: Base context dictionary relevance_classification: Optional relevance classification results with dynamic user context Applies smart pruning before formatting. """ # Step 4: Prune context if it exceeds token limits (uses config threshold) pruned_context = self.prune_context(context) # Get context mode (fresh or relevant) session_id = pruned_context.get("session_id") context_mode = self.get_context_mode(session_id) interaction_contexts = pruned_context.get("interaction_contexts", []) session_context = pruned_context.get("session_context", {}) session_summary = session_context.get("summary", "") if isinstance(session_context, dict) else "" # MODIFIED: Conditional user context inclusion based on mode and relevance user_context = "" if context_mode == 'relevant' and relevance_classification: # Use dynamic relevant summaries from relevance classification user_context = relevance_classification.get('combined_user_context', '') if user_context: logger.info( f"Using dynamic relevant context: {len(relevance_classification.get('relevant_summaries', []))} " f"sessions summarized for session {session_id}" ) elif context_mode == 'relevant' and not relevance_classification: # Fallback: Use traditional user context if relevance classification unavailable user_context = pruned_context.get("user_context", "") logger.debug(f"Relevant mode but no classification, using traditional user context") # If context_mode == 'fresh', user_context remains empty (no user context) # Format interaction contexts as requested formatted_interactions = [] for idx, ic in enumerate(interaction_contexts[:10]): # Last 10 interactions formatted_interactions.append(f"[Interaction Context #{len(interaction_contexts) - idx}]\n{ic.get('summary', '')}") # Combine Session Context + (Conditional) User Context + Interaction Contexts combined_context = "" if session_summary: combined_context += f"[Session Context]\n{session_summary}\n\n" # Include user context only if available and in relevant mode if user_context: context_label = "[Relevant User Context]" if context_mode == 'relevant' else "[User Context]" combined_context += f"{context_label}\n{user_context}\n\n" if formatted_interactions: combined_context += "\n\n".join(formatted_interactions) return { "session_id": pruned_context.get("session_id"), "user_id": pruned_context.get("user_id", "Test_Any"), "user_context": user_context, # Dynamic summaries OR empty "session_context": session_context, "interaction_contexts": interaction_contexts, "combined_context": combined_context, "context_mode": context_mode, # Include mode for debugging "relevance_metadata": relevance_classification.get('relevance_scores', {}) if relevance_classification else {}, "preferences": pruned_context.get("preferences", {}), "active_tasks": pruned_context.get("active_tasks", []), "last_activity": pruned_context.get("last_activity") } def _get_from_memory_cache(self, cache_key: str) -> dict: """ Retrieve context from in-memory session cache with expiration check """ cached = self.session_cache.get(cache_key) if not cached: return None # Check if it's the new format with expiration if isinstance(cached, dict) and 'value' in cached: # New format with TTL if self._is_cache_expired(cached): # Remove expired cache entry del self.session_cache[cache_key] logger.debug(f"Cache expired for key: {cache_key}") return None return cached.get('value') else: # Old format (direct value) - return as-is for backward compatibility return cached def _is_cache_expired(self, cache_entry: dict) -> bool: """ Check if cache entry has expired based on TTL """ if not isinstance(cache_entry, dict): return True expires = cache_entry.get('expires') if not expires: return False # No expiration set, consider valid return time.time() > expires def add_context_cache(self, key: str, value: dict, ttl: int = 3600): """ Step 2: Implement Context Caching with TTL expiration Add context to cache with expiration time. Args: key: Cache key value: Value to cache (dict) ttl: Time to live in seconds (default 3600 = 1 hour) """ import time self.session_cache[key] = { 'value': value, 'expires': time.time() + ttl, 'timestamp': time.time() } logger.debug(f"Cached context for key: {key} with TTL: {ttl}s") def get_token_count(self, text: str) -> int: """ Approximate token count for text (4 characters ≈ 1 token) Args: text: Text to count tokens for Returns: Approximate token count """ if not text: return 0 # Simple approximation: 4 characters per token return len(text) // 4 def prune_context(self, context: dict, max_tokens: Optional[int] = None) -> dict: """ Step 4: Implement Smart Context Pruning with configurable threshold Prune context to stay within token limit while keeping most recent and relevant content. Args: context: Context dictionary to prune max_tokens: Maximum token count (uses config default if None) Returns: Pruned context dictionary """ # Use config threshold if not provided if max_tokens is None: try: from .config import get_settings settings = get_settings() max_tokens = settings.context_pruning_threshold logger.debug(f"Using config pruning threshold: {max_tokens} tokens") except Exception: max_tokens = 2000 # Fallback to default logger.warning("Could not load config, using default pruning threshold: 2000") try: # Calculate current token count current_tokens = self._calculate_context_tokens(context) if current_tokens <= max_tokens: return context # No pruning needed logger.info(f"Context token count ({current_tokens}) exceeds limit ({max_tokens}), pruning...") # Create a copy to avoid modifying original pruned_context = context.copy() # Priority: Keep most recent interactions + session context + user context interaction_contexts = pruned_context.get('interaction_contexts', []) session_context = pruned_context.get('session_context', {}) user_context = pruned_context.get('user_context', '') # Keep user context and session context (essential) essential_tokens = ( self.get_token_count(user_context) + self.get_token_count(str(session_context)) ) # Calculate how many interaction contexts we can keep available_tokens = max_tokens - essential_tokens if available_tokens < 0: # Essential context itself is too large - summarize user context if self.get_token_count(user_context) > max_tokens // 2: pruned_context['user_context'] = user_context[:max_tokens * 2] # Rough cut logger.warning(f"User context too large, truncated") return pruned_context # Keep most recent interactions that fit in token budget kept_interactions = [] current_size = 0 for interaction in interaction_contexts: summary = interaction.get('summary', '') interaction_tokens = self.get_token_count(summary) if current_size + interaction_tokens <= available_tokens: kept_interactions.append(interaction) current_size += interaction_tokens else: break # Can't fit any more pruned_context['interaction_contexts'] = kept_interactions logger.info(f"Pruned context: kept {len(kept_interactions)}/{len(interaction_contexts)} interactions, " f"reduced from {current_tokens} to {self._calculate_context_tokens(pruned_context)} tokens") return pruned_context except Exception as e: logger.error(f"Error pruning context: {e}", exc_info=True) return context # Return original on error def _calculate_context_tokens(self, context: dict) -> int: """Calculate total token count for context""" total = 0 # Count tokens in each component user_context = context.get('user_context', '') total += self.get_token_count(str(user_context)) session_context = context.get('session_context', {}) if isinstance(session_context, dict): total += self.get_token_count(str(session_context.get('summary', ''))) else: total += self.get_token_count(str(session_context)) interaction_contexts = context.get('interaction_contexts', []) for interaction in interaction_contexts: summary = interaction.get('summary', '') total += self.get_token_count(str(summary)) return total async def _retrieve_from_db(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict: """ Retrieve session context with proper user_id synchronization Uses transactions to ensure atomic updates of database and cache """ conn = None try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Use transaction to ensure atomic updates cursor.execute("BEGIN TRANSACTION") # Get session data (SQLite doesn't support FOR UPDATE, but transaction ensures consistency) cursor.execute(""" SELECT context_data, user_metadata, last_activity, user_id FROM sessions WHERE session_id = ? """, (session_id,)) row = cursor.fetchone() if row: context_data = json.loads(row[0]) if row[0] else {} user_metadata = json.loads(row[1]) if row[1] else {} last_activity = row[2] session_user_id = row[3] if len(row) > 3 else user_id # Check for user_id change and update atomically user_changed = False if session_user_id != user_id: logger.info(f"User change detected: {session_user_id} -> {user_id} for session {session_id}") user_changed = True # Update session with new user_id cursor.execute(""" UPDATE sessions SET user_id = ?, last_activity = ? WHERE session_id = ? """, (user_id, datetime.now().isoformat(), session_id)) # Clear any cached interaction contexts for old user by marking for refresh try: cursor.execute(""" UPDATE interaction_contexts SET needs_refresh = 1 WHERE session_id = ? """, (session_id,)) except sqlite3.OperationalError: # Column might not exist yet, will be created by schema update pass # Log user change event try: cursor.execute(""" INSERT INTO user_change_log (session_id, old_user_id, new_user_id, timestamp) VALUES (?, ?, ?, ?) """, (session_id, session_user_id, user_id, datetime.now().isoformat())) except sqlite3.OperationalError: # Table might not exist yet, will be created by schema update pass # Clear old cache entries when user changes self._clear_user_cache_on_change(session_id, user_id, session_user_id) cursor.execute("COMMIT") # Get interaction contexts with refresh flag check try: cursor.execute(""" SELECT interaction_summary, created_at, needs_refresh FROM interaction_contexts WHERE session_id = ? AND (needs_refresh IS NULL OR needs_refresh = 0) ORDER BY created_at DESC LIMIT 20 """, (session_id,)) except sqlite3.OperationalError: # Column might not exist yet, fall back to query without needs_refresh cursor.execute(""" SELECT interaction_summary, created_at FROM interaction_contexts WHERE session_id = ? ORDER BY created_at DESC LIMIT 20 """, (session_id,)) interaction_contexts = [] for ic_row in cursor.fetchall(): # Handle both query formats (with and without needs_refresh) if len(ic_row) >= 2: summary = ic_row[0] timestamp = ic_row[1] needs_refresh = ic_row[2] if len(ic_row) > 2 else 0 if summary and not needs_refresh: interaction_contexts.append({ "summary": summary, "timestamp": timestamp }) # Get session context from database session_context_data = None try: cursor.execute(""" SELECT session_summary, created_at FROM session_contexts WHERE session_id = ? ORDER BY created_at DESC LIMIT 1 """, (session_id,)) sc_row = cursor.fetchone() if sc_row and sc_row[0]: session_context_data = { "summary": sc_row[0], "timestamp": sc_row[1] } except sqlite3.OperationalError: # Table might not exist yet pass context = { "session_id": session_id, "user_id": user_id, "interaction_contexts": interaction_contexts, "session_context": session_context_data, "preferences": user_metadata.get("preferences", {}), "active_tasks": user_metadata.get("active_tasks", []), "last_activity": last_activity, "user_context_loaded": False, "user_changed": user_changed } conn.close() return context else: # Create new session with transaction cursor.execute(""" INSERT INTO sessions (session_id, user_id, created_at, last_activity, context_data, user_metadata) VALUES (?, ?, ?, ?, ?, ?) """, (session_id, user_id, datetime.now().isoformat(), datetime.now().isoformat(), "{}", "{}")) cursor.execute("COMMIT") conn.close() return { "session_id": session_id, "user_id": user_id, "interaction_contexts": [], "session_context": None, "preferences": {}, "active_tasks": [], "user_context_loaded": False, "user_changed": False } except sqlite3.Error as e: logger.error(f"Database transaction error: {e}", exc_info=True) if conn: try: conn.rollback() except: pass conn.close() # Return safe fallback return { "session_id": session_id, "user_id": user_id, "interaction_contexts": [], "session_context": None, "preferences": {}, "active_tasks": [], "user_context_loaded": False, "error": str(e), "user_changed": False } except Exception as e: logger.error(f"Database retrieval error: {e}", exc_info=True) if conn: try: conn.rollback() except: pass conn.close() # Return safe fallback return { "session_id": session_id, "user_id": user_id, "interaction_contexts": [], "session_context": None, "preferences": {}, "active_tasks": [], "user_context_loaded": False, "error": str(e), "user_changed": False } def _warm_memory_cache(self, cache_key: str, context: dict): """ Warm the in-memory cache with retrieved context Note: Use add_context_cache() instead for TTL support """ # Use add_context_cache for consistency with TTL self.add_context_cache(cache_key, context, ttl=self.cache_config.get("ttl", 3600)) def _update_cache_with_interaction_context(self, session_id: str, interaction_summary: str, created_at: str): """ Update cache with new interaction context immediately after database update This keeps cache synchronized with database without requiring database queries """ session_cache_key = f"session_{session_id}" # Get current cached context if it exists cached_context = self.session_cache.get(session_cache_key) if cached_context: # Add new interaction context to the beginning of the list (most recent first) interaction_contexts = cached_context.get('interaction_contexts', []) new_interaction = { "summary": interaction_summary, "timestamp": created_at } # Insert at beginning and keep only last 20 (matches DB query limit) interaction_contexts.insert(0, new_interaction) interaction_contexts = interaction_contexts[:20] # Update cached context with new interaction contexts cached_context['interaction_contexts'] = interaction_contexts self.session_cache[session_cache_key] = cached_context logger.debug(f"Cache updated with new interaction context for session {session_id} (total: {len(interaction_contexts)})") else: # If cache doesn't exist, create new entry new_context = { "session_id": session_id, "interaction_contexts": [{ "summary": interaction_summary, "timestamp": created_at }], "preferences": {}, "active_tasks": [], "user_context_loaded": False } self.session_cache[session_cache_key] = new_context logger.debug(f"Created new cache entry with interaction context for session {session_id}") def _update_cache_with_session_context(self, session_id: str, session_summary: str, created_at: str): """ Update cache with new session context immediately after database update This keeps cache synchronized with database without requiring database queries """ session_cache_key = f"session_{session_id}" # Get current cached context if it exists cached_context = self.session_cache.get(session_cache_key) if cached_context: # Update session context in cache cached_context['session_context'] = { "summary": session_summary, "timestamp": created_at } self.session_cache[session_cache_key] = cached_context logger.debug(f"Cache updated with new session context for session {session_id}") else: # If cache doesn't exist, create new entry new_context = { "session_id": session_id, "session_context": { "summary": session_summary, "timestamp": created_at }, "interaction_contexts": [], "preferences": {}, "active_tasks": [], "user_context_loaded": False } self.session_cache[session_cache_key] = new_context logger.debug(f"Created new cache entry with session context for session {session_id}") def _update_context(self, context: dict, user_input: str, response: str = None, user_id: str = "Test_Any") -> dict: """ Update context with deduplication and idempotency checks Prevents duplicate context updates using interaction hashes """ try: # Generate unique interaction hash to prevent duplicates interaction_hash = self._generate_interaction_hash(user_input, context["session_id"], user_id) # Check if this interaction was already processed if self._is_duplicate_interaction(interaction_hash): logger.info(f"Duplicate interaction detected, skipping update: {interaction_hash[:8]}") return context # Use transaction for atomic updates current_time = datetime.now().isoformat() with self.transaction_manager.transaction(context["session_id"]) as cursor: # Update session activity (only if last_activity is older to prevent unnecessary updates) cursor.execute(""" UPDATE sessions SET last_activity = ?, user_id = ? WHERE session_id = ? AND (last_activity IS NULL OR last_activity < ?) """, (current_time, user_id, context["session_id"], current_time)) # Store interaction with duplicate prevention using INSERT OR IGNORE session_context = { "preferences": context.get("preferences", {}), "active_tasks": context.get("active_tasks", []) } cursor.execute(""" INSERT OR IGNORE INTO interactions ( interaction_hash, session_id, user_input, context_snapshot, created_at ) VALUES (?, ?, ?, ?, ?) """, ( interaction_hash, context["session_id"], user_input, json.dumps(session_context), current_time )) # Mark interaction as processed (outside transaction) self._mark_interaction_processed(interaction_hash) # Update in-memory context context["last_interaction"] = user_input context["last_update"] = current_time logger.info(f"Context updated for session {context['session_id']} with hash {interaction_hash[:8]}") return context except Exception as e: logger.error(f"Error updating context: {e}", exc_info=True) return context def _generate_interaction_hash(self, user_input: str, session_id: str, user_id: str) -> str: """Generate unique hash for interaction to prevent duplicates""" # Use session_id, user_id, and user_input for exact duplicate detection # Normalize user input by stripping whitespace normalized_input = user_input.strip() content = f"{session_id}:{user_id}:{normalized_input}" return hashlib.sha256(content.encode()).hexdigest() def _is_duplicate_interaction(self, interaction_hash: str) -> bool: """Check if interaction was already processed""" # Keep a rolling window of recent interaction hashes in memory if not hasattr(self, '_processed_interactions'): self._processed_interactions = set() # Check in-memory cache first if interaction_hash in self._processed_interactions: return True # Also check database for persistent duplicates try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Check if interaction_hash column exists and query for duplicates cursor.execute("PRAGMA table_info(interactions)") columns = [row[1] for row in cursor.fetchall()] if 'interaction_hash' in columns: cursor.execute(""" SELECT COUNT(*) FROM interactions WHERE interaction_hash IS NOT NULL AND interaction_hash = ? """, (interaction_hash,)) count = cursor.fetchone()[0] conn.close() return count > 0 else: conn.close() return False except sqlite3.OperationalError: # Column might not exist yet, only check in-memory return interaction_hash in self._processed_interactions def _mark_interaction_processed(self, interaction_hash: str): """Mark interaction as processed""" if not hasattr(self, '_processed_interactions'): self._processed_interactions = set() self._processed_interactions.add(interaction_hash) # Limit memory usage by keeping only last 1000 hashes if len(self._processed_interactions) > 1000: # Keep most recent 500 entries (simple truncation) self._processed_interactions = set(list(self._processed_interactions)[-500:]) async def manage_context_optimized(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict: """ Efficient context management with transaction optimization """ # Use session-only cache key session_cache_key = f"session_{session_id}" # Try to get from cache first (no DB access) cached_context = self._get_from_memory_cache(session_cache_key) if cached_context and self._is_cache_valid(cached_context): logger.debug(f"Using cached context for session {session_id}") return cached_context # Use transaction for all DB operations with self.transaction_manager.transaction(session_id) as cursor: # Atomic session retrieval and update cursor.execute(""" SELECT s.context_data, s.user_metadata, s.last_activity, s.user_id, COUNT(ic.interaction_id) as interaction_count FROM sessions s LEFT JOIN interaction_contexts ic ON s.session_id = ic.session_id WHERE s.session_id = ? GROUP BY s.session_id """, (session_id,)) row = cursor.fetchone() if row: # Parse existing session data context_data = json.loads(row[0] or '{}') user_metadata = json.loads(row[1] or '{}') last_activity = row[2] stored_user_id = row[3] or user_id interaction_count = row[4] or 0 # Handle user change atomically if stored_user_id != user_id: self._handle_user_change_atomic(cursor, session_id, stored_user_id, user_id) # Get interaction contexts efficiently interaction_contexts = self._get_interaction_contexts_atomic(cursor, session_id) else: # Create new session atomically cursor.execute(""" INSERT INTO sessions (session_id, user_id, created_at, last_activity, context_data, user_metadata) VALUES (?, ?, datetime('now'), datetime('now'), '{}', '{}') """, (session_id, user_id)) context_data = {} user_metadata = {} interaction_contexts = [] interaction_count = 0 # Load user context asynchronously (outside transaction) user_context = await self._load_user_context_async(user_id) # Build final context final_context = { "session_id": session_id, "user_id": user_id, "interaction_contexts": interaction_contexts, "user_context": user_context, "preferences": user_metadata.get("preferences", {}), "active_tasks": user_metadata.get("active_tasks", []), "interaction_count": interaction_count, "cache_timestamp": datetime.now().isoformat() } # Update cache self._warm_memory_cache(session_cache_key, final_context) return self._optimize_context(final_context) def _handle_user_change_atomic(self, cursor, session_id: str, old_user_id: str, new_user_id: str): """Handle user change within transaction""" logger.info(f"Handling user change in transaction: {old_user_id} -> {new_user_id}") # Update session cursor.execute(""" UPDATE sessions SET user_id = ?, last_activity = datetime('now') WHERE session_id = ? """, (new_user_id, session_id)) # Log the change try: cursor.execute(""" INSERT INTO user_change_log (session_id, old_user_id, new_user_id, timestamp) VALUES (?, ?, ?, datetime('now')) """, (session_id, old_user_id, new_user_id)) except sqlite3.OperationalError: # Table might not exist yet pass # Invalidate related caches try: cursor.execute(""" UPDATE interaction_contexts SET needs_refresh = 1 WHERE session_id = ? """, (session_id,)) except sqlite3.OperationalError: # Column might not exist yet pass def _get_interaction_contexts_atomic(self, cursor, session_id: str, limit: int = 20): """Get interaction contexts within transaction""" try: cursor.execute(""" SELECT interaction_summary, created_at, interaction_id FROM interaction_contexts WHERE session_id = ? AND (needs_refresh IS NULL OR needs_refresh = 0) ORDER BY created_at DESC LIMIT ? """, (session_id, limit)) except sqlite3.OperationalError: # Fallback if needs_refresh column doesn't exist cursor.execute(""" SELECT interaction_summary, created_at, interaction_id FROM interaction_contexts WHERE session_id = ? ORDER BY created_at DESC LIMIT ? """, (session_id, limit)) contexts = [] for row in cursor.fetchall(): if row[0]: contexts.append({ "summary": row[0], "timestamp": row[1], "id": row[2] if len(row) > 2 else None }) return contexts async def _load_user_context_async(self, user_id: str): """Load user context asynchronously to avoid blocking""" try: # Check memory cache first user_cache_key = f"user_{user_id}" cached = self._get_from_memory_cache(user_cache_key) if cached: return cached.get("user_context", "") # Load from database return await self.get_user_context(user_id) except Exception as e: logger.error(f"Error loading user context: {e}") return "" def _is_cache_valid(self, cached_context: dict, max_age_seconds: int = 60) -> bool: """Check if cached context is still valid""" if not cached_context: return False cache_timestamp = cached_context.get("cache_timestamp") if not cache_timestamp: return False try: cache_time = datetime.fromisoformat(cache_timestamp) age = (datetime.now() - cache_time).total_seconds() return age < max_age_seconds except: return False def invalidate_session_cache(self, session_id: str): """ Invalidate cached context for a session to force fresh retrieval Only affects cache management - does not change application functionality """ session_cache_key = f"session_{session_id}" if session_cache_key in self.session_cache: del self.session_cache[session_cache_key] logger.info(f"Cache invalidated for session {session_id} to ensure fresh context retrieval") def optimize_database_indexes(self): """Create database indexes for better query performance""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create indexes for frequently queried columns indexes = [ "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)", "CREATE INDEX IF NOT EXISTS idx_sessions_last_activity ON sessions(last_activity)", "CREATE INDEX IF NOT EXISTS idx_interactions_session_id ON interactions(session_id)", "CREATE INDEX IF NOT EXISTS idx_interaction_contexts_session_id ON interaction_contexts(session_id)", "CREATE INDEX IF NOT EXISTS idx_interaction_contexts_created_at ON interaction_contexts(created_at)", "CREATE INDEX IF NOT EXISTS idx_user_change_log_session_id ON user_change_log(session_id)", "CREATE INDEX IF NOT EXISTS idx_user_contexts_updated_at ON user_contexts(updated_at)" ] for index in indexes: try: cursor.execute(index) except sqlite3.OperationalError as e: # Table might not exist yet, skip this index logger.debug(f"Skipping index creation (table may not exist): {e}") # Analyze database for query optimization try: cursor.execute("ANALYZE") except sqlite3.OperationalError: # ANALYZE might not be available in all SQLite versions pass conn.commit() conn.close() logger.info("✓ Database indexes optimized successfully") except Exception as e: logger.error(f"Error optimizing database indexes: {e}", exc_info=True) def set_context_mode(self, session_id: str, mode: str, user_id: str = "Test_Any"): """ Set context mode for session (fresh or relevant) Args: session_id: Session identifier mode: 'fresh' (no user context) or 'relevant' (only relevant context) user_id: User identifier Returns: bool: True if successful, False otherwise """ try: import time # VALIDATION: Ensure mode is valid if mode not in ['fresh', 'relevant']: logger.warning(f"Invalid context mode '{mode}', defaulting to 'fresh'") mode = 'fresh' # Get or create cache entry cache_key = f"session_{session_id}" cached_context = self._get_from_memory_cache(cache_key) if not cached_context: cached_context = { 'session_id': session_id, 'user_id': user_id, 'preferences': {}, 'context_mode': mode, 'context_mode_timestamp': time.time() } else: # Update existing context (preserve other data) cached_context['context_mode'] = mode cached_context['context_mode_timestamp'] = time.time() cached_context['user_id'] = user_id # Update user_id if changed # Update cache with TTL self.add_context_cache(cache_key, cached_context, ttl=3600) logger.info(f"Context mode set to '{mode}' for session {session_id} (user: {user_id})") return True except Exception as e: logger.error(f"Error setting context mode: {e}", exc_info=True) return False # Failure doesn't break existing flow def get_context_mode(self, session_id: str) -> str: """ Get current context mode for session Args: session_id: Session identifier Returns: str: 'fresh' or 'relevant' (default: 'fresh') """ try: cache_key = f"session_{session_id}" cached_context = self._get_from_memory_cache(cache_key) if cached_context: mode = cached_context.get('context_mode', 'fresh') # VALIDATION: Ensure mode is still valid if mode in ['fresh', 'relevant']: return mode else: logger.warning(f"Invalid cached mode '{mode}', resetting to 'fresh'") cached_context['context_mode'] = 'fresh' import time cached_context['context_mode_timestamp'] = time.time() self.add_context_cache(cache_key, cached_context, ttl=3600) return 'fresh' # Default for new sessions return 'fresh' except Exception as e: logger.error(f"Error getting context mode: {e}", exc_info=True) return 'fresh' # Safe default - no degradation async def get_all_user_sessions(self, user_id: str) -> List[Dict]: """ Fetch all session contexts for a user (for relevance classification) Performance: Single database query with JOIN Args: user_id: User identifier Returns: List of session context dictionaries with summaries and interactions """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Fetch all session contexts for user with interaction summaries cursor.execute(""" SELECT DISTINCT sc.session_id, sc.session_summary, sc.created_at, (SELECT GROUP_CONCAT(ic.interaction_summary, ' ||| ') FROM interaction_contexts ic WHERE ic.session_id = sc.session_id ORDER BY ic.created_at DESC LIMIT 10) as recent_interactions FROM session_contexts sc JOIN sessions s ON sc.session_id = s.session_id WHERE s.user_id = ? ORDER BY sc.created_at DESC LIMIT 50 """, (user_id,)) sessions = [] for row in cursor.fetchall(): session_id, session_summary, created_at, interactions_str = row # Parse interaction summaries interaction_list = [] if interactions_str: for summary in interactions_str.split(' ||| '): if summary.strip(): interaction_list.append({ 'summary': summary.strip(), 'timestamp': created_at }) sessions.append({ 'session_id': session_id, 'summary': session_summary or '', 'created_at': created_at, 'interaction_contexts': interaction_list }) conn.close() logger.info(f"Fetched {len(sessions)} sessions for user {user_id}") return sessions except Exception as e: logger.error(f"Error fetching user sessions: {e}", exc_info=True) return [] # Safe fallback - no degradation def _extract_entities(self, context: dict) -> list: """ Extract essential entities from context """ # TODO: Implement entity extraction return [] def _generate_summary(self, context: dict) -> str: """ Generate conversation summary """ # TODO: Implement summary generation return "" def get_or_create_session_context(self, session_id: str, user_id: Optional[str] = None) -> Dict: """Enhanced context retrieval with caching""" import time # In-memory cache check first if session_id in self._session_cache: cache_entry = self._session_cache[session_id] if time.time() - cache_entry['timestamp'] < 300: # 5 min cache logger.debug(f"Cache hit for session {session_id}") return cache_entry['context'] # Batch database queries conn = None try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Single query for all context data query = """ SELECT s.context_data, s.user_metadata, s.last_activity, u.persona_summary, ic.interaction_summary FROM sessions s LEFT JOIN user_contexts u ON s.user_id = u.user_id LEFT JOIN interaction_contexts ic ON s.session_id = ic.session_id WHERE s.session_id = ? ORDER BY ic.created_at DESC LIMIT 10 """ cursor.execute(query, (session_id,)) results = cursor.fetchall() # Process results efficiently context = self._build_context_from_results(results, session_id, user_id) # Update cache self._session_cache[session_id] = { 'context': context, 'timestamp': time.time() } return context except Exception as e: logger.error(f"Error in get_or_create_session_context: {e}", exc_info=True) # Return safe fallback return { "session_id": session_id, "user_id": user_id or "Test_Any", "interaction_contexts": [], "session_context": None, "preferences": {}, "active_tasks": [], "user_context_loaded": False } finally: if conn: conn.close() def _build_context_from_results(self, results: list, session_id: str, user_id: Optional[str]) -> Dict: """Build context dictionary from batch query results""" context = { "session_id": session_id, "user_id": user_id or "Test_Any", "interaction_contexts": [], "session_context": None, "user_context": "", "preferences": {}, "active_tasks": [], "user_context_loaded": False } if not results: return context # Process first row for session data first_row = results[0] if first_row[0]: # context_data try: session_data = json.loads(first_row[0]) context["preferences"] = session_data.get("preferences", {}) context["active_tasks"] = session_data.get("active_tasks", []) except: pass if first_row[1]: # user_metadata try: user_metadata = json.loads(first_row[1]) context["preferences"].update(user_metadata.get("preferences", {})) except: pass context["last_activity"] = first_row[2] # last_activity if first_row[3]: # persona_summary context["user_context"] = first_row[3] context["user_context_loaded"] = True # Process interaction contexts seen_interactions = set() for row in results: if row[4]: # interaction_summary # Deduplicate interactions if row[4] not in seen_interactions: seen_interactions.add(row[4]) context["interaction_contexts"].append({ "summary": row[4], "timestamp": None # Could extract from row if available }) return context