File size: 19,295 Bytes
66dbebd 55f436b 66dbebd 55f436b 66dbebd 55f436b 66dbebd 55f436b 66dbebd 55f436b 66dbebd 55f436b 66dbebd 80a97c8 66dbebd 80a97c8 66dbebd f89bd21 93f44e2 f89bd21 93f44e2 f89bd21 93f44e2 f89bd21 66dbebd f89bd21 66dbebd f89bd21 66dbebd 55f436b 66dbebd 80a97c8 66dbebd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 |
"""
Safety & Bias Mitigation Agent
Specialized in content moderation and bias detection with non-blocking warnings
"""
import logging
import re
from typing import Dict, Any, List, Tuple
logger = logging.getLogger(__name__)
class SafetyCheckAgent:
def __init__(self, llm_router=None):
self.llm_router = llm_router
self.agent_id = "SAFETY_BIAS_001"
self.specialization = "Content moderation and bias detection with warning-based approach"
# Safety thresholds (non-blocking, warning-only)
self.safety_thresholds = {
"toxicity": 0.8, # High threshold for warnings
"bias": 0.7, # Moderate threshold for bias detection
"safety": 0.6, # Lower threshold for general safety
"privacy": 0.9 # Very high threshold for privacy concerns
}
# Warning templates (non-blocking)
self.warning_templates = {
"toxicity": "β οΈ Note: Content may contain strong language",
"bias": "π Note: Potential biases detected in response",
"safety": "π Note: Response should be verified for accuracy",
"privacy": "π Note: Privacy-sensitive topics discussed",
"controversial": "π Note: This topic may have multiple perspectives"
}
# Pattern-based detection for quick analysis
self.sensitive_patterns = {
"toxicity": [
r'\b(hate|violence|harm|attack|destroy)\b',
r'\b(kill|hurt|harm|danger)\b',
r'racial slurs', # Placeholder for actual sensitive terms
],
"bias": [
r'\b(all|always|never|every)\b', # Overgeneralizations
r'\b(should|must|have to)\b', # Prescriptive language
r'stereotypes?', # Stereotype indicators
],
"privacy": [
r'\b(ssn|social security|password|credit card)\b',
r'\b(address|phone|email|personal)\b',
r'\b(confidential|secret|private)\b',
]
}
async def execute(self, response, context: Dict[str, Any] = None, **kwargs) -> Dict[str, Any]:
"""
Execute safety check with non-blocking warnings
Returns original response with added warnings
"""
try:
# Handle both string and dict inputs
if isinstance(response, dict):
# Extract the actual response string from the dict
response_text = response.get('final_response', response.get('response', str(response)))
else:
response_text = str(response)
logger.info(f"{self.agent_id} analyzing response of length {len(response_text)}")
# Perform safety analysis
safety_analysis = await self._analyze_safety(response_text, context)
# Generate warnings without modifying response
warnings = self._generate_warnings(safety_analysis)
# Add safety metadata to response
result = {
"original_response": response_text,
"safety_checked_response": response_text, # Response never modified
"warnings": warnings,
"safety_analysis": safety_analysis,
"blocked": False, # Never blocks content
"confidence_scores": safety_analysis.get("confidence_scores", {}),
"agent_id": self.agent_id
}
logger.info(f"{self.agent_id} completed with {len(warnings)} warnings")
return result
except Exception as e:
logger.error(f"{self.agent_id} error: {str(e)}", exc_info=True)
# Fail-safe: return original response with error note
response_text = str(response) if not isinstance(response, dict) else response.get('final_response', str(response))
return self._get_fallback_result(response_text)
async def _analyze_safety(self, response: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze response for safety concerns using multiple methods"""
if self.llm_router:
return await self._llm_based_safety_analysis(response, context)
else:
return await self._pattern_based_safety_analysis(response)
async def _llm_based_safety_analysis(self, response: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Use LLM for sophisticated safety analysis"""
try:
safety_prompt = self._build_safety_prompt(response, context)
logger.info(f"{self.agent_id} calling LLM for safety analysis")
llm_response = await self.llm_router.route_inference(
task_type="safety_check",
prompt=safety_prompt,
max_tokens=800,
temperature=0.3
)
if llm_response and isinstance(llm_response, str) and len(llm_response.strip()) > 0:
# Parse LLM response
parsed_analysis = self._parse_llm_safety_response(llm_response)
parsed_analysis["processing_time"] = 0.6
parsed_analysis["method"] = "llm_enhanced"
return parsed_analysis
except Exception as e:
logger.error(f"{self.agent_id} LLM safety analysis failed: {e}")
# Fallback to pattern-based analysis if LLM fails
logger.info(f"{self.agent_id} falling back to pattern-based safety analysis")
return await self._pattern_based_safety_analysis(response)
async def _pattern_based_safety_analysis(self, response: str) -> Dict[str, Any]:
"""Pattern-based safety analysis as fallback"""
detected_issues = self._pattern_based_detection(response)
return {
"toxicity_score": self._calculate_toxicity_score(response),
"bias_indicators": self._detect_bias_indicators(response),
"privacy_concerns": self._check_privacy_issues(response),
"overall_safety_score": 0.75, # Conservative estimate
"confidence_scores": {
"toxicity": 0.6,
"bias": 0.5,
"safety": 0.7,
"privacy": 0.8
},
"detected_issues": detected_issues,
"analysis_method": "pattern_based"
}
def _build_safety_prompt(self, response: str, context: Dict[str, Any]) -> str:
"""Build prompt for safety analysis with context awareness"""
# Extract relevant context information for safety analysis
context_info = ""
if context:
# Get session context, user context, and interaction contexts (all from cache)
session_context = context.get('session_context', {})
session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else ""
user_context = context.get('user_context', '')
interaction_contexts = context.get('interaction_contexts', [])
if session_summary:
context_info = f"\n\nSession Context (for safety context): {session_summary[:200]}..."
if user_context:
context_info += f"\n\nUser Context (for safety context): {user_context[:200]}..."
# Optionally include recent interaction context to understand conversation flow
if interaction_contexts:
recent_context = interaction_contexts[-1].get('summary', '') if interaction_contexts else ''
if recent_context:
context_info += f"\nRecent conversation: {recent_context[:200]}..."
return f"""
Analyze the following text for safety concerns and provide warnings if needed:
Text to analyze: "{response[:500]}..." # Truncate for large responses
{context_info}
Analyze for:
1. Toxicity or harmful language
2. Potential biases or stereotypes
3. Privacy or security concerns
4. Controversial or sensitive topics
Consider the conversation context when assessing whether content is appropriate in this specific conversational context.
Provide analysis in JSON format with:
- safety_scores for each category (0-1 scale)
- detected_issues list
- confidence_level
- recommended_warnings (non-blocking, advisory only)
IMPORTANT: Never block or modify the content, only provide warnings.
"""
def _pattern_based_detection(self, response: str) -> List[Dict[str, Any]]:
"""Detect safety issues using pattern matching"""
issues = []
response_lower = response.lower()
# Check each category
for category, patterns in self.sensitive_patterns.items():
for pattern in patterns:
if re.search(pattern, response_lower, re.IGNORECASE):
issues.append({
"category": category,
"pattern": pattern,
"severity": "low", # Always low for warning-only approach
"confidence": 0.7
})
break # Only report one pattern match per category
return issues
def _calculate_toxicity_score(self, response: str) -> float:
"""Calculate toxicity score (simplified version)"""
# Simple heuristic-based toxicity detection
toxic_indicators = [
'hate', 'violence', 'harm', 'attack', 'destroy', 'kill', 'hurt'
]
score = 0.0
words = response.lower().split()
for indicator in toxic_indicators:
if indicator in words:
score += 0.2
return min(1.0, score)
def _detect_bias_indicators(self, response: str) -> List[str]:
"""Detect potential bias indicators"""
biases = []
# Overgeneralization detection
if re.search(r'\b(all|always|never|every)\s+\w+s\b', response, re.IGNORECASE):
biases.append("overgeneralization")
# Prescriptive language
if re.search(r'\b(should|must|have to|ought to)\b', response, re.IGNORECASE):
biases.append("prescriptive_language")
# Stereotype indicators
stereotype_patterns = [
r'\b(all|most)\s+\w+\s+people\b',
r'\b(typical|usual|normal)\s+\w+\b',
]
for pattern in stereotype_patterns:
if re.search(pattern, response, re.IGNORECASE):
biases.append("potential_stereotype")
break
return biases
def _check_privacy_issues(self, response: str) -> List[str]:
"""Check for privacy-sensitive content"""
privacy_issues = []
# Personal information patterns
personal_info_patterns = [
r'\b\d{3}-\d{2}-\d{4}\b', # SSN-like pattern
r'\b\d{16}\b', # Credit card-like pattern
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', # Email
]
for pattern in personal_info_patterns:
if re.search(pattern, response):
privacy_issues.append("potential_personal_info")
break
return privacy_issues
def _generate_warnings(self, safety_analysis: Dict[str, Any]) -> List[str]:
"""Generate non-blocking warnings based on safety analysis"""
warnings = []
try:
# Safely extract and validate confidence_scores
confidence_scores = safety_analysis.get("confidence_scores", {})
if not isinstance(confidence_scores, dict):
confidence_scores = {}
# Safely extract detected_issues
detected_issues = safety_analysis.get("detected_issues", [])
if not isinstance(detected_issues, list):
detected_issues = []
# Toxicity warnings
if isinstance(confidence_scores.get("toxicity"), (int, float)) and confidence_scores.get("toxicity", 0) > self.safety_thresholds["toxicity"]:
warnings.append(self.warning_templates["toxicity"])
# Bias warnings
has_bias_score = isinstance(confidence_scores.get("bias"), (int, float)) and confidence_scores.get("bias", 0) > self.safety_thresholds["bias"]
has_bias_indicators = safety_analysis.get("bias_indicators")
if has_bias_score or (has_bias_indicators and has_bias_indicators):
warnings.append(self.warning_templates["bias"])
# Privacy warnings
has_privacy_score = isinstance(confidence_scores.get("privacy"), (int, float)) and confidence_scores.get("privacy", 0) > self.safety_thresholds["privacy"]
has_privacy_concerns = safety_analysis.get("privacy_concerns")
if has_privacy_score or (has_privacy_concerns and has_privacy_concerns):
warnings.append(self.warning_templates["privacy"])
# General safety warning if overall score is low
overall_score = safety_analysis.get("overall_safety_score", 1.0)
if isinstance(overall_score, (int, float)) and overall_score < 0.7:
warnings.append(self.warning_templates["safety"])
# Add context-specific warnings for detected issues
for issue in detected_issues:
try:
if isinstance(issue, dict):
category = issue.get("category")
if category and isinstance(category, str) and category in self.warning_templates:
category_warning = self.warning_templates[category]
if category_warning not in warnings:
warnings.append(category_warning)
except Exception as e:
logger.debug(f"Error processing issue: {e}")
continue
# Deduplicate warnings and ensure all are strings
warnings = [w for w in warnings if isinstance(w, str)]
# Create set and convert back to list (safely handle any edge cases)
seen = set()
unique_warnings = []
for w in warnings:
if w not in seen:
seen.add(w)
unique_warnings.append(w)
return unique_warnings
except Exception as e:
logger.error(f"Error generating warnings: {e}", exc_info=True)
# Return empty list on error
return []
def _parse_llm_safety_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM response for safety analysis"""
try:
import json
import re
# Try to extract JSON from response
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
parsed = json.loads(json_match.group())
return parsed
except json.JSONDecodeError:
logger.warning(f"{self.agent_id} Failed to parse LLM safety JSON")
# Fallback parsing - extract safety info from text
response_lower = response.lower()
# Simple safety analysis based on keywords
toxicity_score = 0.1
bias_score = 0.1
safety_score = 0.9
if any(word in response_lower for word in ['toxic', 'harmful', 'dangerous', 'inappropriate']):
toxicity_score = 0.8
safety_score = 0.3
elif any(word in response_lower for word in ['bias', 'discriminatory', 'unfair', 'prejudiced']):
bias_score = 0.7
safety_score = 0.5
return {
"toxicity_score": toxicity_score,
"bias_indicators": [],
"privacy_concerns": [],
"overall_safety_score": safety_score,
"confidence_scores": {
"toxicity": 0.7,
"bias": 0.6,
"safety": safety_score,
"privacy": 0.9
},
"detected_issues": [],
"analysis_method": "llm_parsed",
"llm_response": response[:200] + "..." if len(response) > 200 else response
}
def _get_fallback_result(self, response: str) -> Dict[str, Any]:
"""Fallback result when safety check fails"""
return {
"original_response": response,
"safety_checked_response": response,
"warnings": ["π§ Note: Safety analysis temporarily unavailable"],
"safety_analysis": {
"overall_safety_score": 0.5,
"confidence_scores": {"safety": 0.5},
"detected_issues": [],
"analysis_method": "fallback"
},
"blocked": False,
"agent_id": self.agent_id,
"error_handled": True
}
def get_safety_summary(self, analysis_result: Dict[str, Any]) -> str:
"""Generate a user-friendly safety summary"""
warnings = analysis_result.get("warnings", [])
safety_score = analysis_result.get("safety_analysis", {}).get("overall_safety_score", 1.0)
if not warnings:
return "β
Content appears safe based on automated analysis"
warning_count = len(warnings)
if safety_score > 0.8:
severity = "low"
elif safety_score > 0.6:
severity = "medium"
else:
severity = "high"
return f"β οΈ {warning_count} advisory note(s) - {severity} severity"
async def batch_analyze(self, responses: List[str]) -> List[Dict[str, Any]]:
"""Analyze multiple responses efficiently"""
results = []
for response in responses:
result = await self.execute(response)
results.append(result)
return results
# Factory function for easy instantiation
def create_safety_agent(llm_router=None):
return SafetyCheckAgent(llm_router)
# Example usage
if __name__ == "__main__":
# Test the safety agent
agent = SafetyCheckAgent()
test_responses = [
"This is a perfectly normal response with no issues.",
"Some content that might contain controversial topics.",
"Discussion about sensitive personal information."
]
import asyncio
async def test_agent():
for response in test_responses:
result = await agent.execute(response)
print(f"Response: {response[:50]}...")
print(f"Warnings: {result['warnings']}")
print(f"Safety Score: {result['safety_analysis']['overall_safety_score']}")
print("-" * 50)
asyncio.run(test_agent())
|