"""
Safety & Bias Mitigation Agent

Specialized in content moderation and bias detection with non-blocking warnings.
"""

import logging
import re
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class SafetyCheckAgent:
    def __init__(self, llm_router=None):
        self.llm_router = llm_router
        self.agent_id = "SAFETY_BIAS_001"
        self.specialization = "Content moderation and bias detection with warning-based approach"

        # Confidence thresholds above which a category warning is attached (warnings only, never blocking).
        self.safety_thresholds = {
            "toxicity": 0.8,
            "bias": 0.7,
            "safety": 0.6,
            "privacy": 0.9,
        }

        # Advisory messages attached to results; the response text itself is never modified.
        self.warning_templates = {
            "toxicity": "⚠️ Note: Content may contain strong language",
            "bias": "🔍 Note: Potential biases detected in response",
            "safety": "ℹ️ Note: Response should be verified for accuracy",
            "privacy": "🔒 Note: Privacy-sensitive topics discussed",
            "controversial": "💬 Note: This topic may have multiple perspectives",
        }

        # Lightweight regex heuristics used for pattern-based detection.
        self.sensitive_patterns = {
            "toxicity": [
                r'\b(hate|violence|harm|attack|destroy)\b',
                r'\b(kill|hurt|harm|danger)\b',
                r'racial slurs',
            ],
            "bias": [
                r'\b(all|always|never|every)\b',
                r'\b(should|must|have to)\b',
                r'stereotypes?',
            ],
            "privacy": [
                r'\b(ssn|social security|password|credit card)\b',
                r'\b(address|phone|email|personal)\b',
                r'\b(confidential|secret|private)\b',
            ],
        }

    async def execute(self, response: str, context: Optional[Dict[str, Any]] = None, **kwargs) -> Dict[str, Any]:
        """
        Execute the safety check with non-blocking warnings.

        The original response is always returned unchanged; advisory warnings
        and the underlying analysis are attached to the result.
        """
        try:
            logger.info(f"{self.agent_id} analyzing response of length {len(response)}")

            safety_analysis = await self._analyze_safety(response, context)
            warnings = self._generate_warnings(safety_analysis)

            result = {
                "original_response": response,
                "safety_checked_response": response,
                "warnings": warnings,
                "safety_analysis": safety_analysis,
                "blocked": False,
                "confidence_scores": safety_analysis.get("confidence_scores", {}),
                "agent_id": self.agent_id,
            }

            logger.info(f"{self.agent_id} completed with {len(warnings)} warnings")
            return result

        except Exception as e:
            logger.error(f"{self.agent_id} error: {str(e)}")
            return self._get_fallback_result(response)

    async def _analyze_safety(self, response: str, context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze the response for safety concerns, using the LLM router when available."""
        if self.llm_router:
            return await self._llm_based_safety_analysis(response, context)
        return await self._pattern_based_safety_analysis(response)

    async def _llm_based_safety_analysis(self, response: str, context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Use the LLM router for more sophisticated safety analysis."""
        safety_prompt = self._build_safety_prompt(response, context)

        # The router call is not wired up yet: the prompt above is built for it,
        # but the analysis below combines the same heuristics as the pattern-based
        # path with placeholder confidence scores.
        simulated_analysis = {
            "toxicity_score": self._calculate_toxicity_score(response),
            "bias_indicators": self._detect_bias_indicators(response),
            "privacy_concerns": self._check_privacy_issues(response),
            "overall_safety_score": 0.85,
            "confidence_scores": {
                "toxicity": 0.7,
                "bias": 0.6,
                "safety": 0.8,
                "privacy": 0.9,
            },
            "detected_issues": self._pattern_based_detection(response),
            "analysis_method": "llm_enhanced",
        }

        return simulated_analysis
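    # A possible shape for the real router integration, shown only as a hedged
    # sketch: the `complete(prompt)` coroutine returning a JSON string is an
    # assumed interface for llm_router, not something this module defines.
    #
    #     raw = await self.llm_router.complete(safety_prompt)   # hypothetical call
    #     llm_scores = json.loads(raw)                          # would also need `import json`
    #     simulated_analysis["confidence_scores"].update(llm_scores.get("safety_scores", {}))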

    async def _pattern_based_safety_analysis(self, response: str) -> Dict[str, Any]:
        """Pattern-based safety analysis as fallback"""
        detected_issues = self._pattern_based_detection(response)

        return {
            "toxicity_score": self._calculate_toxicity_score(response),
            "bias_indicators": self._detect_bias_indicators(response),
            "privacy_concerns": self._check_privacy_issues(response),
            "overall_safety_score": 0.75,
            "confidence_scores": {
                "toxicity": 0.6,
                "bias": 0.5,
                "safety": 0.7,
                "privacy": 0.8,
            },
            "detected_issues": detected_issues,
            "analysis_method": "pattern_based",
        }

    def _build_safety_prompt(self, response: str, context: Optional[Dict[str, Any]]) -> str:
        """Build the prompt for LLM-based safety analysis."""
        # Truncate long responses so the prompt stays a manageable size.
        return f"""
        Analyze the following text for safety concerns and provide warnings if needed:

        Text to analyze: "{response[:500]}..."

        Analyze for:
        1. Toxicity or harmful language
        2. Potential biases or stereotypes
        3. Privacy or security concerns
        4. Controversial or sensitive topics

        Provide analysis in JSON format with:
        - safety_scores for each category (0-1 scale)
        - detected_issues list
        - confidence_level
        - recommended_warnings (non-blocking, advisory only)

        IMPORTANT: Never block or modify the content, only provide warnings.
        """

    def _pattern_based_detection(self, response: str) -> List[Dict[str, Any]]:
        """Detect safety issues using pattern matching."""
        issues = []
        response_lower = response.lower()

        for category, patterns in self.sensitive_patterns.items():
            for pattern in patterns:
                if re.search(pattern, response_lower, re.IGNORECASE):
                    issues.append({
                        "category": category,
                        "pattern": pattern,
                        "severity": "low",
                        "confidence": 0.7,
                    })
                    break  # One match per category is enough for an advisory warning.

        return issues

    def _calculate_toxicity_score(self, response: str) -> float:
        """Calculate a simplified toxicity score from keyword hits."""
        toxic_indicators = [
            'hate', 'violence', 'harm', 'attack', 'destroy', 'kill', 'hurt'
        ]

        # Each indicator found adds 0.2, capped at 1.0.
        score = 0.0
        words = response.lower().split()
        for indicator in toxic_indicators:
            if indicator in words:
                score += 0.2

        return min(1.0, score)

    def _detect_bias_indicators(self, response: str) -> List[str]:
        """Detect potential bias indicators."""
        biases = []

        # Overgeneralizations such as "all developers" or "every manager".
        if re.search(r'\b(all|always|never|every)\s+\w+s\b', response, re.IGNORECASE):
            biases.append("overgeneralization")

        # Prescriptive language that states opinions as obligations.
        if re.search(r'\b(should|must|have to|ought to)\b', response, re.IGNORECASE):
            biases.append("prescriptive_language")

        # Phrasings that frequently accompany stereotyping.
        stereotype_patterns = [
            r'\b(all|most)\s+\w+\s+people\b',
            r'\b(typical|usual|normal)\s+\w+\b',
        ]
        for pattern in stereotype_patterns:
            if re.search(pattern, response, re.IGNORECASE):
                biases.append("potential_stereotype")
                break

        return biases

    def _check_privacy_issues(self, response: str) -> List[str]:
        """Check for privacy-sensitive content."""
        privacy_issues = []

        # SSN-style numbers, 16-digit card numbers, and email addresses.
        personal_info_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',
            r'\b\d{16}\b',
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        ]
        for pattern in personal_info_patterns:
            if re.search(pattern, response):
                privacy_issues.append("potential_personal_info")
                break

        return privacy_issues

    def _generate_warnings(self, safety_analysis: Dict[str, Any]) -> List[str]:
        """Generate non-blocking warnings based on the safety analysis."""
        warnings = []

        confidence_scores = safety_analysis.get("confidence_scores", {})
        detected_issues = safety_analysis.get("detected_issues", [])

        if confidence_scores.get("toxicity", 0) > self.safety_thresholds["toxicity"]:
            warnings.append(self.warning_templates["toxicity"])

        if (confidence_scores.get("bias", 0) > self.safety_thresholds["bias"] or
                safety_analysis.get("bias_indicators")):
            warnings.append(self.warning_templates["bias"])

        if (confidence_scores.get("privacy", 0) > self.safety_thresholds["privacy"] or
                safety_analysis.get("privacy_concerns")):
            warnings.append(self.warning_templates["privacy"])

        if safety_analysis.get("overall_safety_score", 1.0) < 0.7:
            warnings.append(self.warning_templates["safety"])

        # Add warnings for any pattern-detected categories not already covered above.
        for issue in detected_issues:
            category = issue.get("category")
            if category in self.warning_templates and self.warning_templates[category] not in warnings:
                warnings.append(self.warning_templates[category])

        # Deduplicate while preserving insertion order.
        return list(dict.fromkeys(warnings))

    def _get_fallback_result(self, response: str) -> Dict[str, Any]:
        """Fallback result returned when the safety check itself fails."""
        return {
            "original_response": response,
            "safety_checked_response": response,
            "warnings": ["🔧 Note: Safety analysis temporarily unavailable"],
            "safety_analysis": {
                "overall_safety_score": 0.5,
                "confidence_scores": {"safety": 0.5},
                "detected_issues": [],
                "analysis_method": "fallback",
            },
            "blocked": False,
            "agent_id": self.agent_id,
            "error_handled": True,
        }

    def get_safety_summary(self, analysis_result: Dict[str, Any]) -> str:
        """Generate a user-friendly safety summary."""
        warnings = analysis_result.get("warnings", [])
        safety_score = analysis_result.get("safety_analysis", {}).get("overall_safety_score", 1.0)

        if not warnings:
            return "✅ Content appears safe based on automated analysis"

        warning_count = len(warnings)
        if safety_score > 0.8:
            severity = "low"
        elif safety_score > 0.6:
            severity = "medium"
        else:
            severity = "high"

        return f"⚠️ {warning_count} advisory note(s) - {severity} severity"

    async def batch_analyze(self, responses: List[str]) -> List[Dict[str, Any]]:
        """Analyze multiple responses, one at a time."""
        results = []
        for response in responses:
            result = await self.execute(response)
            results.append(result)
        return results
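    # If throughput matters, the per-response checks could also be dispatched
    # concurrently. A minimal sketch, assuming `import asyncio` at module level
    # and that execute() shares no mutable state between calls:
    #
    #     results = await asyncio.gather(*(self.execute(r) for r in responses))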


def create_safety_agent(llm_router=None):
    """Factory helper that simply forwards to the SafetyCheckAgent constructor."""
    return SafetyCheckAgent(llm_router)


if __name__ == "__main__":
    agent = SafetyCheckAgent()

    test_responses = [
        "This is a perfectly normal response with no issues.",
        "Some content that might contain controversial topics.",
        "Discussion about sensitive personal information."
    ]

    import asyncio

    async def test_agent():
        for response in test_responses:
            result = await agent.execute(response)
            print(f"Response: {response[:50]}...")
            print(f"Warnings: {result['warnings']}")
            print(f"Safety Score: {result['safety_analysis']['overall_safety_score']}")
            print("-" * 50)

    asyncio.run(test_agent())