File size: 8,709 Bytes
66dbebd ae20ff2 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# context_manager.py
import sqlite3
import json
import logging
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class EfficientContextManager:
def __init__(self):
self.session_cache = {} # In-memory for active sessions
self.cache_config = {
"max_session_size": 10, # MB per session
"ttl": 3600, # 1 hour
"compression": "gzip",
"eviction_policy": "LRU"
}
self.db_path = "sessions.db"
logger.info(f"Initializing ContextManager with DB path: {self.db_path}")
self._init_database()
def _init_database(self):
"""Initialize database and create tables"""
try:
logger.info("Initializing database...")
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create sessions table if not exists
cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
created_at TIMESTAMP,
last_activity TIMESTAMP,
context_data TEXT,
user_metadata TEXT
)
""")
logger.info("✓ Sessions table ready")
# Create interactions table
cursor.execute("""
CREATE TABLE IF NOT EXISTS interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT REFERENCES sessions(session_id),
user_input TEXT,
context_snapshot TEXT,
created_at TIMESTAMP,
FOREIGN KEY(session_id) REFERENCES sessions(session_id)
)
""")
logger.info("✓ Interactions table ready")
conn.commit()
conn.close()
logger.info("Database initialization complete")
except Exception as e:
logger.error(f"Database initialization error: {e}", exc_info=True)
async def manage_context(self, session_id: str, user_input: str) -> dict:
"""
Efficient context management with multi-level caching
"""
# Level 1: In-memory session cache
context = self._get_from_memory_cache(session_id)
if not context:
# Level 2: Database retrieval with embeddings
context = await self._retrieve_from_db(session_id, user_input)
# Cache warming
self._warm_memory_cache(session_id, context)
# Update context with new interaction
updated_context = self._update_context(context, user_input)
return self._optimize_context(updated_context)
def _optimize_context(self, context: dict) -> dict:
"""
Optimize context for LLM consumption
"""
return {
"essential_entities": self._extract_entities(context),
"conversation_summary": self._generate_summary(context),
"recent_interactions": context.get("interactions", [])[-3:],
"user_preferences": context.get("preferences", {}),
"active_tasks": context.get("active_tasks", [])
}
def _get_from_memory_cache(self, session_id: str) -> dict:
"""
Retrieve context from in-memory session cache
"""
# TODO: Implement in-memory cache retrieval
return self.session_cache.get(session_id)
async def _retrieve_from_db(self, session_id: str, user_input: str) -> dict:
"""
Retrieve context from database with semantic search
"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get session data
cursor.execute("""
SELECT context_data, user_metadata, last_activity
FROM sessions
WHERE session_id = ?
""", (session_id,))
row = cursor.fetchone()
if row:
context_data = json.loads(row[0]) if row[0] else {}
user_metadata = json.loads(row[1]) if row[1] else {}
last_activity = row[2]
# Get recent interactions
cursor.execute("""
SELECT user_input, context_snapshot, created_at
FROM interactions
WHERE session_id = ?
ORDER BY created_at DESC
LIMIT 10
""", (session_id,))
recent_interactions = []
for interaction_row in cursor.fetchall():
recent_interactions.append({
"user_input": interaction_row[0],
"context": json.loads(interaction_row[1]) if interaction_row[1] else {},
"timestamp": interaction_row[2]
})
context = {
"session_id": session_id,
"interactions": recent_interactions,
"preferences": user_metadata.get("preferences", {}),
"active_tasks": user_metadata.get("active_tasks", []),
"last_activity": last_activity
}
conn.close()
return context
else:
# Create new session
cursor.execute("""
INSERT INTO sessions (session_id, created_at, last_activity, context_data, user_metadata)
VALUES (?, ?, ?, ?, ?)
""", (session_id, datetime.now().isoformat(), datetime.now().isoformat(), "{}", "{}"))
conn.commit()
conn.close()
return {
"session_id": session_id,
"interactions": [],
"preferences": {},
"active_tasks": []
}
except Exception as e:
print(f"Database retrieval error: {e}")
# Fallback to empty context
return {
"session_id": session_id,
"interactions": [],
"preferences": {},
"active_tasks": []
}
def _warm_memory_cache(self, session_id: str, context: dict):
"""
Warm the in-memory cache with retrieved context
"""
# TODO: Implement cache warming with LRU eviction
self.session_cache[session_id] = context
def _update_context(self, context: dict, user_input: str) -> dict:
"""
Update context with new user interaction and persist to database
"""
try:
# Add new interaction to context
if "interactions" not in context:
context["interactions"] = []
new_interaction = {
"user_input": user_input,
"timestamp": datetime.now().isoformat(),
"context": context
}
# Keep only last 10 interactions in memory
context["interactions"] = [new_interaction] + context["interactions"][:9]
# Persist to database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Update session
cursor.execute("""
UPDATE sessions
SET last_activity = ?, context_data = ?
WHERE session_id = ?
""", (datetime.now().isoformat(), json.dumps(context), context["session_id"]))
# Insert interaction
cursor.execute("""
INSERT INTO interactions (session_id, user_input, context_snapshot, created_at)
VALUES (?, ?, ?, ?)
""", (context["session_id"], user_input, json.dumps(context), datetime.now().isoformat()))
conn.commit()
conn.close()
except Exception as e:
print(f"Context update error: {e}")
return context
def _extract_entities(self, context: dict) -> list:
"""
Extract essential entities from context
"""
# TODO: Implement entity extraction
return []
def _generate_summary(self, context: dict) -> str:
"""
Generate conversation summary
"""
# TODO: Implement summary generation
return ""
|