"""AIVSS-Aligned Risk Scoring System""" import json import os from pathlib import Path from typing import Dict, Any, Optional def load_risk_thresholds() -> Dict[str, Any]: """Load risk thresholds configuration""" thresholds_path = Path(__file__).parent.parent / "data" / "risk_thresholds.json" with open(thresholds_path, 'r') as f: return json.load(f) def analyze_action_with_llm( action: str, target_system: str, context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Use LLM to analyze action for nuanced risk assessment Returns unintended consequences, cascading risks, and reversibility """ try: import anthropic api_key = os.environ.get('ANTHROPIC_API_KEY') if not api_key: return { "unintended_consequences": [], "cascading_risks": [], "reversibility": "unknown", "confidence": 0.0, "error": "ANTHROPIC_API_KEY not set" } client = anthropic.Anthropic(api_key=api_key) context_str = json.dumps(context, indent=2) if context else "No additional context" prompt = f"""You are a security risk analyst. Analyze this proposed action for potential risks: Action: {action} Target System: {target_system} Context: {context_str} Provide a risk analysis including: 1. Potential unintended consequences 2. Cascading failure risks 3. Reversibility assessment (fully reversible, partially reversible, irreversible) Respond with JSON only: {{ "unintended_consequences": ["list of 2-3 potential unintended effects"], "cascading_risks": ["list of 1-2 potential cascading failures"], "reversibility": "fully reversible|partially reversible|irreversible", "confidence": 0.0-1.0 }}""" response = client.messages.create( model="claude-3-haiku-20240307", max_tokens=500, messages=[{"role": "user", "content": prompt}] ) response_text = response.content[0].text.strip() # Extract JSON if wrapped in markdown if "```json" in response_text: response_text = response_text.split("```json")[1].split("```")[0].strip() elif "```" in response_text: response_text = response_text.split("```")[1].split("```")[0].strip() result = json.loads(response_text) return result except Exception as e: return { "unintended_consequences": [], "cascading_risks": [], "reversibility": "unknown", "confidence": 0.0, "error": str(e) } def calculate_impact_scores( action: str, target_system: str, context: Optional[Dict[str, Any]] = None ) -> Dict[str, Dict[str, Any]]: """ Calculate AIVSS impact scores based on action and context Returns scores for C, I, A, S, PR, AC """ # Default scores scores = { "confidentiality_impact": {"score": 0, "rationale": "No data access detected"}, "integrity_impact": {"score": 0, "rationale": "No data modification detected"}, "availability_impact": {"score": 0, "rationale": "No service disruption detected"}, "scope": {"score": 1, "rationale": "Unchanged scope"}, "privilege_required": {"score": 0, "rationale": "No authentication required"}, "attack_complexity": {"score": 0, "rationale": "Low complexity"} } action_lower = action.lower() target_lower = target_system.lower() # Confidentiality Impact if any(keyword in action_lower for keyword in ['read', 'access', 'view', 'query', 'list']): if any(keyword in target_lower for keyword in ['pii', 'personal', 'user', 'customer', 'payment', 'credential']): scores["confidentiality_impact"] = {"score": 3, "rationale": "Action accesses sensitive data (PII/credentials)"} elif any(keyword in target_lower for keyword in ['database', 'file', 'record']): scores["confidentiality_impact"] = {"score": 2, "rationale": "Action accesses internal data"} else: scores["confidentiality_impact"] = {"score": 1, "rationale": "Action accesses low-sensitivity data"} # Integrity Impact if any(keyword in action_lower for keyword in ['write', 'modify', 'update', 'delete', 'drop', 'alter', 'change']): if any(keyword in action_lower for keyword in ['delete', 'drop', 'remove']): scores["integrity_impact"] = {"score": 3, "rationale": "Action permanently modifies/deletes data"} elif any(keyword in target_lower for keyword in ['database', 'user', 'record', 'config']): scores["integrity_impact"] = {"score": 2, "rationale": "Action modifies critical data"} else: scores["integrity_impact"] = {"score": 1, "rationale": "Action makes minor modifications"} # Availability Impact if any(keyword in action_lower for keyword in ['delete', 'drop', 'shutdown', 'terminate', 'kill', 'stop']): if 'all' in action_lower or 'database' in target_lower or 'service' in target_lower: scores["availability_impact"] = {"score": 3, "rationale": "Action could cause service outage"} else: scores["availability_impact"] = {"score": 2, "rationale": "Action affects availability of resources"} elif any(keyword in action_lower for keyword in ['restart', 'reload']): scores["availability_impact"] = {"score": 1, "rationale": "Action causes temporary disruption"} # Scope if any(keyword in target_lower for keyword in ['all', 'system', 'global', 'production']): scores["scope"] = {"score": 2, "rationale": "Action affects multiple systems/components"} # Check context for scope if context and context.get('connected_systems'): scores["scope"] = {"score": 2, "rationale": "Action affects downstream systems"} # Privilege Required if any(keyword in action_lower for keyword in ['admin', 'root', 'sudo', 'execute', 'delete']): scores["privilege_required"] = {"score": 2, "rationale": "Action requires elevated privileges"} elif any(keyword in action_lower for keyword in ['write', 'modify', 'create']): scores["privilege_required"] = {"score": 1, "rationale": "Action requires authenticated user"} # Attack Complexity if any(keyword in action_lower for keyword in ['sql', 'execute', 'eval', 'script']): scores["attack_complexity"] = {"score": 2, "rationale": "High technical skill required"} elif any(keyword in action_lower for keyword in ['modify', 'delete']): scores["attack_complexity"] = {"score": 1, "rationale": "Moderate technical skill needed"} return scores def calculate_risk_score(breakdown: Dict[str, Dict[str, Any]]) -> float: """ Calculate overall risk score using AIVSS formula: Base Score = (C + I + A) × S × (1 + PR/4) × (1 - AC/6) Normalized Score = min(10, Base Score) """ C = breakdown["confidentiality_impact"]["score"] I = breakdown["integrity_impact"]["score"] A = breakdown["availability_impact"]["score"] S = breakdown["scope"]["score"] PR = breakdown["privilege_required"]["score"] AC = breakdown["attack_complexity"]["score"] base_score = (C + I + A) * S * (1 + PR/4) * (1 - AC/6) normalized_score = min(10.0, base_score) return round(normalized_score, 1) def get_severity(score: float) -> str: """Map score to severity level""" if score >= 8.0: return "CRITICAL" elif score >= 6.0: return "HIGH" elif score >= 3.0: return "MEDIUM" else: return "LOW" def get_decision(score: float, risk_tolerance: str) -> str: """Determine decision based on score and risk tolerance""" thresholds = load_risk_thresholds() tolerance = thresholds['risk_tolerance_levels'].get(risk_tolerance, thresholds['risk_tolerance_levels']['medium']) if score < tolerance['approve_threshold']: return "APPROVE" elif score < tolerance['deny_threshold']: return "REQUIRES_APPROVAL" else: return "DENY" def score_action_risk( action: str, target_system: str, agent_id: Optional[str] = None, context: Optional[Dict[str, Any]] = None, risk_tolerance: str = "medium" ) -> Dict[str, Any]: """ Comprehensive risk scoring aligned with AIVSS methodology Args: action: Description of the proposed action target_system: System/resource being acted upon agent_id: Agent requesting the action (optional) context: Additional context (data sensitivity, connected systems, etc.) risk_tolerance: "low", "medium", or "high" - organizational risk appetite Returns: Risk assessment with score, severity, decision, and recommendations """ # Calculate impact scores breakdown = calculate_impact_scores(action, target_system, context) # Calculate overall risk score overall_score = calculate_risk_score(breakdown) # Get severity and decision severity = get_severity(overall_score) decision = get_decision(overall_score, risk_tolerance) # Get LLM analysis for nuanced assessment llm_analysis = analyze_action_with_llm(action, target_system, context) # Generate recommendations based on score and decision recommendations = [] required_controls = [] if decision == "DENY": recommendations.append("Action poses unacceptable risk and should not proceed") recommendations.append("Consider alternative approaches with lower risk") elif decision == "REQUIRES_APPROVAL": recommendations.append("Proceed with human approval and enhanced logging") recommendations.append("Document justification and rollback plan") required_controls = [ "Human-in-the-loop approval", "Transaction logging enabled", "Rollback plan documented" ] else: recommendations.append("Action approved with standard monitoring") required_controls = ["Standard audit logging"] # Add controls based on severity if severity in ["HIGH", "CRITICAL"]: required_controls.append("Real-time monitoring required") if llm_analysis.get('reversibility') == 'irreversible': required_controls.append("Backup verification before execution") from .audit import generate_audit_id audit_id = generate_audit_id("risk") return { "overall_score": overall_score, "severity": severity, "decision": decision, "breakdown": breakdown, "llm_analysis": llm_analysis, "recommendation": recommendations[0] if recommendations else "", "required_controls": required_controls, "audit_id": audit_id, "risk_tolerance": risk_tolerance }