File size: 8,973 Bytes
e856398
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
"""Zero-Trust Permission Validation System"""

import json
import re
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime

def load_permission_matrix() -> Dict[str, Any]:
    """
    Load the permission matrix from its JSON file.

    The file is looked up at ``data/permission_matrix.json`` under the parent
    of the directory that contains this module. It is re-read from disk on
    every call; nothing is cached here.

    Returns:
        The decoded JSON document.

    Raises:
        OSError: if the file cannot be opened (e.g. FileNotFoundError).
        json.JSONDecodeError: if the file content is not valid JSON.
    """
    matrix_path = Path(__file__).parent.parent / "data" / "permission_matrix.json"
    # JSON is UTF-8 by specification; do not rely on the platform's locale
    # encoding, which differs between systems (e.g. cp1252 on Windows).
    with open(matrix_path, 'r', encoding='utf-8') as f:
        return json.load(f)

def get_agent_role(agent_id: str) -> Optional[str]:
    """
    Derive the role name from an agent identifier.

    Identifiers are expected to look like "<role>-<number>"
    (e.g. "data-processor-01" -> "data-processor"). When the text after the
    last '-' is not purely digits, or there is no '-' at all, the identifier
    itself is returned as the role. A missing/empty identifier maps to
    "guest-agent".
    """
    if not agent_id:
        return "guest-agent"

    # Split once at the last hyphen: (role candidate, separator, suffix).
    role_part, hyphen, suffix = agent_id.rpartition('-')
    has_numeric_suffix = bool(hyphen) and suffix.isdigit()
    return role_part if has_numeric_suffix else agent_id

def check_pattern_match(resource: str, patterns: List[str]) -> bool:
    """
    Check if resource matches any of the allowed patterns.

    Patterns are literal strings in which '*' is the only wildcard, e.g.
    ``database:*:read`` or ``filesystem:/tmp/*:write``. Every other character
    (including regex metacharacters such as '.', '+', '(' or '[') is matched
    literally.

    Args:
        resource: The resource string being requested.
        patterns: Allowed wildcard patterns.

    Returns:
        True if the whole resource string matches at least one pattern,
        False otherwise (including when ``patterns`` is empty).
    """
    for pattern in patterns:
        # Escape each literal segment so that only '*' acts as a wildcard;
        # without this, a '.' in a pattern would match any character and an
        # unbalanced '(' would raise re.error.
        regex_pattern = '.*'.join(
            re.escape(segment) for segment in pattern.split('*')
        )
        # fullmatch must consume the entire string. Unlike '^...$' with
        # re.match, it does not accept a resource with a trailing newline.
        if re.fullmatch(regex_pattern, resource):
            return True
    return False

def check_always_deny(action: str, resource: str) -> tuple[bool, Optional[str]]:
    """
    Report whether an action is blocked by the global always_deny policy.

    Each entry of ``default_policies.always_deny`` in the permission matrix
    is compared against the action: entries containing '*' are turned into
    an anchored regex with '*' standing for '.*'; all other entries must
    equal the action exactly. The ``resource`` argument is accepted but not
    consulted here.

    Returns:
        (True, reason) as soon as an entry matches, otherwise (False, None).
    """
    deny_rules = load_permission_matrix()['default_policies']['always_deny']

    def _blocks(rule: str) -> bool:
        # Plain entries are compared verbatim; wildcard entries go through re.
        if '*' not in rule:
            return action == rule
        wildcard_regex = rule.replace('*', '.*')
        return bool(re.match(f"^{wildcard_regex}$", action))

    if any(_blocks(rule) for rule in deny_rules):
        return True, f"Action '{action}' is globally denied"
    return False, None

def check_requires_approval(action: str, resource: str) -> bool:
    """
    Decide whether an action/resource pair needs human approval.

    Approval is required when the action matches an entry of
    ``default_policies.require_approval_for`` in the permission matrix
    ('*' entries are treated as anchored wildcards, other entries as exact
    names), or when the lower-cased resource contains one of the sensitive
    keywords listed below.
    """
    approval_rules = load_permission_matrix()['default_policies']['require_approval_for']

    for rule in approval_rules:
        if '*' not in rule:
            # Exact action name.
            if action == rule:
                return True
            continue
        # Wildcard entry: '*' becomes '.*' inside an anchored regex.
        wildcard_regex = rule.replace('*', '.*')
        if re.match(f"^{wildcard_regex}$", action):
            return True

    # Resources whose name hints at sensitive data always need approval.
    lowered = resource.lower()
    for keyword in ('secret', 'credential', 'password', 'token', 'key', 'payment'):
        if keyword in lowered:
            return True
    return False

def _build_decision(
    *,
    decision: str,
    reason: Optional[str],
    agent_role: Optional[str],
    required_permissions: List[str],
    current_permissions: List[str],
    permission_gap: List[str],
    recommendations: List[str],
    escalation_path: Optional[str],
    audit_id: Any,
) -> Dict[str, Any]:
    """Assemble the result dictionary shared by every validation outcome."""
    return {
        # Only an explicit ALLOW grants access; DENY and REQUIRES_APPROVAL do not.
        "allowed": decision == "ALLOW",
        "decision": decision,
        "reason": reason,
        "agent_role": agent_role,
        "required_permissions": required_permissions,
        "current_permissions": current_permissions,
        "permission_gap": permission_gap,
        "recommendations": recommendations,
        "escalation_path": escalation_path,
        "audit_id": audit_id,
    }


