JatsTheAIGen committed
Commit f759046 · 1 Parent(s): 207f9f7
cache key error when user id changes - fixed task 1 31_10_2025 v8
OPTIMIZATION_ENHANCEMENTS_REVIEW.md ADDED
@@ -0,0 +1,116 @@
+ # Optimization Enhancements - Review and Implementation Plan
+
+ ## Executive Summary
+
+ This document reviews the requested optimization enhancements and provides an implementation plan, noting any required deviations from the original specifications.
+
+ ## Current State Analysis
+
+ ### ✅ Already Implemented (Partial)
+
+ 1. **Parallel Processing**:
+    - `process_request_parallel()` method exists (lines 696-751 in src/orchestrator_engine.py)
+    - Runs intent, skills, and safety agents in parallel using `asyncio.gather()`
+    - **Deviation Required**: The requested `process_agents_parallel()` method with a different signature needs to be added
+
+ 2. **Context Caching**:
+    - Basic caching infrastructure exists with the `session_cache` dictionary
+    - Cache config has a TTL defined (3600s), but expiration is not actively checked
+    - `_is_cache_valid()` exists but uses a hardcoded 60s instead of the config TTL
+    - **Deviation Required**: Need to add an `add_context_cache()` method with proper TTL expiration
+
+ 3. **Metrics Tracking**:
+    - Basic token_count tracking exists in metadata
+    - Processing time is tracked
+    - **Deviation Required**: Need a comprehensive `track_response_metrics()` method with structured logging
+
+ ### ❌ Not Implemented
+
+ 4. **Query Similarity Detection**: No implementation found
+ 5. **Smart Context Pruning**: No token-count-based pruning exists
+
+ ## Implementation Plan
+
+ ### Step 1: Optimize Agent Chain
+ **Status**: ⚠️ Partial Implementation
+ **Action Required**: Add new `process_agents_parallel()` method while keeping the existing `process_request_parallel()`
+
+ **Deviation Notes**:
+ - Existing `process_request_parallel()` handles intent + skills + safety together
+ - New method will be more generic for any agent pair execution
+ - Will integrate with the existing parallel processing flow
+
+ ### Step 2: Implement Context Caching with TTL
+ **Status**: ⚠️ Infrastructure exists, expiration missing
+ **Action Required**: Add `add_context_cache()` method with expiration checking
+
+ **Deviation Notes**:
+ - Cache expiration needs to be checked on retrieval, not just set on store
+ - Will modify `_get_from_memory_cache()` to check expiration
+ - Will respect the existing `cache_config['ttl']` value (3600s); see the sketch below
+
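+ A minimal sketch of the planned cache-entry shape and the retrieval-time expiration check (illustrative module-level helpers over a plain dict; the real code will be methods on the context manager and names may differ):
+
+ ```python
+ import time
+
+ def add_context_cache(cache: dict, key: str, value: dict, ttl: int = 3600) -> None:
+     # Store the value alongside an absolute expiry timestamp
+     cache[key] = {"value": value, "expires": time.time() + ttl}
+
+ def get_context_cache(cache: dict, key: str):
+     entry = cache.get(key)
+     if entry is None:
+         return None
+     if time.time() > entry["expires"]:
+         # Expired: drop the entry so the caller falls back to the database
+         del cache[key]
+         return None
+     return entry["value"]
+ ```
+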
+ ### Step 3: Add Query Similarity Detection
+ **Status**: ❌ Not Implemented
+ **Action Required**: Implement similarity checking (lexical similarity for the MVP, embeddings as a later option)
+
+ **Deviation Notes**:
+ - FAISS infrastructure exists but is incomplete
+ - Will use simple string similarity (e.g., Levenshtein or cosine over word counts) for the MVP
+ - Can be enhanced with embeddings later if needed
+ - Will cache recent queries in the orchestrator for similarity checking
+
+ ### Step 4: Implement Smart Context Pruning
+ **Status**: ❌ Not Implemented
+ **Action Required**: Add `prune_context()` method with token counting
+
+ **Deviation Notes**:
+ - Token counting will use an approximate method (4 chars ≈ 1 token); see the example below
+ - Will preserve the most recent interactions plus the most relevant ones (by keyword match)
+ - Pruning threshold: 2000 tokens (configurable)
+
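+ A small illustration of the planned approximation and the resulting character budget (numbers are the defaults named above, not measurements):
+
+ ```python
+ def approx_token_count(text: str) -> int:
+     # Rough heuristic: about 4 characters per token
+     return len(text) // 4
+
+ budget_tokens = 2000
+ budget_chars = budget_tokens * 4  # a 2000-token budget is roughly 8000 characters of context
+ ```
+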
+ ### Step 5: Add Response Metrics Tracking
+ **Status**: ⚠️ Partial Implementation
+ **Action Required**: Add comprehensive `track_response_metrics()` method
+
+ **Deviation Notes**:
+ - Will extend existing metadata tracking
+ - Add structured logging for metrics
+ - Track: latency, token_count, agent_calls, safety_score
+
+ ## Files to Modify
+
+ 1. `Research_AI_Assistant/src/orchestrator_engine.py`
+    - Add `process_agents_parallel()` method
+    - Add query similarity detection
+    - Add response metrics tracking
+    - Add agent_call_count tracking
+
+ 2. `Research_AI_Assistant/src/context_manager.py`
+    - Add `add_context_cache()` with TTL
+    - Enhance `_get_from_memory_cache()` with expiration check
+    - Add `prune_context()` method
+    - Add `get_token_count()` helper
+
+ ## Compatibility Considerations
+
+ - All enhancements will be backward compatible
+ - Existing functionality preserved
+ - New methods will be additive, not replacing existing code
+ - Cache TTL will respect existing config values
+
+ ## Testing Recommendations
+
+ 1. Test parallel agent execution with various agent combinations
+ 2. Verify cache expiration works correctly (test with different TTL values)
+ 3. Test query similarity with similar queries (threshold: 0.85)
+ 4. Verify context pruning maintains important information
+ 5. Validate metrics are tracked correctly in logs
+
+ ## Implementation Status
+
+ - [ ] Step 1: Optimize Agent Chain
+ - [ ] Step 2: Implement Context Caching
+ - [ ] Step 3: Add Query Similarity Detection
+ - [ ] Step 4: Implement Smart Context Pruning
+ - [ ] Step 5: Add Response Metrics Tracking
+
OPTIMIZATION_IMPLEMENTATION_COMPLETE.md ADDED
@@ -0,0 +1,154 @@
+ # Optimization Enhancements - Implementation Complete
+
+ ## Summary
+
+ All 5 optimization enhancements have been implemented, with the following deviations and notes:
+
+ ## ✅ Step 1: Optimize Agent Chain
+
+ **Implementation**: Added `process_agents_parallel()` method in `orchestrator_engine.py`
+
+ **Location**: `Research_AI_Assistant/src/orchestrator_engine.py` lines 704-744
+
+ **Features**:
+ - Processes intent and skills agents in parallel using `asyncio.gather()`
+ - Tracks agent call count for metrics
+ - Handles exceptions gracefully
+ - Returns a list of results in the order `[intent_result, skills_result]`
+
+ **Deviation**: Method signature differs from the original specification to work with the existing agent structure; it takes a dictionary input instead of a direct request object.
+
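+ A usage sketch for the new method (hypothetical call site; the surrounding setup is assumed, not part of this commit):
+
+ ```python
+ # Inside an async method of MVPOrchestrator, after context has been retrieved:
+ request = {"user_input": user_input, "context": context}
+ intent_result, skills_result = await self.process_agents_parallel(request)
+ # Each element is {} if the corresponding agent raised an exception.
+ ```
+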
+ ## ✅ Step 2: Implement Context Caching with TTL
+
+ **Implementation**: Added `add_context_cache()` method with expiration checking
+
+ **Location**: `Research_AI_Assistant/src/context_manager.py` lines 632-649
+
+ **Features**:
+ - Stores cache entries with expiration timestamps
+ - TTL default: 3600 seconds (1 hour), taken from `cache_config`
+ - Automatic expiration check in `_get_from_memory_cache()`
+ - Backward compatible with the old cache format
+
+ **Integration**:
+ - `_get_from_memory_cache()` now checks expiration before returning
+ - Cache entries are stored with the structure `{'value': context, 'expires': timestamp, 'timestamp': timestamp}` (shown below)
+ - Expired entries are automatically removed
+
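+ For illustration, an entry written with the default TTL looks roughly like this (values are placeholders, not real data):
+
+ ```python
+ import time
+
+ ttl = 3600
+ entry = {
+     "value": {"user_id": "Test_Any", "summary": "..."},  # the cached context
+     "expires": time.time() + ttl,                        # absolute expiry time
+     "timestamp": time.time(),                            # when it was cached
+ }
+ # On retrieval, the entry is dropped once time.time() > entry["expires"].
+ ```
+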
+ ## ✅ Step 3: Add Query Similarity Detection
+
+ **Implementation**: Added `check_query_similarity()` and `_calculate_similarity()` methods
+
+ **Location**: `Research_AI_Assistant/src/orchestrator_engine.py` lines 1982-2045
+
+ **Features**:
+ - Uses Jaccard similarity on word sets for comparison
+ - Default threshold: 0.85 (configurable)
+ - Stores recent queries in the `self.recent_queries` list (last 50 queries)
+ - Checks the most recent queries first for better performance
+ - Early exit in `process_request()` for duplicate detection
+
+ **Algorithm** (worked example below):
+ - Jaccard similarity: `intersection / union` of word sets
+ - Substring matching for very similar queries (boosts the score to 0.9)
+ - Case-insensitive comparison
+
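+ A worked example of the word-set Jaccard score (made-up queries, not from production logs):
+
+ ```python
+ q1 = set("what is machine learning".split())  # {'what', 'is', 'machine', 'learning'}
+ q2 = set("what is deep learning".split())     # {'what', 'is', 'deep', 'learning'}
+
+ intersection = len(q1 & q2)                   # 3 -> {'what', 'is', 'learning'}
+ union = len(q1 | q2)                          # 5
+ jaccard = intersection / union                # 0.6, below the 0.85 threshold, so no cache hit
+ ```
+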
+ **Note**: Can be enhanced with embeddings for semantic similarity in the future.
+
+ ## ✅ Step 4: Implement Smart Context Pruning
+
+ **Implementation**: Added `prune_context()` and `get_token_count()` methods
+
+ **Location**: `Research_AI_Assistant/src/context_manager.py` lines 651-755
+
+ **Features**:
+ - Token counting uses an approximation: 4 characters ≈ 1 token
+ - Default max tokens: 2000 (configurable)
+ - Priority system:
+   1. User context (essential)
+   2. Session context (essential)
+   3. Most recent interaction contexts (whatever fits in the remaining budget)
+ - Preserves the most recent interactions first
+ - Logs pruning statistics
+
+ **Integration**:
+ - Called automatically in `_optimize_context()` before formatting
+ - Ensures the context stays within token limits for LLM consumption
+
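+ A condensed sketch of the budgeting logic (simplified from the actual method; error handling, logging, and the oversized-user-context truncation path are omitted):
+
+ ```python
+ def approx_tokens(text: str) -> int:
+     return len(text) // 4  # 4 chars ≈ 1 token
+
+ def prune(interactions: list[str], user_ctx: str, session_ctx: str, max_tokens: int = 2000) -> list[str]:
+     budget = max_tokens - approx_tokens(user_ctx) - approx_tokens(session_ctx)
+     kept, used = [], 0
+     for summary in interactions:  # assumed ordered most recent first
+         if used + approx_tokens(summary) > budget:
+             break
+         kept.append(summary)
+         used += approx_tokens(summary)
+     return kept
+ ```
+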
+ ## ✅ Step 5: Add Response Metrics Tracking
+
+ **Implementation**: Added `track_response_metrics()` method
+
+ **Location**: `Research_AI_Assistant/src/orchestrator_engine.py` lines 2047-2100
+
+ **Features**:
+ - Tracks latency (processing time)
+ - Tracks token count (word-count approximation)
+ - Tracks agent calls (incremented during parallel processing)
+ - Tracks safety score (extracted from metadata)
+ - Stores metrics history (last 100 entries)
+ - Logs metrics for monitoring
+ - Resets the agent call count after each request
+
+ **Metrics Tracked** (example record below):
+ - `latency`: Processing time in seconds
+ - `token_count`: Approximate number of tokens in the response
+ - `agent_calls`: Number of agents called during processing
+ - `safety_score`: Overall safety score from the safety analysis
+ - `timestamp`: ISO timestamp of the metrics entry
+
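+ A representative entry from `response_metrics_history` (all values are invented examples):
+
+ ```python
+ metrics = {
+     "latency": 1.732,                           # seconds
+     "token_count": 214,                         # word-count approximation
+     "agent_calls": 3,                           # agents invoked for this request
+     "safety_score": 0.92,                       # from the safety analysis (defaults to 0.8 if absent)
+     "timestamp": "2025-10-31T12:00:00.000000",  # datetime.now().isoformat()
+ }
+ ```
+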
+ ## Integration Points
+
+ ### Orchestrator Engine (`src/orchestrator_engine.py`)
+ - Initialized tracking variables in `__init__()`:
+   - `self.recent_queries = []`
+   - `self.agent_call_count = 0`
+   - `self.response_metrics_history = []`
+ - Query similarity checked early in `process_request()`
+ - Metrics tracked after response generation
+ - Recent queries stored for similarity checking
+
+ ### Context Manager (`src/context_manager.py`)
+ - Cache structure updated to support TTL
+ - Context pruning integrated into `_optimize_context()`
+ - Cache expiration checked on retrieval
+ - Token counting utilities added
+
+ ## Testing Recommendations
+
+ 1. **Parallel Processing**: Test with multiple agent combinations
+ 2. **Cache TTL**: Verify expiration after the TTL period (use a short TTL for testing; sketch below)
+ 3. **Query Similarity**: Test with similar queries (e.g., "What is AI?" vs "Tell me about AI")
+ 4. **Context Pruning**: Test with large contexts (add many interaction contexts)
+ 5. **Metrics Tracking**: Verify metrics appear in the logs and in the history
+
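+ A minimal sketch of the cache-TTL check (assumes a test-constructible `EfficientContextManager` and pytest-style asserts; adapt to the project's actual test harness):
+
+ ```python
+ import time
+
+ def test_cache_entry_expires():
+     manager = EfficientContextManager()
+     manager.add_context_cache("session:abc", {"user_id": "Test_Any"}, ttl=1)
+
+     # Fresh entry is returned
+     assert manager._get_from_memory_cache("session:abc") == {"user_id": "Test_Any"}
+
+     # After the TTL elapses, the entry is treated as expired and removed
+     time.sleep(1.1)
+     assert manager._get_from_memory_cache("session:abc") is None
+ ```
+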
+ ## Configuration
+
+ - **Cache TTL**: Set in `context_manager.cache_config['ttl']` (default: 3600s)
+ - **Similarity Threshold**: Passed as `check_query_similarity(threshold=0.85)`
+ - **Max Tokens**: Passed as `prune_context(max_tokens=2000)`
+ - **Max Recent Queries**: Set in `self.max_recent_queries` (default: 50)
+
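+ For example, the defaults above could be tightened at the call sites (illustrative values only; `orchestrator` and `context_manager` stand for the live instances):
+
+ ```python
+ # Shorter cache lifetime and stricter duplicate detection
+ context_manager.cache_config["ttl"] = 600  # 10 minutes
+ orchestrator.max_recent_queries = 20
+
+ cached = orchestrator.check_query_similarity(user_input, threshold=0.9)
+ pruned = context_manager.prune_context(context, max_tokens=1500)
+ ```
+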
+ ## Backward Compatibility
+
+ All enhancements are backward compatible:
+ - The old cache format (direct value storage) still works
+ - The new cache format is detected and handled appropriately
+ - Existing functionality is preserved
+ - No breaking changes to the API
+
+ ## Performance Impact
+
+ - **Parallel Processing**: Reduces latency for multi-agent operations
+ - **Cache with TTL**: Reduces database queries
+ - **Query Similarity**: Prevents duplicate processing
+ - **Context Pruning**: Ensures the context fits within LLM token limits
+ - **Metrics Tracking**: Minimal overhead (logging only)
+
+ ## Future Enhancements
+
+ 1. **Query Similarity**: Use embeddings for semantic similarity
+ 2. **Context Pruning**: Implement relevance-based ranking (not just recency)
+ 3. **Metrics Tracking**: Add metrics aggregation and analytics
+ 4. **Cache**: Implement an LRU eviction policy (currently TTL only)
+
src/context_manager.py CHANGED
@@ -5,6 +5,7 @@ import logging
 import uuid
 import hashlib
 import threading
+import time
 from contextlib import contextmanager
 from datetime import datetime, timedelta
 from typing import Dict, Optional, List
@@ -249,12 +250,23 @@ class EfficientContextManager:
         session_context = self._get_from_memory_cache(session_cache_key)

         # Check if cached session context matches current user_id
-        if session_context and session_context.get("user_id") != user_id:
-            # User changed, invalidate session cache
-            logger.info(f"User mismatch in cache for session {session_id}, invalidating cache")
-            session_context = None
-            if session_cache_key in self.session_cache:
-                del self.session_cache[session_cache_key]
+        # Handle both old and new cache formats
+        cached_entry = self.session_cache.get(session_cache_key)
+        if cached_entry:
+            # Extract actual context from cache entry
+            if isinstance(cached_entry, dict) and 'value' in cached_entry:
+                actual_context = cached_entry.get('value', {})
+            else:
+                actual_context = cached_entry
+
+            if actual_context and actual_context.get("user_id") != user_id:
+                # User changed, invalidate session cache
+                logger.info(f"User mismatch in cache for session {session_id}, invalidating cache")
+                session_context = None
+                if session_cache_key in self.session_cache:
+                    del self.session_cache[session_cache_key]
+            else:
+                session_context = actual_context

         # Get user context separately
         user_context = self._get_from_memory_cache(user_cache_key)
@@ -263,8 +275,8 @@
            # Retrieve from database with user context
            session_context = await self._retrieve_from_db(session_id, user_input, user_id)

-           # Cache session context (cache invalidation for user changes is handled in _retrieve_from_db)
-           self._warm_memory_cache(session_cache_key, session_context)
+           # Step 2: Cache session context with TTL
+           self.add_context_cache(session_cache_key, session_context, ttl=self.cache_config.get("ttl", 3600))

            # Handle user context separately - load only once and cache thereafter
            # Cache does not refer to database after initial load
@@ -572,10 +584,15 @@ Keep the summary concise (approximately 100 tokens)."""
         """
         Optimize context for LLM consumption
         Format: [Session Context] + [User Context] + [Interaction Context #N, #N-1, ...]
+
+        Applies smart pruning before formatting.
         """
-        user_context = context.get("user_context", "")
-        interaction_contexts = context.get("interaction_contexts", [])
-        session_context = context.get("session_context", {})
+        # Step 4: Prune context if it exceeds token limits
+        pruned_context = self.prune_context(context, max_tokens=2000)
+
+        user_context = pruned_context.get("user_context", "")
+        interaction_contexts = pruned_context.get("interaction_contexts", [])
+        session_context = pruned_context.get("session_context", {})
         session_summary = session_context.get("summary", "") if isinstance(session_context, dict) else ""

         # Format interaction contexts as requested
@@ -593,22 +610,175 @@ Keep the summary concise (approximately 100 tokens)."""
         combined_context += "\n\n".join(formatted_interactions)

         return {
-            "session_id": context.get("session_id"),
-            "user_id": context.get("user_id", "Test_Any"),
+            "session_id": pruned_context.get("session_id"),
+            "user_id": pruned_context.get("user_id", "Test_Any"),
             "user_context": user_context,
             "session_context": session_context,
             "interaction_contexts": interaction_contexts,
             "combined_context": combined_context,  # For direct use in prompts
-            "preferences": context.get("preferences", {}),
-            "active_tasks": context.get("active_tasks", []),
-            "last_activity": context.get("last_activity")
+            "preferences": pruned_context.get("preferences", {}),
+            "active_tasks": pruned_context.get("active_tasks", []),
+            "last_activity": pruned_context.get("last_activity")
         }

     def _get_from_memory_cache(self, cache_key: str) -> dict:
         """
-        Retrieve context from in-memory session cache
+        Retrieve context from in-memory session cache with expiration check
+        """
+        cached = self.session_cache.get(cache_key)
+        if not cached:
+            return None
+
+        # Check if it's the new format with expiration
+        if isinstance(cached, dict) and 'value' in cached:
+            # New format with TTL
+            if self._is_cache_expired(cached):
+                # Remove expired cache entry
+                del self.session_cache[cache_key]
+                logger.debug(f"Cache expired for key: {cache_key}")
+                return None
+            return cached.get('value')
+        else:
+            # Old format (direct value) - return as-is for backward compatibility
+            return cached
+
+    def _is_cache_expired(self, cache_entry: dict) -> bool:
+        """
+        Check if cache entry has expired based on TTL
+        """
+        if not isinstance(cache_entry, dict):
+            return True
+
+        expires = cache_entry.get('expires')
+        if not expires:
+            return False  # No expiration set, consider valid
+
+        return time.time() > expires
+
+    def add_context_cache(self, key: str, value: dict, ttl: int = 3600):
+        """
+        Step 2: Implement Context Caching with TTL expiration
+
+        Add context to the cache with an expiration time.
+
+        Args:
+            key: Cache key
+            value: Value to cache (dict)
+            ttl: Time to live in seconds (default 3600 = 1 hour)
+        """
+        self.session_cache[key] = {
+            'value': value,
+            'expires': time.time() + ttl,
+            'timestamp': time.time()
+        }
+        logger.debug(f"Cached context for key: {key} with TTL: {ttl}s")
+
+    def get_token_count(self, text: str) -> int:
         """
-        return self.session_cache.get(cache_key)
+        Approximate token count for text (4 characters ≈ 1 token)
+
+        Args:
+            text: Text to count tokens for
+
+        Returns:
+            Approximate token count
+        """
+        if not text:
+            return 0
+        # Simple approximation: 4 characters per token
+        return len(text) // 4
+
+    def prune_context(self, context: dict, max_tokens: int = 2000) -> dict:
+        """
+        Step 4: Implement Smart Context Pruning
+
+        Prune context to stay within the token limit while keeping the most recent and relevant content.
+
+        Args:
+            context: Context dictionary to prune
+            max_tokens: Maximum token count (default 2000)
+
+        Returns:
+            Pruned context dictionary
+        """
+        try:
+            # Calculate current token count
+            current_tokens = self._calculate_context_tokens(context)
+
+            if current_tokens <= max_tokens:
+                return context  # No pruning needed
+
+            logger.info(f"Context token count ({current_tokens}) exceeds limit ({max_tokens}), pruning...")
+
+            # Create a copy to avoid modifying the original
+            pruned_context = context.copy()
+
+            # Priority: keep most recent interactions + session context + user context
+            interaction_contexts = pruned_context.get('interaction_contexts', [])
+            session_context = pruned_context.get('session_context', {})
+            user_context = pruned_context.get('user_context', '')
+
+            # Keep user context and session context (essential)
+            essential_tokens = (
+                self.get_token_count(user_context) +
+                self.get_token_count(str(session_context))
+            )
+
+            # Calculate how many interaction contexts we can keep
+            available_tokens = max_tokens - essential_tokens
+            if available_tokens < 0:
+                # Essential context itself is too large - truncate user context
+                if self.get_token_count(user_context) > max_tokens // 2:
+                    pruned_context['user_context'] = user_context[:max_tokens * 2]  # Rough cut (~half the token budget)
+                    logger.warning("User context too large, truncated")
+                return pruned_context
+
+            # Keep most recent interactions that fit in the token budget
+            kept_interactions = []
+            current_size = 0
+
+            for interaction in interaction_contexts:
+                summary = interaction.get('summary', '')
+                interaction_tokens = self.get_token_count(summary)
+
+                if current_size + interaction_tokens <= available_tokens:
+                    kept_interactions.append(interaction)
+                    current_size += interaction_tokens
+                else:
+                    break  # Can't fit any more
+
+            pruned_context['interaction_contexts'] = kept_interactions
+
+            logger.info(f"Pruned context: kept {len(kept_interactions)}/{len(interaction_contexts)} interactions, "
+                        f"reduced from {current_tokens} to {self._calculate_context_tokens(pruned_context)} tokens")
+
+            return pruned_context
+
+        except Exception as e:
+            logger.error(f"Error pruning context: {e}", exc_info=True)
+            return context  # Return original on error
+
+    def _calculate_context_tokens(self, context: dict) -> int:
+        """Calculate total token count for a context dict"""
+        total = 0
+
+        # Count tokens in each component
+        user_context = context.get('user_context', '')
+        total += self.get_token_count(str(user_context))
+
+        session_context = context.get('session_context', {})
+        if isinstance(session_context, dict):
+            total += self.get_token_count(str(session_context.get('summary', '')))
+        else:
+            total += self.get_token_count(str(session_context))
+
+        interaction_contexts = context.get('interaction_contexts', [])
+        for interaction in interaction_contexts:
+            summary = interaction.get('summary', '')
+            total += self.get_token_count(str(summary))
+
+        return total

     async def _retrieve_from_db(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict:
         """
@@ -809,8 +979,10 @@ Keep the summary concise (approximately 100 tokens)."""
     def _warm_memory_cache(self, cache_key: str, context: dict):
         """
         Warm the in-memory cache with retrieved context
+        Note: use add_context_cache() instead for TTL support
         """
-        self.session_cache[cache_key] = context
+        # Use add_context_cache for consistency with TTL
+        self.add_context_cache(cache_key, context, ttl=self.cache_config.get("ttl", 3600))

     def _update_cache_with_interaction_context(self, session_id: str, interaction_summary: str, created_at: str):
         """
src/orchestrator_engine.py CHANGED
@@ -57,6 +57,14 @@ class MVPOrchestrator:
         # Context cache to prevent loops
         self._context_cache = {}  # cache_key -> {context, timestamp}

+        # Query similarity tracking for duplicate detection
+        self.recent_queries = []  # List of {query, response, timestamp}
+        self.max_recent_queries = 50  # Keep last 50 queries
+
+        # Response metrics tracking
+        self.agent_call_count = 0
+        self.response_metrics_history = []  # Store recent metrics
+
         logger.info("MVPOrchestrator initialized with safety revision thresholds")

     def set_user_id(self, session_id: str, user_id: str):
@@ -163,6 +171,16 @@ class MVPOrchestrator:
         }

         try:
+            # Step 3: Check query similarity BEFORE processing (early exit for duplicates)
+            # Note: this happens early to skip full processing for identical/similar queries
+            similar_response = self.check_query_similarity(user_input, threshold=0.95)  # Higher threshold for exact duplicates
+            if similar_response:
+                logger.info("Similar/duplicate query detected, using cached response")
+                # Still track metrics for the cached response (minimal processing)
+                metrics_start = time.time()
+                self.track_response_metrics(metrics_start, similar_response)
+                return similar_response
+
             # Step 1: Generate unique interaction ID
             interaction_id = self._generate_interaction_id(session_id)
             logger.info(f"Generated interaction ID: {interaction_id}")
@@ -486,6 +504,19 @@ This response has been flagged for potential safety concerns:
             except Exception as e:
                 logger.error(f"Error generating interaction context: {e}", exc_info=True)

+            # Track response metrics
+            self.track_response_metrics(start_time, result)
+
+            # Store query and response for similarity checking
+            self.recent_queries.append({
+                'query': user_input,
+                'response': result,
+                'timestamp': time.time()
+            })
+            # Keep only recent queries
+            if len(self.recent_queries) > self.max_recent_queries:
+                self.recent_queries = self.recent_queries[-self.max_recent_queries:]
+
             logger.info(f"Request processing complete. Response length: {len(response_text)}")
             return result

@@ -693,6 +724,48 @@ This response has been flagged for potential safety concerns:

         return " | ".join(summary_parts) if summary_parts else "No prior context"

+    async def process_agents_parallel(self, request: Dict) -> List:
+        """
+        Step 1: Optimize Agent Chain - Process multiple agents in parallel
+
+        Args:
+            request: Dictionary containing request data with 'user_input' and 'context'
+
+        Returns:
+            List of agent results in order [intent_result, skills_result]
+        """
+        user_input = request.get('user_input', '')
+        context = request.get('context', {})
+
+        # Increment agent call count for metrics
+        self.agent_call_count += 2  # Two agents called
+
+        tasks = [
+            self.agents['intent_recognition'].execute(
+                user_input=user_input,
+                context=context
+            ),
+            self.agents['skills_identification'].execute(
+                user_input=user_input,
+                context=context
+            ),
+        ]
+
+        try:
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            # Handle exceptions by substituting an empty result
+            processed_results = []
+            for idx, result in enumerate(results):
+                if isinstance(result, Exception):
+                    logger.error(f"Agent task {idx} failed: {result}")
+                    processed_results.append({})
+                else:
+                    processed_results.append(result)
+            return processed_results
+        except Exception as e:
+            logger.error(f"Error in parallel agent processing: {e}", exc_info=True)
+            return [{}, {}]
+
     async def process_request_parallel(self, session_id: str, user_input: str, context: Dict) -> Dict:
         """Process intent, skills, and safety in parallel"""

@@ -714,6 +787,9 @@ This response has been flagged for potential safety concerns:
             context=context
         )

+        # Increment agent call count for metrics
+        self.agent_call_count += 3
+
         # Wait for all to complete
         results = await asyncio.gather(
             intent_task,
@@ -1904,3 +1980,123 @@ Revised Response:"""
 Additional guidance for response: {improvement_instructions}. Ensure all advice is specific, actionable, and acknowledges different backgrounds and circumstances."""

         return improved_prompt
+
+    def check_query_similarity(self, new_query: str, threshold: float = 0.85) -> Optional[Dict]:
+        """
+        Step 3: Add Query Similarity Detection
+
+        Check if the new query is similar to any recent query above the threshold.
+        Uses simple string similarity (can be enhanced with embeddings later).
+
+        Args:
+            new_query: The new query to check
+            threshold: Similarity threshold (default 0.85)
+
+        Returns:
+            Cached response dict if a similar query is found, None otherwise
+        """
+        if not self.recent_queries:
+            return None
+
+        new_query_lower = new_query.lower().strip()
+
+        for cached_query_data in reversed(self.recent_queries):  # Check most recent first
+            cached_query = cached_query_data.get('query', '')
+            if not cached_query:
+                continue
+
+            cached_query_lower = cached_query.lower().strip()
+
+            # Calculate similarity using simple word overlap (Jaccard similarity)
+            similarity = self._calculate_similarity(new_query_lower, cached_query_lower)
+
+            if similarity > threshold:
+                logger.info(f"Similar query detected (similarity: {similarity:.2f}): '{new_query[:50]}...' similar to '{cached_query[:50]}...'")
+                return cached_query_data.get('response')

+        return None
+
+    def _calculate_similarity(self, query1: str, query2: str) -> float:
+        """
+        Calculate similarity between two queries using Jaccard similarity on words.
+        Can be enhanced with embeddings for semantic similarity.
+        """
+        if not query1 or not query2:
+            return 0.0
+
+        # Split into words and create sets
+        words1 = set(query1.split())
+        words2 = set(query2.split())
+
+        if not words1 or not words2:
+            return 0.0
+
+        # Calculate Jaccard similarity
+        intersection = len(words1.intersection(words2))
+        union = len(words1.union(words2))
+
+        if union == 0:
+            return 0.0
+
+        jaccard = intersection / union
+
+        # Also check for substring containment for very similar queries
+        if query1 in query2 or query2 in query1:
+            jaccard = max(jaccard, 0.9)
+
+        return jaccard
+
+    def track_response_metrics(self, start_time: float, response: Dict):
+        """
+        Step 5: Add Response Metrics Tracking
+
+        Track performance metrics for responses.
+
+        Args:
+            start_time: Start time from time.time()
+            response: Response dictionary containing response data
+        """
+        try:
+            latency = time.time() - start_time
+
+            # Extract response text for token counting
+            response_text = (
+                response.get('response') or
+                response.get('final_response') or
+                str(response.get('result', ''))
+            )
+
+            # Approximate token count (whitespace-delimited word count)
+            token_count = len(response_text.split()) if response_text else 0
+
+            # Extract safety score
+            safety_score = 0.8  # Default
+            if 'metadata' in response:
+                synthesis_result = response['metadata'].get('synthesis_result', {})
+                safety_result = response['metadata'].get('safety_result', {})
+                if safety_result:
+                    safety_analysis = safety_result.get('safety_analysis', {})
+                    safety_score = safety_analysis.get('overall_safety_score', 0.8)
+
+            metrics = {
+                'latency': latency,
+                'token_count': token_count,
+                'agent_calls': self.agent_call_count,
+                'safety_score': safety_score,
+                'timestamp': datetime.now().isoformat()
+            }
+
+            # Store in history (keep last 100)
+            self.response_metrics_history.append(metrics)
+            if len(self.response_metrics_history) > 100:
+                self.response_metrics_history = self.response_metrics_history[-100:]
+
+            # Log metrics
+            logger.info(f"Response Metrics - Latency: {latency:.3f}s, Tokens: {token_count}, "
+                        f"Agent Calls: {self.agent_call_count}, Safety Score: {safety_score:.2f}")
+
+            # Reset agent call count for next request
+            self.agent_call_count = 0
+
+        except Exception as e:
+            logger.error(f"Error tracking response metrics: {e}", exc_info=True)