JatsTheAIGen commited on
Commit
89a43bb
·
1 Parent(s): 35d9168

cache key error when user id changes -fixed task 1 31_10_2025 v3

Browse files
Files changed (2) hide show
  1. src/context_manager.py +542 -45
  2. src/orchestrator_engine.py +82 -14
src/context_manager.py CHANGED
@@ -3,10 +3,49 @@ import sqlite3
3
  import json
4
  import logging
5
  import uuid
 
 
 
6
  from datetime import datetime, timedelta
7
 
8
  logger = logging.getLogger(__name__)
9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
  class EfficientContextManager:
11
  def __init__(self, llm_router=None):
12
  self.session_cache = {} # In-memory for active sessions
@@ -19,7 +58,9 @@ class EfficientContextManager:
19
  self.db_path = "sessions.db"
20
  self.llm_router = llm_router # For generating context summaries
21
  logger.info(f"Initializing ContextManager with DB path: {self.db_path}")
 
22
  self._init_database()
 
23
 
24
  def _init_database(self):
25
  """Initialize database and create tables"""
@@ -102,11 +143,95 @@ class EfficientContextManager:
102
 
103
  conn.commit()
104
  conn.close()
 
 
 
 
105
  logger.info("Database initialization complete")
106
 
107
  except Exception as e:
108
  logger.error(f"Database initialization error: {e}", exc_info=True)
