File size: 8,973 Bytes
e856398 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
"""Zero-Trust Permission Validation System"""
import json
import re
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
def load_permission_matrix() -> Dict[str, Any]:
"""Load permission matrix from JSON"""
matrix_path = Path(__file__).parent.parent / "data" / "permission_matrix.json"
with open(matrix_path, 'r') as f:
return json.load(f)
def get_agent_role(agent_id: str) -> Optional[str]:
"""
Extract role from agent_id
Expected format: role-name-01, role-name-02, etc.
"""
if not agent_id:
return "guest-agent"
# Extract role from agent_id (e.g., "data-processor-01" -> "data-processor")
parts = agent_id.rsplit('-', 1)
if len(parts) == 2 and parts[1].isdigit():
return parts[0]
return agent_id
def check_pattern_match(resource: str, patterns: List[str]) -> bool:
"""
Check if resource matches any of the allowed patterns
Supports wildcards like database:*:read or filesystem:/tmp/*:write
"""
for pattern in patterns:
# Convert wildcard pattern to regex
regex_pattern = pattern.replace('*', '.*').replace(':', r'\:')
if re.match(f"^{regex_pattern}$", resource):
return True
return False
def check_always_deny(action: str, resource: str) -> tuple[bool, Optional[str]]:
"""Check if action is in always_deny list"""
matrix = load_permission_matrix()
always_deny = matrix['default_policies']['always_deny']
for denied_pattern in always_deny:
# Check if action matches denied pattern
if '*' in denied_pattern:
pattern = denied_pattern.replace('*', '.*')
if re.match(f"^{pattern}$", action):
return True, f"Action '{action}' is globally denied"
elif action == denied_pattern:
return True, f"Action '{action}' is globally denied"
return False, None
def check_requires_approval(action: str, resource: str) -> bool:
"""Check if action requires human approval"""
matrix = load_permission_matrix()
require_approval = matrix['default_policies']['require_approval_for']
for approval_pattern in require_approval:
if '*' in approval_pattern:
pattern = approval_pattern.replace('*', '.*')
if re.match(f"^{pattern}$", action):
return True
elif action == approval_pattern:
return True
# Check resource patterns for sensitive data
sensitive_keywords = ['secret', 'credential', 'password', 'token', 'key', 'payment']
resource_lower = resource.lower()
if any(keyword in resource_lower for keyword in sensitive_keywords):
return True
return False
def validate_permissions(
agent_id: str,
action: str,
resource: str,
current_permissions: Optional[List[str]] = None,
request_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Zero-trust permission validation
Args:
agent_id: Unique identifier for the agent
action: The action being attempted (e.g., "read_file", "execute_code")
resource: The target resource (e.g., "/etc/passwd", "database:users")
current_permissions: Agent's current permission set (optional)
request_context: Additional context (IP, session_id, timestamp)
Returns:
Validation result with decision and recommendations
"""
matrix = load_permission_matrix()
# Check if action is globally denied
is_denied, deny_reason = check_always_deny(action, resource)
if is_denied:
from .audit import generate_audit_id
audit_id = generate_audit_id("perm")
return {
"allowed": False,
"decision": "DENY",
"reason": deny_reason,
"agent_role": "unknown",
"required_permissions": [],
"current_permissions": current_permissions or [],
"permission_gap": [],
"recommendations": ["This action is prohibited by security policy"],
"escalation_path": "Contact security-admin@company.com",
"audit_id": audit_id
}
# Get agent role
role = get_agent_role(agent_id)
# Check if role exists in matrix
if role not in matrix['roles']:
from .audit import generate_audit_id
audit_id = generate_audit_id("perm")
return {
"allowed": False,
"decision": "DENY",
"reason": f"Unknown agent role: '{role}'",
"agent_role": role,
"required_permissions": [],
"current_permissions": current_permissions or [],
"permission_gap": [],
"recommendations": ["Register agent with valid role in permission matrix"],
"escalation_path": "Contact admin to configure agent permissions",
"audit_id": audit_id
}
role_config = matrix['roles'][role]
# Check if action is explicitly denied for this role
if action in role_config.get('denied_actions', []):
from .audit import generate_audit_id
audit_id = generate_audit_id("perm")
return {
"allowed": False,
"decision": "DENY",
"reason": f"Agent role '{role}' explicitly denies action '{action}'",
"agent_role": role,
"required_permissions": [],
"current_permissions": current_permissions or [],
"permission_gap": [f"{action} on {resource}"],
"recommendations": [
"This action is not permitted for your role",
"Request role change if elevated access is needed"
],
"escalation_path": "Contact security-admin@company.com",
"audit_id": audit_id
}
# Check if action is in allowed_actions
if action not in role_config['allowed_actions']:
from .audit import generate_audit_id
audit_id = generate_audit_id("perm")
return {
"allowed": False,
"decision": "DENY",
"reason": f"Action '{action}' not in allowed actions for role '{role}'",
"agent_role": role,
"required_permissions": [f"{action}:{resource}"],
"current_permissions": role_config['allowed_actions'],
"permission_gap": [action],
"recommendations": [
"Request permission addition from administrator",
"Use alternative action within your current permissions"
],
"escalation_path": "Submit permission request at /admin/permissions",
"audit_id": audit_id
}
# Check if resource matches allowed patterns
resource_allowed = check_pattern_match(resource, role_config['resource_patterns'])
if not resource_allowed:
from .audit import generate_audit_id
audit_id = generate_audit_id("perm")
return {
"allowed": False,
"decision": "DENY",
"reason": f"Resource '{resource}' does not match allowed patterns for role '{role}'",
"agent_role": role,
"required_permissions": [f"{action}:{resource}"],
"current_permissions": role_config['resource_patterns'],
"permission_gap": [f"access to {resource}"],
"recommendations": [
"Verify resource path is correct",
"Request access to this resource pattern"
],
"escalation_path": "Contact security-admin@company.com",
"audit_id": audit_id
}
# Check if action requires approval
requires_approval = check_requires_approval(action, resource)
from .audit import generate_audit_id
audit_id = generate_audit_id("perm")
if requires_approval:
return {
"allowed": False,
"decision": "REQUIRES_APPROVAL",
"reason": f"Action '{action}' on '{resource}' requires human approval",
"agent_role": role,
"required_permissions": [f"{action}:{resource}"],
"current_permissions": role_config['allowed_actions'],
"permission_gap": ["human approval"],
"recommendations": [
"Submit approval request with justification",
"Approval required due to sensitive action/resource"
],
"escalation_path": "Submit at /admin/approval-requests",
"audit_id": audit_id,
"approval_required": True
}
# Permission granted
return {
"allowed": True,
"decision": "ALLOW",
"reason": f"Agent '{agent_id}' has valid permissions for '{action}' on '{resource}'",
"agent_role": role,
"required_permissions": [f"{action}:{resource}"],
"current_permissions": role_config['allowed_actions'],
"permission_gap": [],
"recommendations": [],
"escalation_path": None,
"audit_id": audit_id
}
|