"""AIVSS-Aligned Risk Scoring System"""
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional
def load_risk_thresholds() -> Dict[str, Any]:
"""Load risk thresholds configuration"""
thresholds_path = Path(__file__).parent.parent / "data" / "risk_thresholds.json"
with open(thresholds_path, 'r') as f:
return json.load(f)
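
# The on-disk shape of risk_thresholds.json isn't shown in this module. A
# minimal sketch, inferred from how get_decision() reads it below; the numbers
# are illustrative assumptions, not the shipped configuration:
#
#   {
#     "risk_tolerance_levels": {
#       "low":    {"approve_threshold": 2.0, "deny_threshold": 5.0},
#       "medium": {"approve_threshold": 4.0, "deny_threshold": 7.0},
#       "high":   {"approve_threshold": 6.0, "deny_threshold": 9.0}
#     }
#   }
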
def analyze_action_with_llm(
action: str,
target_system: str,
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Use LLM to analyze action for nuanced risk assessment
Returns unintended consequences, cascading risks, and reversibility
"""
try:
import anthropic
api_key = os.environ.get('ANTHROPIC_API_KEY')
if not api_key:
return {
"unintended_consequences": [],
"cascading_risks": [],
"reversibility": "unknown",
"confidence": 0.0,
"error": "ANTHROPIC_API_KEY not set"
}
client = anthropic.Anthropic(api_key=api_key)
context_str = json.dumps(context, indent=2) if context else "No additional context"
prompt = f"""You are a security risk analyst. Analyze this proposed action for potential risks:
Action: {action}
Target System: {target_system}
Context: {context_str}
Provide a risk analysis including:
1. Potential unintended consequences
2. Cascading failure risks
3. Reversibility assessment (fully reversible, partially reversible, irreversible)
Respond with JSON only:
{{
"unintended_consequences": ["list of 2-3 potential unintended effects"],
"cascading_risks": ["list of 1-2 potential cascading failures"],
"reversibility": "fully reversible|partially reversible|irreversible",
"confidence": 0.0-1.0
}}"""
response = client.messages.create(
model="claude-3-haiku-20240307",
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
response_text = response.content[0].text.strip()
# Extract JSON if wrapped in markdown
if "```json" in response_text:
response_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
response_text = response_text.split("```")[1].split("```")[0].strip()
        return json.loads(response_text)
except Exception as e:
return {
"unintended_consequences": [],
"cascading_risks": [],
"reversibility": "unknown",
"confidence": 0.0,
"error": str(e)
}
def calculate_impact_scores(
action: str,
target_system: str,
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Dict[str, Any]]:
"""
Calculate AIVSS impact scores based on action and context
Returns scores for C, I, A, S, PR, AC
"""
# Default scores
scores = {
"confidentiality_impact": {"score": 0, "rationale": "No data access detected"},
"integrity_impact": {"score": 0, "rationale": "No data modification detected"},
"availability_impact": {"score": 0, "rationale": "No service disruption detected"},
"scope": {"score": 1, "rationale": "Unchanged scope"},
"privilege_required": {"score": 0, "rationale": "No authentication required"},
"attack_complexity": {"score": 0, "rationale": "Low complexity"}
}
action_lower = action.lower()
target_lower = target_system.lower()
# Confidentiality Impact
if any(keyword in action_lower for keyword in ['read', 'access', 'view', 'query', 'list']):
if any(keyword in target_lower for keyword in ['pii', 'personal', 'user', 'customer', 'payment', 'credential']):
scores["confidentiality_impact"] = {"score": 3, "rationale": "Action accesses sensitive data (PII/credentials)"}
elif any(keyword in target_lower for keyword in ['database', 'file', 'record']):
scores["confidentiality_impact"] = {"score": 2, "rationale": "Action accesses internal data"}
else:
scores["confidentiality_impact"] = {"score": 1, "rationale": "Action accesses low-sensitivity data"}
# Integrity Impact
if any(keyword in action_lower for keyword in ['write', 'modify', 'update', 'delete', 'drop', 'alter', 'change']):
if any(keyword in action_lower for keyword in ['delete', 'drop', 'remove']):
scores["integrity_impact"] = {"score": 3, "rationale": "Action permanently modifies/deletes data"}
elif any(keyword in target_lower for keyword in ['database', 'user', 'record', 'config']):
scores["integrity_impact"] = {"score": 2, "rationale": "Action modifies critical data"}
else:
scores["integrity_impact"] = {"score": 1, "rationale": "Action makes minor modifications"}
# Availability Impact
if any(keyword in action_lower for keyword in ['delete', 'drop', 'shutdown', 'terminate', 'kill', 'stop']):
if 'all' in action_lower or 'database' in target_lower or 'service' in target_lower:
scores["availability_impact"] = {"score": 3, "rationale": "Action could cause service outage"}
else:
scores["availability_impact"] = {"score": 2, "rationale": "Action affects availability of resources"}
elif any(keyword in action_lower for keyword in ['restart', 'reload']):
scores["availability_impact"] = {"score": 1, "rationale": "Action causes temporary disruption"}
# Scope
if any(keyword in target_lower for keyword in ['all', 'system', 'global', 'production']):
scores["scope"] = {"score": 2, "rationale": "Action affects multiple systems/components"}
# Check context for scope
if context and context.get('connected_systems'):
scores["scope"] = {"score": 2, "rationale": "Action affects downstream systems"}
# Privilege Required
if any(keyword in action_lower for keyword in ['admin', 'root', 'sudo', 'execute', 'delete']):
scores["privilege_required"] = {"score": 2, "rationale": "Action requires elevated privileges"}
elif any(keyword in action_lower for keyword in ['write', 'modify', 'create']):
scores["privilege_required"] = {"score": 1, "rationale": "Action requires authenticated user"}
# Attack Complexity
if any(keyword in action_lower for keyword in ['sql', 'execute', 'eval', 'script']):
scores["attack_complexity"] = {"score": 2, "rationale": "High technical skill required"}
elif any(keyword in action_lower for keyword in ['modify', 'delete']):
scores["attack_complexity"] = {"score": 1, "rationale": "Moderate technical skill needed"}
return scores
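
# Worked example of the keyword heuristics above (illustrative only):
# calculate_impact_scores("delete all user records", "production user database")
# yields integrity=3 ("delete"), availability=3 ("delete" plus "all"),
# scope=2 ("production" in the target), privilege_required=2 ("delete"),
# attack_complexity=1 ("delete"), and confidentiality=0 (no read-style keyword).
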
def calculate_risk_score(breakdown: Dict[str, Dict[str, Any]]) -> float:
"""
Calculate overall risk score using AIVSS formula:
Base Score = (C + I + A) × S × (1 + PR/4) × (1 - AC/6)
Normalized Score = min(10, Base Score)
"""
C = breakdown["confidentiality_impact"]["score"]
I = breakdown["integrity_impact"]["score"]
A = breakdown["availability_impact"]["score"]
S = breakdown["scope"]["score"]
PR = breakdown["privilege_required"]["score"]
AC = breakdown["attack_complexity"]["score"]
base_score = (C + I + A) * S * (1 + PR/4) * (1 - AC/6)
normalized_score = min(10.0, base_score)
return round(normalized_score, 1)
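
# Continuing the worked example above: C=0, I=3, A=3, S=2, PR=2, AC=1 gives
#   (0 + 3 + 3) * 2 * (1 + 2/4) * (1 - 1/6) = 6 * 2 * 1.5 * (5/6) = 15.0,
# which min(10, ...) clamps to the 10.0 ceiling.
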
def get_severity(score: float) -> str:
"""Map score to severity level"""
if score >= 8.0:
return "CRITICAL"
elif score >= 6.0:
return "HIGH"
elif score >= 3.0:
return "MEDIUM"
else:
return "LOW"
def get_decision(score: float, risk_tolerance: str) -> str:
"""Determine decision based on score and risk tolerance"""
thresholds = load_risk_thresholds()
tolerance = thresholds['risk_tolerance_levels'].get(risk_tolerance, thresholds['risk_tolerance_levels']['medium'])
if score < tolerance['approve_threshold']:
return "APPROVE"
elif score < tolerance['deny_threshold']:
return "REQUIRES_APPROVAL"
else:
return "DENY"
def score_action_risk(
action: str,
target_system: str,
agent_id: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
risk_tolerance: str = "medium"
) -> Dict[str, Any]:
"""
Comprehensive risk scoring aligned with AIVSS methodology
Args:
action: Description of the proposed action
target_system: System/resource being acted upon
agent_id: Agent requesting the action (optional)
context: Additional context (data sensitivity, connected systems, etc.)
risk_tolerance: "low", "medium", or "high" - organizational risk appetite
Returns:
Risk assessment with score, severity, decision, and recommendations
"""
# Calculate impact scores
breakdown = calculate_impact_scores(action, target_system, context)
# Calculate overall risk score
overall_score = calculate_risk_score(breakdown)
# Get severity and decision
severity = get_severity(overall_score)
decision = get_decision(overall_score, risk_tolerance)
# Get LLM analysis for nuanced assessment
llm_analysis = analyze_action_with_llm(action, target_system, context)
# Generate recommendations based on score and decision
recommendations = []
required_controls = []
if decision == "DENY":
recommendations.append("Action poses unacceptable risk and should not proceed")
recommendations.append("Consider alternative approaches with lower risk")
elif decision == "REQUIRES_APPROVAL":
recommendations.append("Proceed with human approval and enhanced logging")
recommendations.append("Document justification and rollback plan")
required_controls = [
"Human-in-the-loop approval",
"Transaction logging enabled",
"Rollback plan documented"
]
else:
recommendations.append("Action approved with standard monitoring")
required_controls = ["Standard audit logging"]
# Add controls based on severity
if severity in ["HIGH", "CRITICAL"]:
required_controls.append("Real-time monitoring required")
if llm_analysis.get('reversibility') == 'irreversible':
required_controls.append("Backup verification before execution")
from .audit import generate_audit_id
audit_id = generate_audit_id("risk")
return {
"overall_score": overall_score,
"severity": severity,
"decision": decision,
"breakdown": breakdown,
"llm_analysis": llm_analysis,
"recommendation": recommendations[0] if recommendations else "",
"required_controls": required_controls,
"audit_id": audit_id,
"risk_tolerance": risk_tolerance
}
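

if __name__ == "__main__":
    # Minimal demo sketch. Assumes data/risk_thresholds.json exists and that
    # the module runs with its package context intact (e.g. via
    # `python -m <package>.risk_scoring`) so the relative `.audit` import
    # resolves. Without ANTHROPIC_API_KEY set, the LLM analysis degrades
    # gracefully to a zero-confidence result rather than failing.
    demo = score_action_risk(
        action="delete all user records",
        target_system="production user database",
        risk_tolerance="low",
    )
    print(json.dumps(demo, indent=2))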