"""Zero-Trust Permission Validation System""" import json import re from pathlib import Path from typing import Dict, Any, List, Optional from datetime import datetime def load_permission_matrix() -> Dict[str, Any]: """Load permission matrix from JSON""" matrix_path = Path(__file__).parent.parent / "data" / "permission_matrix.json" with open(matrix_path, 'r') as f: return json.load(f) def get_agent_role(agent_id: str) -> Optional[str]: """ Extract role from agent_id Expected format: role-name-01, role-name-02, etc. """ if not agent_id: return "guest-agent" # Extract role from agent_id (e.g., "data-processor-01" -> "data-processor") parts = agent_id.rsplit('-', 1) if len(parts) == 2 and parts[1].isdigit(): return parts[0] return agent_id def check_pattern_match(resource: str, patterns: List[str]) -> bool: """ Check if resource matches any of the allowed patterns Supports wildcards like database:*:read or filesystem:/tmp/*:write """ for pattern in patterns: # Convert wildcard pattern to regex regex_pattern = pattern.replace('*', '.*').replace(':', r'\:') if re.match(f"^{regex_pattern}$", resource): return True return False def check_always_deny(action: str, resource: str) -> tuple[bool, Optional[str]]: """Check if action is in always_deny list""" matrix = load_permission_matrix() always_deny = matrix['default_policies']['always_deny'] for denied_pattern in always_deny: # Check if action matches denied pattern if '*' in denied_pattern: pattern = denied_pattern.replace('*', '.*') if re.match(f"^{pattern}$", action): return True, f"Action '{action}' is globally denied" elif action == denied_pattern: return True, f"Action '{action}' is globally denied" return False, None def check_requires_approval(action: str, resource: str) -> bool: """Check if action requires human approval""" matrix = load_permission_matrix() require_approval = matrix['default_policies']['require_approval_for'] for approval_pattern in require_approval: if '*' in approval_pattern: pattern = approval_pattern.replace('*', '.*') if re.match(f"^{pattern}$", action): return True elif action == approval_pattern: return True # Check resource patterns for sensitive data sensitive_keywords = ['secret', 'credential', 'password', 'token', 'key', 'payment'] resource_lower = resource.lower() if any(keyword in resource_lower for keyword in sensitive_keywords): return True return False def validate_permissions( agent_id: str, action: str, resource: str, current_permissions: Optional[List[str]] = None, request_context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Zero-trust permission validation Args: agent_id: Unique identifier for the agent action: The action being attempted (e.g., "read_file", "execute_code") resource: The target resource (e.g., "/etc/passwd", "database:users") current_permissions: Agent's current permission set (optional) request_context: Additional context (IP, session_id, timestamp) Returns: Validation result with decision and recommendations """ matrix = load_permission_matrix() # Check if action is globally denied is_denied, deny_reason = check_always_deny(action, resource) if is_denied: from .audit import generate_audit_id audit_id = generate_audit_id("perm") return { "allowed": False, "decision": "DENY", "reason": deny_reason, "agent_role": "unknown", "required_permissions": [], "current_permissions": current_permissions or [], "permission_gap": [], "recommendations": ["This action is prohibited by security policy"], "escalation_path": "Contact security-admin@company.com", "audit_id": audit_id } # Get agent role role = get_agent_role(agent_id) # Check if role exists in matrix if role not in matrix['roles']: from .audit import generate_audit_id audit_id = generate_audit_id("perm") return { "allowed": False, "decision": "DENY", "reason": f"Unknown agent role: '{role}'", "agent_role": role, "required_permissions": [], "current_permissions": current_permissions or [], "permission_gap": [], "recommendations": ["Register agent with valid role in permission matrix"], "escalation_path": "Contact admin to configure agent permissions", "audit_id": audit_id } role_config = matrix['roles'][role] # Check if action is explicitly denied for this role if action in role_config.get('denied_actions', []): from .audit import generate_audit_id audit_id = generate_audit_id("perm") return { "allowed": False, "decision": "DENY", "reason": f"Agent role '{role}' explicitly denies action '{action}'", "agent_role": role, "required_permissions": [], "current_permissions": current_permissions or [], "permission_gap": [f"{action} on {resource}"], "recommendations": [ "This action is not permitted for your role", "Request role change if elevated access is needed" ], "escalation_path": "Contact security-admin@company.com", "audit_id": audit_id } # Check if action is in allowed_actions if action not in role_config['allowed_actions']: from .audit import generate_audit_id audit_id = generate_audit_id("perm") return { "allowed": False, "decision": "DENY", "reason": f"Action '{action}' not in allowed actions for role '{role}'", "agent_role": role, "required_permissions": [f"{action}:{resource}"], "current_permissions": role_config['allowed_actions'], "permission_gap": [action], "recommendations": [ "Request permission addition from administrator", "Use alternative action within your current permissions" ], "escalation_path": "Submit permission request at /admin/permissions", "audit_id": audit_id } # Check if resource matches allowed patterns resource_allowed = check_pattern_match(resource, role_config['resource_patterns']) if not resource_allowed: from .audit import generate_audit_id audit_id = generate_audit_id("perm") return { "allowed": False, "decision": "DENY", "reason": f"Resource '{resource}' does not match allowed patterns for role '{role}'", "agent_role": role, "required_permissions": [f"{action}:{resource}"], "current_permissions": role_config['resource_patterns'], "permission_gap": [f"access to {resource}"], "recommendations": [ "Verify resource path is correct", "Request access to this resource pattern" ], "escalation_path": "Contact security-admin@company.com", "audit_id": audit_id } # Check if action requires approval requires_approval = check_requires_approval(action, resource) from .audit import generate_audit_id audit_id = generate_audit_id("perm") if requires_approval: return { "allowed": False, "decision": "REQUIRES_APPROVAL", "reason": f"Action '{action}' on '{resource}' requires human approval", "agent_role": role, "required_permissions": [f"{action}:{resource}"], "current_permissions": role_config['allowed_actions'], "permission_gap": ["human approval"], "recommendations": [ "Submit approval request with justification", "Approval required due to sensitive action/resource" ], "escalation_path": "Submit at /admin/approval-requests", "audit_id": audit_id, "approval_required": True } # Permission granted return { "allowed": True, "decision": "ALLOW", "reason": f"Agent '{agent_id}' has valid permissions for '{action}' on '{resource}'", "agent_role": role, "required_permissions": [f"{action}:{resource}"], "current_permissions": role_config['allowed_actions'], "permission_gap": [], "recommendations": [], "escalation_path": None, "audit_id": audit_id }