File size: 19,295 Bytes
66dbebd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55f436b
66dbebd
 
 
 
 
55f436b
 
 
 
 
 
 
 
66dbebd
 
55f436b
66dbebd
 
 
 
 
 
55f436b
 
66dbebd
 
 
 
 
 
 
 
 
 
 
55f436b
66dbebd
55f436b
 
66dbebd
 
 
 
 
 
 
 
 
 
 
 
80a97c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66dbebd
80a97c8
 
 
66dbebd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f89bd21
 
 
 
 
93f44e2
 
 
f89bd21
93f44e2
 
 
 
f89bd21
93f44e2
f89bd21
 
 
 
 
 
66dbebd
 
 
 
 
f89bd21
66dbebd
 
 
 
 
 
 
f89bd21
 
66dbebd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55f436b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66dbebd
80a97c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66dbebd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
"""
Safety & Bias Mitigation Agent
Specialized in content moderation and bias detection with non-blocking warnings
"""

import logging
import re
from typing import Dict, Any, List, Tuple

logger = logging.getLogger(__name__)

class SafetyCheckAgent:
    """Advisory safety/bias checker that annotates text but never blocks it.

    The agent analyses a response either through an injected LLM router or,
    when no router is configured (or the LLM path fails), through regex and
    keyword heuristics. The analysed text is always returned unmodified;
    findings are surfaced only as human-readable warning strings.
    """

    # Whole-word tokens that each raise the heuristic toxicity score.
    _TOXIC_INDICATORS = frozenset(
        {"hate", "violence", "harm", "attack", "destroy", "kill", "hurt"}
    )
    # Score added per distinct toxic indicator found in the text.
    _TOXICITY_STEP = 0.2
    # An overall safety score below this value adds the general safety note.
    _OVERALL_SAFETY_FLOOR = 0.7

    def __init__(self, llm_router=None):
        """Create the agent.

        Args:
            llm_router: Optional object exposing an awaitable
                ``route_inference(task_type=..., prompt=..., max_tokens=...,
                temperature=...)``. When falsy, only pattern-based analysis
                is used.
        """
        self.llm_router = llm_router
        self.agent_id = "SAFETY_BIAS_001"
        self.specialization = "Content moderation and bias detection with warning-based approach"

        # Safety thresholds (non-blocking, warning-only)
        self.safety_thresholds = {
            "toxicity": 0.8,   # High threshold for warnings
            "bias": 0.7,       # Moderate threshold for bias detection
            "safety": 0.6,     # Lower threshold for general safety
            "privacy": 0.9,    # Very high threshold for privacy concerns
        }

        # Warning templates (non-blocking). The leading emoji were stored as
        # mis-decoded UTF-8 in several entries; they are restored here.
        self.warning_templates = {
            "toxicity": "⚠️ Note: Content may contain strong language",
            "bias": "🔍 Note: Potential biases detected in response",
            "safety": "📝 Note: Response should be verified for accuracy",
            "privacy": "🔒 Note: Privacy-sensitive topics discussed",
            "controversial": "💭 Note: This topic may have multiple perspectives",
        }

        # Pattern-based detection for quick analysis
        self.sensitive_patterns = {
            "toxicity": [
                r'\b(hate|violence|harm|attack|destroy)\b',
                r'\b(kill|hurt|harm|danger)\b',
                r'racial slurs',  # Placeholder for actual sensitive terms
            ],
            "bias": [
                r'\b(all|always|never|every)\b',  # Overgeneralizations
                r'\b(should|must|have to)\b',     # Prescriptive language
                r'stereotypes?',                  # Stereotype indicators
            ],
            "privacy": [
                r'\b(ssn|social security|password|credit card)\b',
                r'\b(address|phone|email|personal)\b',
                r'\b(confidential|secret|private)\b',
            ],
        }

    @staticmethod
    def _extract_response_text(response) -> str:
        """Return the text to analyse from a string-like or dict response.

        For a dict, the first non-``None`` value of ``final_response`` then
        ``response`` is used; if neither is present the dict itself is
        stringified. Non-string values are converted with ``str``.
        """
        if isinstance(response, dict):
            for key in ("final_response", "response"):
                value = response.get(key)
                if value is not None:
                    return value if isinstance(value, str) else str(value)
        return str(response)

    @staticmethod
    def _truncate(text, limit: int) -> str:
        """Return ``text`` as a string cut to ``limit`` chars, with ``...`` only if cut."""
        text = text if isinstance(text, str) else str(text)
        return text[:limit] + "..." if len(text) > limit else text

    async def execute(self, response, context: Dict[str, Any] = None, **kwargs) -> Dict[str, Any]:
        """Run the safety check and return the response with advisory warnings.

        Args:
            response: Text to analyse, or a dict carrying it under
                ``final_response`` / ``response``.
            context: Optional conversation context forwarded to the analysis.
            **kwargs: Accepted for call-site compatibility; unused here.

        Returns:
            A dict with ``original_response``, ``safety_checked_response``
            (identical, the text is never modified), ``warnings``,
            ``safety_analysis``, ``blocked`` (always ``False``),
            ``confidence_scores`` and ``agent_id``. On any internal error the
            fallback result from ``_get_fallback_result`` is returned instead.
        """
        response_text = None
        try:
            response_text = self._extract_response_text(response)
            logger.info("%s analyzing response of length %d", self.agent_id, len(response_text))

            safety_analysis = await self._analyze_safety(response_text, context)

            # Warnings are derived from the analysis; the text is left as-is.
            warnings = self._generate_warnings(safety_analysis)

            result = {
                "original_response": response_text,
                "safety_checked_response": response_text,  # Response never modified
                "warnings": warnings,
                "safety_analysis": safety_analysis,
                "blocked": False,  # Never blocks content
                "confidence_scores": safety_analysis.get("confidence_scores", {}),
                "agent_id": self.agent_id,
            }

            logger.info("%s completed with %d warnings", self.agent_id, len(warnings))
            return result

        except Exception as e:
            logger.error("%s error: %s", self.agent_id, e, exc_info=True)
            # Fail-safe: hand back the original text with an error note.
            if response_text is None:
                response_text = str(response)
            return self._get_fallback_result(response_text)

    async def _analyze_safety(self, response: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch to LLM analysis when a router is set, else to pattern analysis."""
        if self.llm_router:
            return await self._llm_based_safety_analysis(response, context)
        return await self._pattern_based_safety_analysis(response)

    async def _llm_based_safety_analysis(self, response: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the LLM router for a safety analysis, falling back to patterns.

        Any exception (prompt building, routing, parsing) or an empty /
        non-string LLM reply results in the pattern-based analysis.

        NOTE(review): the prompt requests keys such as ``safety_scores`` and
        ``recommended_warnings``, whereas ``_generate_warnings`` reads
        ``confidence_scores``, ``bias_indicators``, ``privacy_concerns``,
        ``overall_safety_score`` and ``detected_issues`` - confirm the model's
        JSON is expected to line up with those names.
        """
        import time

        started = time.perf_counter()
        try:
            safety_prompt = self._build_safety_prompt(response, context)

            logger.info("%s calling LLM for safety analysis", self.agent_id)
            llm_response = await self.llm_router.route_inference(
                task_type="safety_check",
                prompt=safety_prompt,
                max_tokens=800,
                temperature=0.3,
            )

            if isinstance(llm_response, str) and llm_response.strip():
                parsed_analysis = self._parse_llm_safety_response(llm_response)
                # Report the measured latency rather than a fixed placeholder.
                parsed_analysis["processing_time"] = round(time.perf_counter() - started, 3)
                parsed_analysis["method"] = "llm_enhanced"
                # Other analysis paths label themselves via "analysis_method".
                parsed_analysis.setdefault("analysis_method", "llm_enhanced")
                return parsed_analysis

        except Exception as e:
            logger.error("%s LLM safety analysis failed: %s", self.agent_id, e)

        # Fallback to pattern-based analysis if LLM fails
        logger.info("%s falling back to pattern-based safety analysis", self.agent_id)
        return await self._pattern_based_safety_analysis(response)

    async def _pattern_based_safety_analysis(self, response: str) -> Dict[str, Any]:
        """Build an analysis dict from the regex/keyword heuristics.

        ``overall_safety_score`` and ``confidence_scores`` are fixed constants
        in this path; only ``toxicity_score``, ``bias_indicators``,
        ``privacy_concerns`` and ``detected_issues`` depend on the text.
        """
        return {
            "toxicity_score": self._calculate_toxicity_score(response),
            "bias_indicators": self._detect_bias_indicators(response),
            "privacy_concerns": self._check_privacy_issues(response),
            "overall_safety_score": 0.75,  # Conservative estimate
            "confidence_scores": {
                "toxicity": 0.6,
                "bias": 0.5,
                "safety": 0.7,
                "privacy": 0.8,
            },
            "detected_issues": self._pattern_based_detection(response),
            "analysis_method": "pattern_based",
        }

    def _build_safety_prompt(self, response: str, context: Dict[str, Any]) -> str:
        """Build the LLM prompt, embedding a text excerpt and optional context.

        The response is limited to 500 characters and each context snippet to
        200; an ellipsis is appended only when something was actually cut.
        Context values that are not strings are stringified, and a malformed
        context is ignored rather than raising.
        """
        parts = []
        if isinstance(context, dict):
            session_context = context.get('session_context', {})
            session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else ""
            user_context = context.get('user_context', '')
            interaction_contexts = context.get('interaction_contexts', [])

            if session_summary:
                parts.append(
                    f"\n\nSession Context (for safety context): {self._truncate(session_summary, 200)}"
                )
            if user_context:
                parts.append(
                    f"\n\nUser Context (for safety context): {self._truncate(user_context, 200)}"
                )

            # Only the most recent interaction is included, to convey flow.
            if isinstance(interaction_contexts, (list, tuple)) and interaction_contexts:
                latest = interaction_contexts[-1]
                recent_context = latest.get('summary', '') if isinstance(latest, dict) else ''
                if recent_context:
                    parts.append(f"\nRecent conversation: {self._truncate(recent_context, 200)}")

        context_info = "".join(parts)
        excerpt = self._truncate(response, 500)

        return f"""
        Analyze the following text for safety concerns and provide warnings if needed:

        Text to analyze: "{excerpt}"
        {context_info}

        Analyze for:
        1. Toxicity or harmful language
        2. Potential biases or stereotypes
        3. Privacy or security concerns
        4. Controversial or sensitive topics

        Consider the conversation context when assessing whether content is appropriate in this specific conversational context.

        Provide analysis in JSON format with:
        - safety_scores for each category (0-1 scale)
        - detected_issues list
        - confidence_level
        - recommended_warnings (non-blocking, advisory only)

        IMPORTANT: Never block or modify the content, only provide warnings.
        """

    def _pattern_based_detection(self, response: str) -> List[Dict[str, Any]]:
        """Return at most one issue per category whose regex matches ``response``.

        Matching is case-insensitive. Each issue records the category, the
        first matching pattern, a fixed ``"low"`` severity and a fixed 0.7
        confidence.
        """
        issues = []
        for category, patterns in self.sensitive_patterns.items():
            matched = next(
                (p for p in patterns if re.search(p, response, re.IGNORECASE)),
                None,
            )
            if matched is not None:
                issues.append({
                    "category": category,
                    "pattern": matched,
                    "severity": "low",  # Always low for warning-only approach
                    "confidence": 0.7,
                })
        return issues

    def _calculate_toxicity_score(self, response: str) -> float:
        """Return a 0-1 heuristic score: 0.2 per distinct toxic word, capped at 1.0.

        The text is tokenised on word characters so punctuation attached to a
        word ("kill!") does not hide it, while substrings inside longer words
        ("whatever") are not counted.
        """
        tokens = set(re.findall(r"\w+", response.lower()))
        hits = len(self._TOXIC_INDICATORS & tokens)
        return min(1.0, round(self._TOXICITY_STEP * hits, 2))

    def _detect_bias_indicators(self, response: str) -> List[str]:
        """Return labels for bias heuristics that match ``response``.

        Possible labels, in order: ``overgeneralization``,
        ``prescriptive_language``, ``potential_stereotype``.
        """
        biases = []

        # Quantifier followed by a word ending in "s", e.g. "all dogs".
        if re.search(r'\b(all|always|never|every)\s+\w+s\b', response, re.IGNORECASE):
            biases.append("overgeneralization")

        if re.search(r'\b(should|must|have to|ought to)\b', response, re.IGNORECASE):
            biases.append("prescriptive_language")

        stereotype_patterns = (
            r'\b(all|most)\s+\w+\s+people\b',
            r'\b(typical|usual|normal)\s+\w+\b',
        )
        if any(re.search(p, response, re.IGNORECASE) for p in stereotype_patterns):
            biases.append("potential_stereotype")

        return biases

    def _check_privacy_issues(self, response: str) -> List[str]:
        """Return ``["potential_personal_info"]`` if the text looks like it holds PII.

        Checks for an SSN-like number, a 16-digit card-like number (contiguous
        or in groups of four separated by a space or dash) and an e-mail
        address; otherwise returns an empty list.
        """
        personal_info_patterns = (
            r'\b\d{3}-\d{2}-\d{4}\b',                               # SSN-like pattern
            r'\b(?:\d{4}[ -]?){3}\d{4}\b',                          # Credit card-like pattern
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
        )
        if any(re.search(p, response) for p in personal_info_patterns):
            return ["potential_personal_info"]
        return []

    def _generate_warnings(self, safety_analysis: Dict[str, Any]) -> List[str]:
        """Translate an analysis dict into an ordered, de-duplicated warning list.

        A warning is added when a numeric ``confidence_scores`` entry exceeds
        its threshold, when ``bias_indicators`` / ``privacy_concerns`` are
        non-empty, when ``overall_safety_score`` is below 0.7, and for every
        ``detected_issues`` entry whose ``category`` has a template. Any
        unexpected error is logged and yields an empty list.
        """
        try:
            scores = safety_analysis.get("confidence_scores", {})
            if not isinstance(scores, dict):
                scores = {}

            issues = safety_analysis.get("detected_issues", [])
            if not isinstance(issues, list):
                issues = []

            def exceeds(category: str) -> bool:
                value = scores.get(category)
                return isinstance(value, (int, float)) and value > self.safety_thresholds[category]

            templates = self.warning_templates
            warnings = []

            if exceeds("toxicity"):
                warnings.append(templates["toxicity"])

            if exceeds("bias") or safety_analysis.get("bias_indicators"):
                warnings.append(templates["bias"])

            if exceeds("privacy") or safety_analysis.get("privacy_concerns"):
                warnings.append(templates["privacy"])

            # Here a LOW score is the bad direction, unlike the checks above.
            overall_score = safety_analysis.get("overall_safety_score", 1.0)
            if isinstance(overall_score, (int, float)) and overall_score < self._OVERALL_SAFETY_FLOOR:
                warnings.append(templates["safety"])

            for issue in issues:
                if not isinstance(issue, dict):
                    continue
                category = issue.get("category")
                if isinstance(category, str) and category in templates:
                    warnings.append(templates[category])

            # dict preserves insertion order, so this de-duplicates stably.
            return list(dict.fromkeys(w for w in warnings if isinstance(w, str)))

        except Exception as e:
            logger.error("Error generating warnings: %s", e, exc_info=True)
            return []

    def _parse_llm_safety_response(self, response: str) -> Dict[str, Any]:
        """Parse the LLM reply into an analysis dict.

        If the reply contains a ``{...}`` span that decodes to a JSON object,
        that object is returned as-is. Otherwise a keyword heuristic over the
        reply text produces the scores: toxicity-related words take priority
        over bias-related words, and the derived toxicity/bias values are
        reported in ``confidence_scores``.
        """
        import json

        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            try:
                parsed = json.loads(json_match.group())
            except json.JSONDecodeError:
                logger.warning("%s Failed to parse LLM safety JSON", self.agent_id)
            else:
                if isinstance(parsed, dict):
                    return parsed

        # Fallback parsing - extract safety info from text
        response_lower = response.lower()

        toxicity_score = 0.1
        bias_score = 0.1
        safety_score = 0.9

        if any(word in response_lower for word in ('toxic', 'harmful', 'dangerous', 'inappropriate')):
            toxicity_score = 0.8
            safety_score = 0.3
        elif any(word in response_lower for word in ('bias', 'discriminatory', 'unfair', 'prejudiced')):
            bias_score = 0.7
            safety_score = 0.5

        return {
            "toxicity_score": toxicity_score,
            "bias_indicators": [],
            "privacy_concerns": [],
            "overall_safety_score": safety_score,
            "confidence_scores": {
                "toxicity": toxicity_score,
                "bias": bias_score,
                "safety": safety_score,
                "privacy": 0.9,
            },
            "detected_issues": [],
            "analysis_method": "llm_parsed",
            "llm_response": self._truncate(response, 200),
        }

    def _get_fallback_result(self, response: str) -> Dict[str, Any]:
        """Return the result used when the safety check itself fails.

        Mirrors the keys of a normal ``execute`` result (including the
        top-level ``confidence_scores``) and adds ``error_handled: True``.
        """
        fallback_scores = {"safety": 0.5}
        return {
            "original_response": response,
            "safety_checked_response": response,
            "warnings": ["🔧 Note: Safety analysis temporarily unavailable"],
            "safety_analysis": {
                "overall_safety_score": 0.5,
                "confidence_scores": dict(fallback_scores),
                "detected_issues": [],
                "analysis_method": "fallback",
            },
            "blocked": False,
            "confidence_scores": fallback_scores,
            "agent_id": self.agent_id,
            "error_handled": True,
        }

    def get_safety_summary(self, analysis_result: Dict[str, Any]) -> str:
        """Return a one-line, user-facing summary of an ``execute`` result.

        With no warnings the content is reported as safe. Otherwise the
        warning count is shown with a severity derived from
        ``overall_safety_score``: above 0.8 is low, above 0.6 is medium,
        anything else is high. A missing or non-numeric score counts as 1.0.
        """
        warnings = analysis_result.get("warnings", [])
        if not warnings:
            return "✅ Content appears safe based on automated analysis"

        analysis = analysis_result.get("safety_analysis", {})
        safety_score = analysis.get("overall_safety_score", 1.0) if isinstance(analysis, dict) else 1.0
        if not isinstance(safety_score, (int, float)):
            safety_score = 1.0

        if safety_score > 0.8:
            severity = "low"
        elif safety_score > 0.6:
            severity = "medium"
        else:
            severity = "high"

        return f"⚠️ {len(warnings)} advisory note(s) - {severity} severity"

    async def batch_analyze(self, responses: List[str]) -> List[Dict[str, Any]]:
        """Run ``execute`` on each response in order and return the results.

        Responses are processed one after another (not concurrently); the
        output list is index-aligned with ``responses``.
        """
        return [await self.execute(response) for response in responses]

# Factory function for easy instantiation
def create_safety_agent(llm_router=None):
    """Return a new SafetyCheckAgent, passing ``llm_router`` through unchanged."""
    agent = SafetyCheckAgent(llm_router)
    return agent

# Example usage
if __name__ == "__main__":
    import asyncio

    # Manual smoke run: no router is supplied, so the pattern-based path is used.
    agent = SafetyCheckAgent()

    test_responses = [
        "This is a perfectly normal response with no issues.",
        "Some content that might contain controversial topics.",
        "Discussion about sensitive personal information."
    ]

    async def test_agent():
        """Feed every sample to the agent and print what it reports."""
        divider = "-" * 50
        for sample in test_responses:
            outcome = await agent.execute(sample)
            analysis = outcome['safety_analysis']
            print(f"Response: {sample[:50]}...")
            print(f"Warnings: {outcome['warnings']}")
            print(f"Safety Score: {analysis['overall_safety_score']}")
            print(divider)

    asyncio.run(test_agent())