"""AIVSS-Aligned Risk Scoring System""" |
|
|
|
|
|
import json |
|
|
import os |
|
|
from pathlib import Path |
|
|
from typing import Dict, Any, Optional |
|
|
|
|
|
def load_risk_thresholds() -> Dict[str, Any]: |
|
|
"""Load risk thresholds configuration""" |
|
|
thresholds_path = Path(__file__).parent.parent / "data" / "risk_thresholds.json" |
|
|
with open(thresholds_path, 'r') as f: |
|
|
return json.load(f) |
|
|
|
|
|
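# NOTE: the contents of data/risk_thresholds.json are not reproduced here. A
# rough sketch of the shape that get_decision() below relies on (the numbers
# are illustrative only, not the shipped defaults):
#
# {
#     "risk_tolerance_levels": {
#         "low":    {"approve_threshold": 2.0, "deny_threshold": 5.0},
#         "medium": {"approve_threshold": 3.0, "deny_threshold": 6.0},
#         "high":   {"approve_threshold": 4.0, "deny_threshold": 8.0}
#     }
# }

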
def analyze_action_with_llm(
    action: str,
    target_system: str,
    context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Use an LLM to analyze the action for nuanced risk assessment.
    Returns unintended consequences, cascading risks, and reversibility.
    """
    try:
        # Imported lazily; a missing package is handled by the except below
        import anthropic

        api_key = os.environ.get('ANTHROPIC_API_KEY')
        if not api_key:
            return {
                "unintended_consequences": [],
                "cascading_risks": [],
                "reversibility": "unknown",
                "confidence": 0.0,
                "error": "ANTHROPIC_API_KEY not set"
            }

        client = anthropic.Anthropic(api_key=api_key)

        context_str = json.dumps(context, indent=2) if context else "No additional context"

        prompt = f"""You are a security risk analyst. Analyze this proposed action for potential risks:

Action: {action}
Target System: {target_system}
Context: {context_str}

Provide a risk analysis including:
1. Potential unintended consequences
2. Cascading failure risks
3. Reversibility assessment (fully reversible, partially reversible, irreversible)

Respond with JSON only:
{{
    "unintended_consequences": ["list of 2-3 potential unintended effects"],
    "cascading_risks": ["list of 1-2 potential cascading failures"],
    "reversibility": "fully reversible|partially reversible|irreversible",
    "confidence": 0.0-1.0
}}"""

        response = client.messages.create(
            model="claude-3-haiku-20240307",
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}]
        )

        response_text = response.content[0].text.strip()

        # Strip markdown code fences if the model wrapped its JSON response
        if "```json" in response_text:
            response_text = response_text.split("```json")[1].split("```")[0].strip()
        elif "```" in response_text:
            response_text = response_text.split("```")[1].split("```")[0].strip()

        result = json.loads(response_text)
        return result

    except Exception as e:
        # Fail closed to a neutral, zero-confidence analysis rather than raising
        return {
            "unintended_consequences": [],
            "cascading_risks": [],
            "reversibility": "unknown",
            "confidence": 0.0,
            "error": str(e)
        }


def calculate_impact_scores(
    action: str,
    target_system: str,
    context: Optional[Dict[str, Any]] = None
) -> Dict[str, Dict[str, Any]]:
    """
    Calculate AIVSS impact scores based on action and context.
    Returns scores for C, I, A, S, PR, AC.
    """

    scores = {
        "confidentiality_impact": {"score": 0, "rationale": "No data access detected"},
        "integrity_impact": {"score": 0, "rationale": "No data modification detected"},
        "availability_impact": {"score": 0, "rationale": "No service disruption detected"},
        "scope": {"score": 1, "rationale": "Unchanged scope"},
        "privilege_required": {"score": 0, "rationale": "No authentication required"},
        "attack_complexity": {"score": 0, "rationale": "Low complexity"}
    }

    action_lower = action.lower()
    target_lower = target_system.lower()

    # Confidentiality impact: read-type actions against sensitive targets score highest
    if any(keyword in action_lower for keyword in ['read', 'access', 'view', 'query', 'list']):
        if any(keyword in target_lower for keyword in ['pii', 'personal', 'user', 'customer', 'payment', 'credential']):
            scores["confidentiality_impact"] = {"score": 3, "rationale": "Action accesses sensitive data (PII/credentials)"}
        elif any(keyword in target_lower for keyword in ['database', 'file', 'record']):
            scores["confidentiality_impact"] = {"score": 2, "rationale": "Action accesses internal data"}
        else:
            scores["confidentiality_impact"] = {"score": 1, "rationale": "Action accesses low-sensitivity data"}

    # Integrity impact: destructive verbs outrank ordinary modifications
    if any(keyword in action_lower for keyword in ['write', 'modify', 'update', 'delete', 'drop', 'alter', 'change']):
        if any(keyword in action_lower for keyword in ['delete', 'drop', 'remove']):
            scores["integrity_impact"] = {"score": 3, "rationale": "Action permanently modifies/deletes data"}
        elif any(keyword in target_lower for keyword in ['database', 'user', 'record', 'config']):
            scores["integrity_impact"] = {"score": 2, "rationale": "Action modifies critical data"}
        else:
            scores["integrity_impact"] = {"score": 1, "rationale": "Action makes minor modifications"}

    # Availability impact: outage-capable actions vs. temporary disruptions
    if any(keyword in action_lower for keyword in ['delete', 'drop', 'shutdown', 'terminate', 'kill', 'stop']):
        if 'all' in action_lower or 'database' in target_lower or 'service' in target_lower:
            scores["availability_impact"] = {"score": 3, "rationale": "Action could cause service outage"}
        else:
            scores["availability_impact"] = {"score": 2, "rationale": "Action affects availability of resources"}
    elif any(keyword in action_lower for keyword in ['restart', 'reload']):
        scores["availability_impact"] = {"score": 1, "rationale": "Action causes temporary disruption"}

    # Scope: broad targets or known downstream dependencies widen the blast radius
    if any(keyword in target_lower for keyword in ['all', 'system', 'global', 'production']):
        scores["scope"] = {"score": 2, "rationale": "Action affects multiple systems/components"}

    if context and context.get('connected_systems'):
        scores["scope"] = {"score": 2, "rationale": "Action affects downstream systems"}

    # Privileges required to perform the action
    if any(keyword in action_lower for keyword in ['admin', 'root', 'sudo', 'execute', 'delete']):
        scores["privilege_required"] = {"score": 2, "rationale": "Action requires elevated privileges"}
    elif any(keyword in action_lower for keyword in ['write', 'modify', 'create']):
        scores["privilege_required"] = {"score": 1, "rationale": "Action requires authenticated user"}

    # Attack complexity: technical skill needed to abuse the action
    if any(keyword in action_lower for keyword in ['sql', 'execute', 'eval', 'script']):
        scores["attack_complexity"] = {"score": 2, "rationale": "High technical skill required"}
    elif any(keyword in action_lower for keyword in ['modify', 'delete']):
        scores["attack_complexity"] = {"score": 1, "rationale": "Moderate technical skill needed"}

    return scores


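# Example of how the keyword heuristics above compose (hypothetical inputs):
#   calculate_impact_scores("query customer records", "customer PII table")
#   -> confidentiality_impact = 3 ("query" in the action, "pii"/"customer" in the target);
#      every other dimension stays at its default (I=0, A=0, S=1, PR=0, AC=0).

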
def calculate_risk_score(breakdown: Dict[str, Dict[str, Any]]) -> float:
    """
    Calculate overall risk score using AIVSS formula:
        Base Score = (C + I + A) × S × (1 + PR/4) × (1 - AC/6)
        Normalized Score = min(10, Base Score)
    """
    C = breakdown["confidentiality_impact"]["score"]
    I = breakdown["integrity_impact"]["score"]
    A = breakdown["availability_impact"]["score"]
    S = breakdown["scope"]["score"]
    PR = breakdown["privilege_required"]["score"]
    AC = breakdown["attack_complexity"]["score"]

    base_score = (C + I + A) * S * (1 + PR/4) * (1 - AC/6)
    normalized_score = min(10.0, base_score)

    return round(normalized_score, 1)


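# Worked example of the formula above (inputs are hypothetical):
#   C=2, I=2, A=1, S=1, PR=1, AC=1
#   base  = (2 + 2 + 1) * 1 * (1 + 1/4) * (1 - 1/6) = 5 * 1.25 * 0.8333... ≈ 5.21
#   score = round(min(10, 5.21), 1) = 5.2  -> "MEDIUM" per get_severity() below

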
def get_severity(score: float) -> str:
    """Map score to severity level"""
    if score >= 8.0:
        return "CRITICAL"
    elif score >= 6.0:
        return "HIGH"
    elif score >= 3.0:
        return "MEDIUM"
    else:
        return "LOW"


def get_decision(score: float, risk_tolerance: str) -> str:
    """Determine decision based on score and risk tolerance"""
    thresholds = load_risk_thresholds()
    tolerance = thresholds['risk_tolerance_levels'].get(risk_tolerance, thresholds['risk_tolerance_levels']['medium'])

    if score < tolerance['approve_threshold']:
        return "APPROVE"
    elif score < tolerance['deny_threshold']:
        return "REQUIRES_APPROVAL"
    else:
        return "DENY"


def score_action_risk(
    action: str,
    target_system: str,
    agent_id: Optional[str] = None,
    context: Optional[Dict[str, Any]] = None,
    risk_tolerance: str = "medium"
) -> Dict[str, Any]:
    """
    Comprehensive risk scoring aligned with the AIVSS methodology.

    Args:
        action: Description of the proposed action
        target_system: System/resource being acted upon
        agent_id: Agent requesting the action (optional)
        context: Additional context (data sensitivity, connected systems, etc.)
        risk_tolerance: "low", "medium", or "high" - organizational risk appetite

    Returns:
        Risk assessment with score, severity, decision, and recommendations
    """

    # Score each AIVSS dimension from the keyword heuristics, then combine
    breakdown = calculate_impact_scores(action, target_system, context)
    overall_score = calculate_risk_score(breakdown)

    severity = get_severity(overall_score)
    decision = get_decision(overall_score, risk_tolerance)

    # Supplement the heuristic score with an LLM analysis of consequences
    llm_analysis = analyze_action_with_llm(action, target_system, context)

    # Derive recommendations and required controls from the decision
    recommendations = []
    required_controls = []

    if decision == "DENY":
        recommendations.append("Action poses unacceptable risk and should not proceed")
        recommendations.append("Consider alternative approaches with lower risk")
    elif decision == "REQUIRES_APPROVAL":
        recommendations.append("Proceed with human approval and enhanced logging")
        recommendations.append("Document justification and rollback plan")
        required_controls = [
            "Human-in-the-loop approval",
            "Transaction logging enabled",
            "Rollback plan documented"
        ]
    else:
        recommendations.append("Action approved with standard monitoring")
        required_controls = ["Standard audit logging"]

    # Escalate controls for high-severity or irreversible actions
    if severity in ["HIGH", "CRITICAL"]:
        required_controls.append("Real-time monitoring required")
    if llm_analysis.get('reversibility') == 'irreversible':
        required_controls.append("Backup verification before execution")

    from .audit import generate_audit_id
    audit_id = generate_audit_id("risk")

    return {
        "overall_score": overall_score,
        "severity": severity,
        "decision": decision,
        "breakdown": breakdown,
        "llm_analysis": llm_analysis,
        "recommendation": recommendations[0] if recommendations else "",
        "required_controls": required_controls,
        "audit_id": audit_id,
        "risk_tolerance": risk_tolerance
    }
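

# Minimal local smoke test (an illustrative sketch, not part of the scoring API).
# It exercises only the heuristic pipeline; score_action_risk() is skipped here
# because it depends on the package-relative .audit module and risk_thresholds.json.
if __name__ == "__main__":
    for demo_action, demo_target in [
        ("delete all user records", "production database"),
        ("restart web service", "staging web server"),
    ]:
        demo_breakdown = calculate_impact_scores(demo_action, demo_target)
        demo_score = calculate_risk_score(demo_breakdown)
        print(f"{demo_action!r} on {demo_target!r}: "
              f"score={demo_score} severity={get_severity(demo_score)}")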