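# NOTE: the lines below are residue from the file-viewer page this module was
# copied from (breadcrumb, avatar alt text, commit message, commit hash, UI
# links, file size). They are kept as comments so the module parses.
# Research_AI_Assistant / context_manager.py
# JatsTheAIGen's picture
# workflow errors debugging V3
# ae20ff2
# raw
# history blame
# 8.71 kB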
# context_manager.py
import sqlite3
import json
import logging
from datetime import datetime, timedelta
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class EfficientContextManager:
    """Per-session conversation context with a dict cache and SQLite storage.

    Contexts are plain dicts with the keys ``session_id``, ``interactions``
    (newest first, at most 10 entries), ``preferences``, ``active_tasks`` and,
    when loaded from an existing row, ``last_activity``.
    """

    def __init__(self, db_path: str = "sessions.db"):
        """Set up the in-memory cache and make sure the database tables exist.

        Args:
            db_path: Path of the SQLite database file. Defaults to
                ``"sessions.db"`` in the current working directory.
        """
        self.session_cache = {}  # In-memory for active sessions
        # Declared cache settings. NOTE: nothing in this class reads these
        # values yet -- size limits, TTL, compression and eviction are not
        # enforced by the code below.
        self.cache_config = {
            "max_session_size": 10,  # MB per session
            "ttl": 3600,  # 1 hour
            "compression": "gzip",
            "eviction_policy": "LRU",
        }
        self.db_path = db_path
        logger.info(f"Initializing ContextManager with DB path: {self.db_path}")
        self._init_database()

    @staticmethod
    def _empty_context(session_id: str) -> dict:
        """Return a fresh context dict for a session with no history."""
        return {
            "session_id": session_id,
            "interactions": [],
            "preferences": {},
            "active_tasks": [],
        }

    def _init_database(self):
        """Create the ``sessions`` and ``interactions`` tables if missing.

        Best-effort: any failure is logged with a traceback and swallowed, so
        constructing the manager never raises because of the database.
        """
        try:
            logger.info("Initializing database...")
            conn = sqlite3.connect(self.db_path)
            try:
                cursor = conn.cursor()
                # Create sessions table if not exists
                cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
created_at TIMESTAMP,
last_activity TIMESTAMP,
context_data TEXT,
user_metadata TEXT
)
""")
                logger.info("✓ Sessions table ready")
                # Create interactions table
                cursor.execute("""
CREATE TABLE IF NOT EXISTS interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT REFERENCES sessions(session_id),
user_input TEXT,
context_snapshot TEXT,
created_at TIMESTAMP,
FOREIGN KEY(session_id) REFERENCES sessions(session_id)
)
""")
                logger.info("✓ Interactions table ready")
                conn.commit()
            finally:
                # Close even when table creation fails, so the handle is not
                # leaked.
                conn.close()
            logger.info("Database initialization complete")
        except Exception as e:
            logger.error(f"Database initialization error: {e}", exc_info=True)

    async def manage_context(self, session_id: str, user_input: str) -> dict:
        """Record ``user_input`` for a session and return its optimized view.

        Looks in the in-memory cache first, falls back to the database (which
        also creates the session row when it does not exist), caches the
        result, appends the new interaction and returns the output of
        ``_optimize_context``.

        Args:
            session_id: Identifier of the session.
            user_input: The new user message to record.

        Returns:
            The dict produced by ``_optimize_context``.
        """
        # Level 1: In-memory session cache
        context = self._get_from_memory_cache(session_id)
        if not context:
            # Level 2: Database retrieval
            context = await self._retrieve_from_db(session_id, user_input)
            # Cache warming
            self._warm_memory_cache(session_id, context)
        # _update_context mutates the dict in place, so the cached entry sees
        # the new interaction as well.
        updated_context = self._update_context(context, user_input)
        return self._optimize_context(updated_context)

    def _optimize_context(self, context: dict) -> dict:
        """Build the reduced view of ``context`` that is handed to callers.

        Returns:
            A dict with ``essential_entities``, ``conversation_summary``,
            ``recent_interactions`` (the three newest interactions),
            ``user_preferences`` and ``active_tasks``.
        """
        return {
            "essential_entities": self._extract_entities(context),
            "conversation_summary": self._generate_summary(context),
            # Interactions are stored newest-first, so the most recent three
            # are at the front of the list.
            "recent_interactions": context.get("interactions", [])[:3],
            "user_preferences": context.get("preferences", {}),
            "active_tasks": context.get("active_tasks", []),
        }

    def _get_from_memory_cache(self, session_id: str) -> "dict | None":
        """Return the cached context for ``session_id``, or ``None`` if absent."""
        # TODO: Implement in-memory cache retrieval
        return self.session_cache.get(session_id)

    async def _retrieve_from_db(self, session_id: str, user_input: str) -> dict:
        """Load a session's context from SQLite, creating the session if new.

        Args:
            session_id: Identifier of the session to load.
            user_input: Currently unused by this method.

        Returns:
            For an existing session: a context holding its 10 newest
            interactions (newest first) plus ``preferences`` and
            ``active_tasks`` from the stored user metadata and the stored
            ``last_activity``. For a new session, or when any error occurs
            (logged, not raised): an empty context.
        """
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                cursor = conn.cursor()
                # Get session data
                cursor.execute("""
SELECT user_metadata, last_activity
FROM sessions
WHERE session_id = ?
""", (session_id,))
                row = cursor.fetchone()
                if row is None:
                    # Create new session
                    now = datetime.now().isoformat()
                    cursor.execute("""
INSERT INTO sessions (session_id, created_at, last_activity, context_data, user_metadata)
VALUES (?, ?, ?, ?, ?)
""", (session_id, now, now, "{}", "{}"))
                    conn.commit()
                    return self._empty_context(session_id)

                user_metadata = json.loads(row[0]) if row[0] else {}
                last_activity = row[1]
                # Get recent interactions; ``id`` breaks ties between rows that
                # share the same timestamp.
                cursor.execute("""
SELECT user_input, context_snapshot, created_at
FROM interactions
WHERE session_id = ?
ORDER BY created_at DESC, id DESC
LIMIT 10
""", (session_id,))
                recent_interactions = [
                    {
                        "user_input": stored_input,
                        "context": json.loads(snapshot) if snapshot else {},
                        "timestamp": created_at,
                    }
                    for stored_input, snapshot, created_at in cursor.fetchall()
                ]
                return {
                    "session_id": session_id,
                    "interactions": recent_interactions,
                    "preferences": user_metadata.get("preferences", {}),
                    "active_tasks": user_metadata.get("active_tasks", []),
                    "last_activity": last_activity,
                }
            finally:
                conn.close()
        except Exception as e:
            logger.error(f"Database retrieval error: {e}", exc_info=True)
            # Fallback to empty context
            return self._empty_context(session_id)

    def _warm_memory_cache(self, session_id: str, context: dict):
        """Store ``context`` in the in-memory cache under ``session_id``."""
        # TODO: Implement cache warming with LRU eviction
        self.session_cache[session_id] = context

    def _update_context(self, context: dict, user_input: str) -> dict:
        """Prepend a new interaction to ``context`` and persist it.

        ``context`` is modified in place. Persistence is best-effort: errors
        are logged and the (already updated) context is still returned.

        Args:
            context: Context dict; must contain ``session_id`` for the
                database write to succeed.
            user_input: The user message to record.

        Returns:
            The same ``context`` object, with the new interaction first and at
            most 10 interactions kept.
        """
        try:
            timestamp = datetime.now().isoformat()
            # Snapshot everything except the interaction list. Embedding
            # ``context`` itself would create a reference cycle that
            # json.dumps rejects, and would nest history inside history.
            # The copy is shallow: nested values are shared with ``context``.
            snapshot = {
                key: value
                for key, value in context.items()
                if key != "interactions"
            }
            new_interaction = {
                "user_input": user_input,
                "timestamp": timestamp,
                "context": snapshot,
            }
            # Keep only last 10 interactions in memory
            context["interactions"] = (
                [new_interaction] + context.get("interactions", [])[:9]
            )
            session_id = context["session_id"]
            # Persist to database
            conn = sqlite3.connect(self.db_path)
            try:
                cursor = conn.cursor()
                # Update session
                cursor.execute("""
UPDATE sessions
SET last_activity = ?, context_data = ?
WHERE session_id = ?
""", (timestamp, json.dumps(context), session_id))
                # Insert interaction
                cursor.execute("""
INSERT INTO interactions (session_id, user_input, context_snapshot, created_at)
VALUES (?, ?, ?, ?)
""", (session_id, user_input, json.dumps(snapshot), timestamp))
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            logger.error(f"Context update error: {e}", exc_info=True)
        return context

    def _extract_entities(self, context: dict) -> list:
        """Return essential entities for ``context`` (placeholder: always ``[]``)."""
        # TODO: Implement entity extraction
        return []

    def _generate_summary(self, context: dict) -> str:
        """Return a conversation summary (placeholder: always ``""``)."""
        # TODO: Implement summary generation
        return ""