109
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  async def manage_context(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict:
111
  """
112
  Efficient context management with separated session/user caching
@@ -121,6 +246,14 @@ class EfficientContextManager:
121
  # Get session context from cache
122
  session_context = self._get_from_memory_cache(session_cache_key)
123
 
 
 
 
 
 
 
 
 
124
  # Get user context separately
125
  user_context = self._get_from_memory_cache(user_cache_key)
126
 
@@ -128,7 +261,7 @@ class EfficientContextManager:
128
  # Retrieve from database with user context
129
  session_context = await self._retrieve_from_db(session_id, user_input, user_id)
130
 
131
- # Cache session context
132
  self._warm_memory_cache(session_cache_key, session_context)
133
 
134
  # Handle user context separately to prevent loops
@@ -453,13 +586,18 @@ Keep the summary concise (approximately 100 tokens)."""
453
 
454
  async def _retrieve_from_db(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict:
455
  """
456
- Retrieve context from database with semantic search
 
457
  """
 
458
  try:
459
  conn = sqlite3.connect(self.db_path)
460
  cursor = conn.cursor()
461
 
462
- # Get session data
 
 
 
463
  cursor.execute("""
464
  SELECT context_data, user_metadata, last_activity, user_id
465
  FROM sessions
@@ -474,31 +612,77 @@ Keep the summary concise (approximately 100 tokens)."""
474
  last_activity = row[2]
475
  session_user_id = row[3] if len(row) > 3 else user_id
476
 
477
- # Update user_id if it changed
 
478
  if session_user_id != user_id:
 
 
 
 
479
  cursor.execute("""
480
- UPDATE sessions SET user_id = ? WHERE session_id = ?
481
- """, (user_id, session_id))
482
- conn.commit()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
483
  # Clear old cache entries when user changes
484
  self._clear_user_cache_on_change(session_id, user_id, session_user_id)
485
 
486
- # Get previous interaction contexts for this session
487
- cursor.execute("""
488
- SELECT interaction_summary, created_at
489
- FROM interaction_contexts
490
- WHERE session_id = ?
491
- ORDER BY created_at DESC
492
- LIMIT 20
493
- """, (session_id,))
 
 
 
 
 
 
 
 
 
 
 
 
494
 
495
  interaction_contexts = []
496
  for ic_row in cursor.fetchall():
497
- if ic_row[0]:
498
- interaction_contexts.append({
499
- "summary": ic_row[0],
500
- "timestamp": ic_row[1]
501
- })
 
 
 
 
 
 
502
 
503
  context = {
504
  "session_id": session_id,
@@ -507,18 +691,20 @@ Keep the summary concise (approximately 100 tokens)."""
507
  "preferences": user_metadata.get("preferences", {}),
508
  "active_tasks": user_metadata.get("active_tasks", []),
509
  "last_activity": last_activity,
510
- "user_context_loaded": False # Will be loaded in manage_context
 
511
  }
512
 
513
  conn.close()
514
  return context
515
  else:
516
- # Create new session
517
  cursor.execute("""
518
  INSERT INTO sessions (session_id, user_id, created_at, last_activity, context_data, user_metadata)
519
  VALUES (?, ?, ?, ?, ?, ?)
520
  """, (session_id, user_id, datetime.now().isoformat(), datetime.now().isoformat(), "{}", "{}"))
521
- conn.commit()
 
522
  conn.close()
523
 
524
  return {
@@ -527,19 +713,47 @@ Keep the summary concise (approximately 100 tokens)."""
527
  "interaction_contexts": [],
528
  "preferences": {},
529
  "active_tasks": [],
530
- "user_context_loaded": False
 
531
  }
532
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
533
  except Exception as e:
534
  logger.error(f"Database retrieval error: {e}", exc_info=True)
535
- # Fallback to empty context
 
 
 
 
 
 
536
  return {
537
  "session_id": session_id,
538
  "user_id": user_id,
539
  "interaction_contexts": [],
540
  "preferences": {},
541
  "active_tasks": [],
542
- "user_context_loaded": False
 
 
543
  }
544
 
545
  def _warm_memory_cache(self, cache_key: str, context: dict):
@@ -550,39 +764,322 @@ Keep the summary concise (approximately 100 tokens)."""
550
 
551
  def _update_context(self, context: dict, user_input: str, response: str = None, user_id: str = "Test_Any") -> dict:
552
  """
553
- Update context with new user interaction and persist to database
554
- Note: Interaction context generation happens separately after response is generated
555
  """
556
  try:
557
- # Update session activity
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
558
  conn = sqlite3.connect(self.db_path)
559
  cursor = conn.cursor()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
560
 
561
- # Update session last_activity
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
562
  cursor.execute("""
563
- UPDATE sessions
564
- SET last_activity = ?, user_id = ?
565
  WHERE session_id = ?
566
- """, (datetime.now().isoformat(), user_id, context["session_id"]))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
567
 
568
- # Insert basic interaction record (for backward compatibility)
569
- session_context = {
570
- "preferences": context.get("preferences", {}),
571
- "active_tasks": context.get("active_tasks", [])
572
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
573
 
574
- cursor.execute("""
575
- INSERT INTO interactions (session_id, user_input, context_snapshot, created_at)
576
- VALUES (?, ?, ?, ?)
577
- """, (context["session_id"], user_input, json.dumps(session_context), datetime.now().isoformat()))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
578
 
579
  conn.commit()
580
  conn.close()
581
 
 
 
582
  except Exception as e:
583
- logger.error(f"Context update error: {e}", exc_info=True)
584
-
585
- return context
586
 
587
  def _extract_entities(self, context: dict) -> list:
588
  """
 
3
  import json
4
  import logging
5
  import uuid
6
+ import hashlib
7
+ import threading
8
+ from contextlib import contextmanager
9
  from datetime import datetime, timedelta
10
 
11
  logger = logging.getLogger(__name__)
12
 
13
+
14
+ class TransactionManager:
15
+ """Manage database transactions with proper locking"""
16
+
17
+ def __init__(self, db_path):
18
+ self.db_path = db_path
19
+ self._lock = threading.RLock()
20
+ self._connections = {}
21
+
22
+ @contextmanager
23
+ def transaction(self, session_id=None):
24
+ """Context manager for database transactions with automatic rollback"""
25
+ conn = None
26
+ cursor = None
27
+
28
+ try:
29
+ with self._lock:
30
+ conn = sqlite3.connect(self.db_path, isolation_level='IMMEDIATE')
31
+ conn.execute('PRAGMA journal_mode=WAL') # Write-Ahead Logging for better concurrency
32
+ conn.execute('PRAGMA busy_timeout=5000') # 5 second timeout for locks
33
+ cursor = conn.cursor()
34
+
35
+ yield cursor
36
+
37
+ conn.commit()
38
+ logger.debug(f"Transaction committed for session {session_id}")
39
+
40
+ except Exception as e:
41
+ if conn:
42
+ conn.rollback()
43
+ logger.error(f"Transaction rolled back for session {session_id}: {e}")
44
+ raise
45
+ finally:
46
+ if conn:
47
+ conn.close()
48
+
49
  class EfficientContextManager:
50
  def __init__(self, llm_router=None):
51
  self.session_cache = {} # In-memory for active sessions
 
58
  self.db_path = "sessions.db"
59
  self.llm_router = llm_router # For generating context summaries
60
  logger.info(f"Initializing ContextManager with DB path: {self.db_path}")
61
+ self.transaction_manager = TransactionManager(self.db_path)
62
  self._init_database()
63
+ self.optimize_database_indexes()
64
 
65
  def _init_database(self):
66
  """Initialize database and create tables"""
 
143
 
144
  conn.commit()
145
  conn.close()
146
+
147
+ # Update schema with new columns and tables for user change tracking
148
+ self._update_database_schema()
149
+
150
  logger.info("Database initialization complete")
151
 
152
  except Exception as e:
153
  logger.error(f"Database initialization error: {e}", exc_info=True)
154
 
155
+ def _update_database_schema(self):
156
+ """Add missing columns and tables for user change tracking"""
157
+ try:
158
+ conn = sqlite3.connect(self.db_path)
159
+ cursor = conn.cursor()
160
+
161
+ # Add needs_refresh column to interaction_contexts
162
+ try:
163
+ cursor.execute("""
164
+ ALTER TABLE interaction_contexts
165
+ ADD COLUMN needs_refresh INTEGER DEFAULT 0
166
+ """)
167
+ logger.info("✓ Added needs_refresh column to interaction_contexts")
168
+ except sqlite3.OperationalError:
169
+ pass # Column already exists
170
+
171
+ # Create user change log table
172
+ cursor.execute("""
173
+ CREATE TABLE IF NOT EXISTS user_change_log (
174
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
175
+ session_id TEXT,
176
+ old_user_id TEXT,
177
+ new_user_id TEXT,
178
+ timestamp TIMESTAMP,
179
+ FOREIGN KEY(session_id) REFERENCES sessions(session_id)
180
+ )
181
+ """)
182
+
183
+ conn.commit()
184
+ conn.close()
185
+ logger.info("✓ Database schema updated successfully for user change tracking")
186
+
187
+ # Update interactions table for deduplication
188
+ self._update_interactions_table()
189
+
190
+ except Exception as e:
191
+ logger.error(f"Schema update error: {e}", exc_info=True)
192
+
193
+ def _update_interactions_table(self):
194
+ """Add interaction_hash column for deduplication"""
195
+ try:
196
+ conn = sqlite3.connect(self.db_path)
197
+ cursor = conn.cursor()
198
+
199
+ # Check if column already exists
200
+ cursor.execute("PRAGMA table_info(interactions)")
201
+ columns = [row[1] for row in cursor.fetchall()]
202
+
203
+ # Add interaction_hash column if it doesn't exist
204
+ if 'interaction_hash' not in columns:
205
+ try:
206
+ cursor.execute("""
207
+ ALTER TABLE interactions
208
+ ADD COLUMN interaction_hash TEXT
209
+ """)
210
+ logger.info("✓ Added interaction_hash column to interactions table")
211
+ except sqlite3.OperationalError:
212
+ pass # Column already exists
213
+
214
+ # Create unique index for deduplication (this enforces uniqueness)
215
+ try:
216
+ cursor.execute("""
217
+ CREATE UNIQUE INDEX IF NOT EXISTS idx_interaction_hash_unique
218
+ ON interactions(interaction_hash)
219
+ """)
220
+ logger.info("✓ Created unique index on interaction_hash")
221
+ except sqlite3.OperationalError:
222
+ # Index might already exist, try non-unique index as fallback
223
+ cursor.execute("""
224
+ CREATE INDEX IF NOT EXISTS idx_interaction_hash
225
+ ON interactions(interaction_hash)
226
+ """)
227
+
228
+ conn.commit()
229
+ conn.close()
230
+ logger.info("✓ Interactions table updated for deduplication")
231
+
232
+ except Exception as e:
233
+ logger.error(f"Error updating interactions table: {e}", exc_info=True)
234
+
235
  async def manage_context(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict:
236
  """
237
  Efficient context management with separated session/user caching
 
246
  # Get session context from cache
247
  session_context = self._get_from_memory_cache(session_cache_key)
248
 
249
+ # Check if cached session context matches current user_id
250
+ if session_context and session_context.get("user_id") != user_id:
251
+ # User changed, invalidate session cache
252
+ logger.info(f"User mismatch in cache for session {session_id}, invalidating cache")
253
+ session_context = None
254
+ if session_cache_key in self.session_cache:
255
+ del self.session_cache[session_cache_key]
256
+
257
  # Get user context separately
258
  user_context = self._get_from_memory_cache(user_cache_key)
259
 
 
261
  # Retrieve from database with user context
262
  session_context = await self._retrieve_from_db(session_id, user_input, user_id)
263
 
264
+ # Cache session context (cache invalidation for user changes is handled in _retrieve_from_db)
265
  self._warm_memory_cache(session_cache_key, session_context)
266
 
267
  # Handle user context separately to prevent loops
 
586
 
587
  async def _retrieve_from_db(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict:
588
  """
589
+ Retrieve session context with proper user_id synchronization
590
+ Uses transactions to ensure atomic updates of database and cache
591
  """
592
+ conn = None
593
  try:
594
  conn = sqlite3.connect(self.db_path)
595
  cursor = conn.cursor()
596
 
597
+ # Use transaction to ensure atomic updates
598
+ cursor.execute("BEGIN TRANSACTION")
599
+
600
+ # Get session data (SQLite doesn't support FOR UPDATE, but transaction ensures consistency)
601
  cursor.execute("""
602
  SELECT context_data, user_metadata, last_activity, user_id
603
  FROM sessions
 
612
  last_activity = row[2]
613
  session_user_id = row[3] if len(row) > 3 else user_id
614
 
615
+ # Check for user_id change and update atomically
616
+ user_changed = False
617
  if session_user_id != user_id:
618
+ logger.info(f"User change detected: {session_user_id} -> {user_id} for session {session_id}")
619
+ user_changed = True
620
+
621
+ # Update session with new user_id
622
  cursor.execute("""
623
+ UPDATE sessions
624
+ SET user_id = ?, last_activity = ?
625
+ WHERE session_id = ?
626
+ """, (user_id, datetime.now().isoformat(), session_id))
627
+
628
+ # Clear any cached interaction contexts for old user by marking for refresh
629
+ try:
630
+ cursor.execute("""
631
+ UPDATE interaction_contexts
632
+ SET needs_refresh = 1
633
+ WHERE session_id = ?
634
+ """, (session_id,))
635
+ except sqlite3.OperationalError:
636
+ # Column might not exist yet, will be created by schema update
637
+ pass
638
+
639
+ # Log user change event
640
+ try:
641
+ cursor.execute("""
642
+ INSERT INTO user_change_log (session_id, old_user_id, new_user_id, timestamp)
643
+ VALUES (?, ?, ?, ?)
644
+ """, (session_id, session_user_id, user_id, datetime.now().isoformat()))
645
+ except sqlite3.OperationalError:
646
+ # Table might not exist yet, will be created by schema update
647
+ pass
648
+
649
  # Clear old cache entries when user changes
650
  self._clear_user_cache_on_change(session_id, user_id, session_user_id)
651
 
652
+ cursor.execute("COMMIT")
653
+
654
+ # Get interaction contexts with refresh flag check
655
+ try:
656
+ cursor.execute("""
657
+ SELECT interaction_summary, created_at, needs_refresh
658
+ FROM interaction_contexts
659
+ WHERE session_id = ? AND (needs_refresh IS NULL OR needs_refresh = 0)
660
+ ORDER BY created_at DESC
661
+ LIMIT 20
662
+ """, (session_id,))
663
+ except sqlite3.OperationalError:
664
+ # Column might not exist yet, fall back to query without needs_refresh
665
+ cursor.execute("""
666
+ SELECT interaction_summary, created_at
667
+ FROM interaction_contexts
668
+ WHERE session_id = ?
669
+ ORDER BY created_at DESC
670
+ LIMIT 20
671
+ """, (session_id,))
672
 
673
  interaction_contexts = []
674
  for ic_row in cursor.fetchall():
675
+ # Handle both query formats (with and without needs_refresh)
676
+ if len(ic_row) >= 2:
677
+ summary = ic_row[0]
678
+ timestamp = ic_row[1]
679
+ needs_refresh = ic_row[2] if len(ic_row) > 2 else 0
680
+
681
+ if summary and not needs_refresh:
682
+ interaction_contexts.append({
683
+ "summary": summary,
684
+ "timestamp": timestamp
685
+ })
686
 
687
  context = {
688
  "session_id": session_id,
 
691
  "preferences": user_metadata.get("preferences", {}),
692
  "active_tasks": user_metadata.get("active_tasks", []),
693
  "last_activity": last_activity,
694
+ "user_context_loaded": False,
695
+ "user_changed": user_changed
696
  }
697
 
698
  conn.close()
699
  return context
700
  else:
701
+ # Create new session with transaction
702
  cursor.execute("""
703
  INSERT INTO sessions (session_id, user_id, created_at, last_activity, context_data, user_metadata)
704
  VALUES (?, ?, ?, ?, ?, ?)
705
  """, (session_id, user_id, datetime.now().isoformat(), datetime.now().isoformat(), "{}", "{}"))
706
+
707
+ cursor.execute("COMMIT")
708
  conn.close()
709
 
710
  return {
 
713
  "interaction_contexts": [],
714
  "preferences": {},
715
  "active_tasks": [],
716
+ "user_context_loaded": False,
717
+ "user_changed": False
718
  }
719
 
720
+ except sqlite3.Error as e:
721
+ logger.error(f"Database transaction error: {e}", exc_info=True)
722
+ if conn:
723
+ try:
724
+ conn.rollback()
725
+ except:
726
+ pass
727
+ conn.close()
728
+ # Return safe fallback
729
+ return {
730
+ "session_id": session_id,
731
+ "user_id": user_id,
732
+ "interaction_contexts": [],
733
+ "preferences": {},
734
+ "active_tasks": [],
735
+ "user_context_loaded": False,
736
+ "error": str(e),
737
+ "user_changed": False
738
+ }
739
  except Exception as e:
740
  logger.error(f"Database retrieval error: {e}", exc_info=True)
741
+ if conn:
742
+ try:
743
+ conn.rollback()
744
+ except:
745
+ pass
746
+ conn.close()
747
+ # Return safe fallback
748
  return {
749
  "session_id": session_id,
750
  "user_id": user_id,
751
  "interaction_contexts": [],
752
  "preferences": {},
753
  "active_tasks": [],
754
+ "user_context_loaded": False,
755
+ "error": str(e),
756
+ "user_changed": False
757
  }
758
 
759
  def _warm_memory_cache(self, cache_key: str, context: dict):
 
764
 
765
  def _update_context(self, context: dict, user_input: str, response: str = None, user_id: str = "Test_Any") -> dict:
766
  """
767
+ Update context with deduplication and idempotency checks
768
+ Prevents duplicate context updates using interaction hashes
769
  """
770
  try:
771
+ # Generate unique interaction hash to prevent duplicates
772
+ interaction_hash = self._generate_interaction_hash(user_input, context["session_id"], user_id)
773
+
774
+ # Check if this interaction was already processed
775
+ if self._is_duplicate_interaction(interaction_hash):
776
+ logger.info(f"Duplicate interaction detected, skipping update: {interaction_hash[:8]}")
777
+ return context
778
+
779
+ # Use transaction for atomic updates
780
+ current_time = datetime.now().isoformat()
781
+ with self.transaction_manager.transaction(context["session_id"]) as cursor:
782
+ # Update session activity (only if last_activity is older to prevent unnecessary updates)
783
+ cursor.execute("""
784
+ UPDATE sessions
785
+ SET last_activity = ?, user_id = ?
786
+ WHERE session_id = ? AND (last_activity IS NULL OR last_activity < ?)
787
+ """, (current_time, user_id, context["session_id"], current_time))
788
+
789
+ # Store interaction with duplicate prevention using INSERT OR IGNORE
790
+ session_context = {
791
+ "preferences": context.get("preferences", {}),
792
+ "active_tasks": context.get("active_tasks", [])
793
+ }
794
+
795
+ cursor.execute("""
796
+ INSERT OR IGNORE INTO interactions (
797
+ interaction_hash,
798
+ session_id,
799
+ user_input,
800
+ context_snapshot,
801
+ created_at
802
+ ) VALUES (?, ?, ?, ?, ?)
803
+ """, (
804
+ interaction_hash,
805
+ context["session_id"],
806
+ user_input,
807
+ json.dumps(session_context),
808
+ current_time
809
+ ))
810
+
811
+ # Mark interaction as processed (outside transaction)
812
+ self._mark_interaction_processed(interaction_hash)
813
+
814
+ # Update in-memory context
815
+ context["last_interaction"] = user_input
816
+ context["last_update"] = current_time
817
+
818
+ logger.info(f"Context updated for session {context['session_id']} with hash {interaction_hash[:8]}")
819
+
820
+ return context
821
+
822
+ except Exception as e:
823
+ logger.error(f"Error updating context: {e}", exc_info=True)
824
+ return context
825
+
826
+ def _generate_interaction_hash(self, user_input: str, session_id: str, user_id: str) -> str:
827
+ """Generate unique hash for interaction to prevent duplicates"""
828
+ # Use session_id, user_id, and user_input for exact duplicate detection
829
+ # Normalize user input by stripping whitespace
830
+ normalized_input = user_input.strip()
831
+ content = f"{session_id}:{user_id}:{normalized_input}"
832
+ return hashlib.sha256(content.encode()).hexdigest()
833
+
834
+ def _is_duplicate_interaction(self, interaction_hash: str) -> bool:
835
+ """Check if interaction was already processed"""
836
+ # Keep a rolling window of recent interaction hashes in memory
837
+ if not hasattr(self, '_processed_interactions'):
838
+ self._processed_interactions = set()
839
+
840
+ # Check in-memory cache first
841
+ if interaction_hash in self._processed_interactions:
842
+ return True
843
+
844
+ # Also check database for persistent duplicates
845
+ try:
846
  conn = sqlite3.connect(self.db_path)
847
  cursor = conn.cursor()
848
+ # Check if interaction_hash column exists and query for duplicates
849
+ cursor.execute("PRAGMA table_info(interactions)")
850
+ columns = [row[1] for row in cursor.fetchall()]
851
+ if 'interaction_hash' in columns:
852
+ cursor.execute("""
853
+ SELECT COUNT(*) FROM interactions
854
+ WHERE interaction_hash IS NOT NULL AND interaction_hash = ?
855
+ """, (interaction_hash,))
856
+ count = cursor.fetchone()[0]
857
+ conn.close()
858
+ return count > 0
859
+ else:
860
+ conn.close()
861
+ return False
862
+ except sqlite3.OperationalError:
863
+ # Column might not exist yet, only check in-memory
864
+ return interaction_hash in self._processed_interactions
865
+
866
+ def _mark_interaction_processed(self, interaction_hash: str):
867
+ """Mark interaction as processed"""
868
+ if not hasattr(self, '_processed_interactions'):
869
+ self._processed_interactions = set()
870
+ self._processed_interactions.add(interaction_hash)
871
+
872
+ # Limit memory usage by keeping only last 1000 hashes
873
+ if len(self._processed_interactions) > 1000:
874
+ # Keep most recent 500 entries (simple truncation)
875
+ self._processed_interactions = set(list(self._processed_interactions)[-500:])
876
+
877
+ async def manage_context_optimized(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict:
878
+ """
879
+ Efficient context management with transaction optimization
880
+ """
881
+ # Use session-only cache key
882
+ session_cache_key = f"session_{session_id}"
883
+
884
+ # Try to get from cache first (no DB access)
885
+ cached_context = self._get_from_memory_cache(session_cache_key)
886
+ if cached_context and self._is_cache_valid(cached_context):
887
+ logger.debug(f"Using cached context for session {session_id}")
888
+ return cached_context
889
+
890
+ # Use transaction for all DB operations
891
+ with self.transaction_manager.transaction(session_id) as cursor:
892
+ # Atomic session retrieval and update
893
+ cursor.execute("""
894
+ SELECT s.context_data, s.user_metadata, s.last_activity, s.user_id,
895
+ COUNT(ic.interaction_id) as interaction_count
896
+ FROM sessions s
897
+ LEFT JOIN interaction_contexts ic ON s.session_id = ic.session_id
898
+ WHERE s.session_id = ?
899
+ GROUP BY s.session_id
900
+ """, (session_id,))
901
+
902
+ row = cursor.fetchone()
903
 
904
+ if row:
905
+ # Parse existing session data
906
+ context_data = json.loads(row[0] or '{}')
907
+ user_metadata = json.loads(row[1] or '{}')
908
+ last_activity = row[2]
909
+ stored_user_id = row[3] or user_id
910
+ interaction_count = row[4] or 0
911
+
912
+ # Handle user change atomically
913
+ if stored_user_id != user_id:
914
+ self._handle_user_change_atomic(cursor, session_id, stored_user_id, user_id)
915
+
916
+ # Get interaction contexts efficiently
917
+ interaction_contexts = self._get_interaction_contexts_atomic(cursor, session_id)
918
+
919
+ else:
920
+ # Create new session atomically
921
+ cursor.execute("""
922
+ INSERT INTO sessions (session_id, user_id, created_at, last_activity, context_data, user_metadata)
923
+ VALUES (?, ?, datetime('now'), datetime('now'), '{}', '{}')
924
+ """, (session_id, user_id))
925
+
926
+ context_data = {}
927
+ user_metadata = {}
928
+ interaction_contexts = []
929
+ interaction_count = 0
930
+
931
+ # Load user context asynchronously (outside transaction)
932
+ user_context = await self._load_user_context_async(user_id)
933
+
934
+ # Build final context
935
+ final_context = {
936
+ "session_id": session_id,
937
+ "user_id": user_id,
938
+ "interaction_contexts": interaction_contexts,
939
+ "user_context": user_context,
940
+ "preferences": user_metadata.get("preferences", {}),
941
+ "active_tasks": user_metadata.get("active_tasks", []),
942
+ "interaction_count": interaction_count,
943
+ "cache_timestamp": datetime.now().isoformat()
944
+ }
945
+
946
+ # Update cache
947
+ self._warm_memory_cache(session_cache_key, final_context)
948
+
949
+ return self._optimize_context(final_context)
950
+
951
+ def _handle_user_change_atomic(self, cursor, session_id: str, old_user_id: str, new_user_id: str):
952
+ """Handle user change within transaction"""
953
+ logger.info(f"Handling user change in transaction: {old_user_id} -> {new_user_id}")
954
+
955
+ # Update session
956
+ cursor.execute("""
957
+ UPDATE sessions
958
+ SET user_id = ?, last_activity = datetime('now')
959
+ WHERE session_id = ?
960
+ """, (new_user_id, session_id))
961
+
962
+ # Log the change
963
+ try:
964
+ cursor.execute("""
965
+ INSERT INTO user_change_log (session_id, old_user_id, new_user_id, timestamp)
966
+ VALUES (?, ?, ?, datetime('now'))
967
+ """, (session_id, old_user_id, new_user_id))
968
+ except sqlite3.OperationalError:
969
+ # Table might not exist yet
970
+ pass
971
+
972
+ # Invalidate related caches
973
+ try:
974
  cursor.execute("""
975
+ UPDATE interaction_contexts
976
+ SET needs_refresh = 1
977
  WHERE session_id = ?
978
+ """, (session_id,))
979
+ except sqlite3.OperationalError:
980
+ # Column might not exist yet
981
+ pass
982
+
983
+ def _get_interaction_contexts_atomic(self, cursor, session_id: str, limit: int = 20):
984
+ """Get interaction contexts within transaction"""
985
+ try:
986
+ cursor.execute("""
987
+ SELECT interaction_summary, created_at, interaction_id
988
+ FROM interaction_contexts
989
+ WHERE session_id = ? AND (needs_refresh IS NULL OR needs_refresh = 0)
990
+ ORDER BY created_at DESC
991
+ LIMIT ?
992
+ """, (session_id, limit))
993
+ except sqlite3.OperationalError:
994
+ # Fallback if needs_refresh column doesn't exist
995
+ cursor.execute("""
996
+ SELECT interaction_summary, created_at, interaction_id
997
+ FROM interaction_contexts
998
+ WHERE session_id = ?
999
+ ORDER BY created_at DESC
1000
+ LIMIT ?
1001
+ """, (session_id, limit))
1002
+
1003
+ contexts = []
1004
+ for row in cursor.fetchall():
1005
+ if row[0]:
1006
+ contexts.append({
1007
+ "summary": row[0],
1008
+ "timestamp": row[1],
1009
+ "id": row[2] if len(row) > 2 else None
1010
+ })
1011
+
1012
+ return contexts
1013
+
1014
+ async def _load_user_context_async(self, user_id: str):
1015
+ """Load user context asynchronously to avoid blocking"""
1016
+ try:
1017
+ # Check memory cache first
1018
+ user_cache_key = f"user_{user_id}"
1019
+ cached = self._get_from_memory_cache(user_cache_key)
1020
+ if cached:
1021
+ return cached.get("user_context", "")
1022
 
1023
+ # Load from database
1024
+ return await self.get_user_context(user_id)
1025
+ except Exception as e:
1026
+ logger.error(f"Error loading user context: {e}")
1027
+ return ""
1028
+
1029
+ def _is_cache_valid(self, cached_context: dict, max_age_seconds: int = 60) -> bool:
1030
+ """Check if cached context is still valid"""
1031
+ if not cached_context:
1032
+ return False
1033
+
1034
+ cache_timestamp = cached_context.get("cache_timestamp")
1035
+ if not cache_timestamp:
1036
+ return False
1037
+
1038
+ try:
1039
+ cache_time = datetime.fromisoformat(cache_timestamp)
1040
+ age = (datetime.now() - cache_time).total_seconds()
1041
+ return age < max_age_seconds
1042
+ except:
1043
+ return False
1044
+
1045
+ def optimize_database_indexes(self):
1046
+ """Create database indexes for better query performance"""
1047
+ try:
1048
+ conn = sqlite3.connect(self.db_path)
1049
+ cursor = conn.cursor()
1050
 
1051
+ # Create indexes for frequently queried columns
1052
+ indexes = [
1053
+ "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)",
1054
+ "CREATE INDEX IF NOT EXISTS idx_sessions_last_activity ON sessions(last_activity)",
1055
+ "CREATE INDEX IF NOT EXISTS idx_interactions_session_id ON interactions(session_id)",
1056
+ "CREATE INDEX IF NOT EXISTS idx_interaction_contexts_session_id ON interaction_contexts(session_id)",
1057
+ "CREATE INDEX IF NOT EXISTS idx_interaction_contexts_created_at ON interaction_contexts(created_at)",
1058
+ "CREATE INDEX IF NOT EXISTS idx_user_change_log_session_id ON user_change_log(session_id)",
1059
+ "CREATE INDEX IF NOT EXISTS idx_user_contexts_updated_at ON user_contexts(updated_at)"
1060
+ ]
1061
+
1062
+ for index in indexes:
1063
+ try:
1064
+ cursor.execute(index)
1065
+ except sqlite3.OperationalError as e:
1066
+ # Table might not exist yet, skip this index
1067
+ logger.debug(f"Skipping index creation (table may not exist): {e}")
1068
+
1069
+ # Analyze database for query optimization
1070
+ try:
1071
+ cursor.execute("ANALYZE")
1072
+ except sqlite3.OperationalError:
1073
+ # ANALYZE might not be available in all SQLite versions
1074
+ pass
1075
 
1076
  conn.commit()
1077
  conn.close()
1078
 
1079
+ logger.info("✓ Database indexes optimized successfully")
1080
+
1081
  except Exception as e:
1082
+ logger.error(f"Error optimizing database indexes: {e}", exc_info=True)
 
 
1083
 
1084
  def _extract_entities(self, context: dict) -> list:
1085
  """
src/orchestrator_engine.py CHANGED
@@ -49,35 +49,99 @@ class MVPOrchestrator:
49
  # User ID tracking for context system
50
  self._current_user_id = {} # session_id -> user_id
51
 
 
 
 
52
  logger.info("MVPOrchestrator initialized with safety revision thresholds")
53
 
54
  def set_user_id(self, session_id: str, user_id: str):
55
- """Set user_id for a session"""
56
- self._current_user_id[session_id] = user_id
57
- logger.info(f"Set user_id={user_id} for session {session_id}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
 
59
  async def process_request(self, session_id: str, user_input: str) -> dict:
60
  """
61
- Main orchestration flow with academic differentiation and enhanced reasoning chain
62
  """
63
  logger.info(f"Processing request for session {session_id}")
64
  logger.info(f"User input: {user_input[:100]}")
65
 
66
- # Safety context bypass: Skip safety checks for binary responses to safety prompts
67
  user_input_upper = user_input.strip().upper()
68
  is_binary_response = user_input_upper in ['YES', 'NO', 'APPLY', 'KEEP', 'Y', 'N']
69
 
 
70
  if is_binary_response and self.awaiting_safety_response.get(session_id, False):
71
- logger.info(f"Binary safety response detected ({user_input_upper}) - bypassing safety check to prevent loop")
72
- # Clear the flag immediately to prevent re-triggering
 
73
  self.awaiting_safety_response[session_id] = False
74
 
75
- # Return a signal that this should be handled by the choice handler, not normal processing
 
 
 
 
76
  return {
77
  'is_safety_response': True,
78
  'response': user_input_upper,
79
  'requires_user_choice': False,
80
- 'skip_safety_check': True
 
 
81
  }
82
 
83
  # Clear previous trace for new request
@@ -98,11 +162,15 @@ class MVPOrchestrator:
98
  interaction_id = self._generate_interaction_id(session_id)
99
  logger.info(f"Generated interaction ID: {interaction_id}")
100
 
101
- # Step 2: Context management with reasoning
102
- logger.info("Step 2: Managing context...")
103
- # Get user_id from context or default to Test_Any (passed from app.py)
104
- user_id = getattr(self, '_current_user_id', {}).get(session_id, "Test_Any")
105
- context = await self.context_manager.manage_context(session_id, user_input, user_id=user_id)
 
 
 
 
106
  logger.info(f"Context retrieved: {len(context.get('interaction_contexts', []))} interaction contexts")
107
 
108
  # Add context analysis to reasoning chain
 
49
  # User ID tracking for context system
50
  self._current_user_id = {} # session_id -> user_id
51
 
52
+ # Context cache to prevent loops
53
+ self._context_cache = {} # cache_key -> {context, timestamp}
54
+
55
  logger.info("MVPOrchestrator initialized with safety revision thresholds")
56
 
57
  def set_user_id(self, session_id: str, user_id: str):
58
+ """Set user_id with loop prevention"""
59
+ # Check if user_id actually changed
60
+ old_user_id = self._current_user_id.get(session_id)
61
+
62
+ if old_user_id != user_id:
63
+ self._current_user_id[session_id] = user_id
64
+ logger.info(f"Set user_id={user_id} for session {session_id} (was: {old_user_id})")
65
+
66
+ # Clear context cache on user change
67
+ cache_key = f"context_{session_id}"
68
+ if cache_key in self._context_cache:
69
+ del self._context_cache[cache_key]
70
+ logger.info(f"Cleared context cache for session {session_id} due to user change")
71
+ else:
72
+ self._current_user_id[session_id] = user_id
73
+
74
+ def _get_user_id_for_session(self, session_id: str) -> str:
75
+ """Get user_id without triggering context loops"""
76
+ # Use in-memory mapping first
77
+ if hasattr(self, '_current_user_id') and session_id in self._current_user_id:
78
+ return self._current_user_id[session_id]
79
+
80
+ # Fallback to default if not found
81
+ return "Test_Any"
82
+
83
+ async def _get_or_create_context(self, session_id: str, user_input: str, user_id: str) -> dict:
84
+ """Get context with loop prevention and caching"""
85
+ # Check if we recently fetched context for this session
86
+ cache_key = f"context_{session_id}"
87
+ current_time = time.time()
88
+
89
+ if hasattr(self, '_context_cache'):
90
+ cached = self._context_cache.get(cache_key)
91
+ if cached and (current_time - cached['timestamp']) < 5: # 5 second cache
92
+ logger.info(f"Using cached context for session {session_id}")
93
+ return cached['context']
94
+
95
+ # Fetch new context
96
+ context = await self.context_manager.manage_context(session_id, user_input, user_id=user_id)
97
+
98
+ # Cache the context
99
+ if not hasattr(self, '_context_cache'):
100
+ self._context_cache = {}
101
+
102
+ self._context_cache[cache_key] = {
103
+ 'context': context,
104
+ 'timestamp': current_time
105
+ }
106
+
107
+ # Clean old cache entries
108
+ if len(self._context_cache) > 100:
109
+ # Remove oldest entries
110
+ sorted_items = sorted(self._context_cache.items(), key=lambda x: x[1]['timestamp'])
111
+ self._context_cache = dict(sorted_items[-50:])
112
+
113
+ return context
114
 
115
  async def process_request(self, session_id: str, user_input: str) -> dict:
116
  """
117
+ Main orchestration flow with loop prevention
118
  """
119
  logger.info(f"Processing request for session {session_id}")
120
  logger.info(f"User input: {user_input[:100]}")
121
 
122
+ # Critical: Prevent safety check loops on binary responses
123
  user_input_upper = user_input.strip().upper()
124
  is_binary_response = user_input_upper in ['YES', 'NO', 'APPLY', 'KEEP', 'Y', 'N']
125
 
126
+ # Check if we're in a safety response context
127
  if is_binary_response and self.awaiting_safety_response.get(session_id, False):
128
+ logger.info(f"Binary safety response detected ({user_input_upper}) - bypassing recursive safety check")
129
+
130
+ # Immediately clear the flag to prevent any further loops
131
  self.awaiting_safety_response[session_id] = False
132
 
133
+ # Remove from pending choices if exists
134
+ if hasattr(self, '_pending_choices'):
135
+ self._pending_choices.pop(session_id, None)
136
+
137
+ # Return with skip flag to prevent further processing
138
  return {
139
  'is_safety_response': True,
140
  'response': user_input_upper,
141
  'requires_user_choice': False,
142
+ 'skip_safety_check': True,
143
+ 'final_response': f"Choice '{user_input_upper}' has been applied.",
144
+ 'bypass_reason': 'binary_safety_response'
145
  }
146
 
147
  # Clear previous trace for new request
 
162
  interaction_id = self._generate_interaction_id(session_id)
163
  logger.info(f"Generated interaction ID: {interaction_id}")
164
 
165
+ # Step 2: Context management with loop prevention
166
+ logger.info("Step 2: Managing context with loop prevention...")
167
+
168
+ # Get user_id from stored mapping, avoiding context retrieval loops
169
+ user_id = self._get_user_id_for_session(session_id)
170
+
171
+ # Use context with deduplication check
172
+ context = await self._get_or_create_context(session_id, user_input, user_id)
173
+
174
  logger.info(f"Context retrieved: {len(context.get('interaction_contexts', []))} interaction contexts")
175
 
176
  # Add context analysis to reasoning chain