def validate_permissions(
    agent_id: str,
    action: str,
    resource: str,
    current_permissions: Optional[List[str]] = None,
    request_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Zero-trust permission validation

    Checks run in this order, and the first failing one decides the result:
      1. global always-deny policy,
      2. the agent's role must exist in the permission matrix,
      3. the action must not be in the role's denied_actions,
      4. the action must be in the role's allowed_actions,
      5. the resource must match one of the role's resource_patterns,
      6. actions/resources that require approval yield REQUIRES_APPROVAL.
    Only a request that passes all of them is allowed.

    Args:
        agent_id: Unique identifier for the agent
        action: The action being attempted (e.g., "read_file", "execute_code")
        resource: The target resource (e.g., "/etc/passwd", "database:users")
        current_permissions: Agent's current permission set (optional). It is
            only echoed back in the results of checks 1-3; the other results
            report the role's allowed_actions or resource_patterns instead.
        request_context: Additional context (IP, session_id, timestamp).
            Accepted but not read by this function.

    Returns:
        Validation result dict with the keys "allowed", "decision"
        ("DENY", "REQUIRES_APPROVAL" or "ALLOW"), "reason", "agent_role",
        "required_permissions", "current_permissions", "permission_gap",
        "recommendations", "escalation_path" and "audit_id". A
        REQUIRES_APPROVAL result additionally carries "approval_required".
    """
    # Kept function-local as in the original code (presumably to avoid an
    # import cycle with the audit module - TODO confirm).
    from .audit import generate_audit_id

    matrix = load_permission_matrix()

    # 1. Check if action is globally denied (role is not resolved yet here).
    is_denied, deny_reason = check_always_deny(action, resource)
    if is_denied:
        return _build_decision(
            decision="DENY",
            reason=deny_reason,
            agent_role="unknown",
            required_permissions=[],
            current_permissions=current_permissions or [],
            permission_gap=[],
            recommendations=["This action is prohibited by security policy"],
            escalation_path="Contact security-admin@company.com",
            audit_id=generate_audit_id("perm"),
        )

    # 2. Resolve the agent role and make sure the matrix knows it.
    role = get_agent_role(agent_id)
    if role not in matrix['roles']:
        return _build_decision(
            decision="DENY",
            reason=f"Unknown agent role: '{role}'",
            agent_role=role,
            required_permissions=[],
            current_permissions=current_permissions or [],
            permission_gap=[],
            recommendations=["Register agent with valid role in permission matrix"],
            escalation_path="Contact admin to configure agent permissions",
            audit_id=generate_audit_id("perm"),
        )

    role_config = matrix['roles'][role]

    # 3. Check if action is explicitly denied for this role.
    if action in role_config.get('denied_actions', []):
        return _build_decision(
            decision="DENY",
            reason=f"Agent role '{role}' explicitly denies action '{action}'",
            agent_role=role,
            required_permissions=[],
            current_permissions=current_permissions or [],
            permission_gap=[f"{action} on {resource}"],
            recommendations=[
                "This action is not permitted for your role",
                "Request role change if elevated access is needed",
            ],
            escalation_path="Contact security-admin@company.com",
            audit_id=generate_audit_id("perm"),
        )

    # 4. Check if action is in allowed_actions.
    if action not in role_config['allowed_actions']:
        return _build_decision(
            decision="DENY",
            reason=f"Action '{action}' not in allowed actions for role '{role}'",
            agent_role=role,
            required_permissions=[f"{action}:{resource}"],
            current_permissions=role_config['allowed_actions'],
            permission_gap=[action],
            recommendations=[
                "Request permission addition from administrator",
                "Use alternative action within your current permissions",
            ],
            escalation_path="Submit permission request at /admin/permissions",
            audit_id=generate_audit_id("perm"),
        )

    # 5. Check if resource matches allowed patterns.
    if not check_pattern_match(resource, role_config['resource_patterns']):
        return _build_decision(
            decision="DENY",
            reason=f"Resource '{resource}' does not match allowed patterns for role '{role}'",
            agent_role=role,
            required_permissions=[f"{action}:{resource}"],
            current_permissions=role_config['resource_patterns'],
            permission_gap=[f"access to {resource}"],
            recommendations=[
                "Verify resource path is correct",
                "Request access to this resource pattern",
            ],
            escalation_path="Contact security-admin@company.com",
            audit_id=generate_audit_id("perm"),
        )

    # 6. Check if action requires approval.
    requires_approval = check_requires_approval(action, resource)
    audit_id = generate_audit_id("perm")

    if requires_approval:
        result = _build_decision(
            decision="REQUIRES_APPROVAL",
            reason=f"Action '{action}' on '{resource}' requires human approval",
            agent_role=role,
            required_permissions=[f"{action}:{resource}"],
            current_permissions=role_config['allowed_actions'],
            permission_gap=["human approval"],
            recommendations=[
                "Submit approval request with justification",
                "Approval required due to sensitive action/resource",
            ],
            escalation_path="Submit at /admin/approval-requests",
            audit_id=audit_id,
        )
        # Extra flag present only on approval-pending results.
        result["approval_required"] = True
        return result

    # Permission granted
    return _build_decision(
        decision="ALLOW",
        reason=f"Agent '{agent_id}' has valid permissions for '{action}' on '{resource}'",
        agent_role=role,
        required_permissions=[f"{action}:{resource}"],
        current_permissions=role_config['allowed_actions'],
        permission_gap=[],
        recommendations=[],
        escalation_path=None,
        audit_id=audit_id,
    )