Skip to content

Commit fe93475

Browse files
committed
refined version
1 parent 2e4220f commit fe93475

File tree

7 files changed

+358
-141
lines changed

7 files changed

+358
-141
lines changed

.coverage

0 Bytes
Binary file not shown.

devolv/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.1.10"
1+
__version__ = "0.1.13"

devolv/iam/validator/cli.py

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,66 @@
11
import typer
22
import os
3+
import json
4+
import ast
35
from typer import Exit
46
from devolv.iam.validator.core import validate_policy_file
57
from devolv.iam.validator.folder import validate_policy_folder
68

9+
app = typer.Typer(help="IAM Policy Validator CLI")
10+
11+
@app.command("validate")
712
def validate(
813
path: str,
914
json_output: bool = typer.Option(False, "--json", help="Output findings in JSON format"),
1015
quiet: bool = typer.Option(False, "--quiet", help="Suppress debug logs"),
1116
):
1217
if not os.path.exists(path):
13-
typer.secho(f"❌ Path not found: {path}", fg=typer.colors.RED)
18+
typer.secho(f"❌ File not found: {path}", fg=typer.colors.RED)
1419
raise Exit(code=1)
1520

21+
findings = []
1622
if os.path.isfile(path):
1723
findings = validate_policy_file(path)
18-
if not findings:
19-
typer.secho("✅ Policy is valid and passed all checks.", fg=typer.colors.GREEN)
20-
raise Exit(code=0)
21-
for finding in findings:
22-
typer.secho(f"❌ {finding['level'].upper()}: {finding['message']}", fg=typer.colors.RED)
23-
raise Exit(code=1)
24-
2524
elif os.path.isdir(path):
2625
findings = validate_policy_folder(path)
27-
if not findings:
28-
typer.secho("✅ All policies passed validation.", fg=typer.colors.GREEN)
29-
raise Exit(code=0)
30-
for finding in findings:
31-
typer.secho(f"❌ {finding['level'].upper()}: {finding['message']}", fg=typer.colors.RED)
32-
if any(f["level"] in ("error", "high") for f in findings):
33-
raise Exit(code=1)
26+
else:
27+
typer.secho(f"❌ Unsupported path type: {path}", fg=typer.colors.RED)
28+
raise Exit(code=1)
29+
30+
if not findings:
31+
msg = (
32+
"✅ Policy is valid and passed all checks."
33+
if os.path.isfile(path)
34+
else "✅ All policies passed validation."
35+
)
36+
typer.secho(msg, fg=typer.colors.GREEN)
3437
raise Exit(code=0)
3538

39+
if json_output:
40+
typer.echo(json.dumps(findings, indent=2))
3641
else:
37-
typer.secho(f"❌ Unsupported path type: {path}", fg=typer.colors.RED)
42+
for finding in findings:
43+
msg = finding.get('message', '')
44+
try:
45+
inner_findings = ast.literal_eval(msg) if isinstance(msg, str) else msg
46+
if isinstance(inner_findings, list):
47+
for inner in inner_findings:
48+
typer.secho(
49+
f"❌ {inner.get('level', '').upper()}: {inner.get('message', '')}",
50+
fg=typer.colors.RED
51+
)
52+
else:
53+
typer.secho(
54+
f"❌ {finding.get('level', '').upper()}: {msg}",
55+
fg=typer.colors.RED
56+
)
57+
except Exception:
58+
typer.secho(
59+
f"❌ {finding.get('level', '').upper()}: {msg}",
60+
fg=typer.colors.RED
61+
)
62+
63+
if any(f.get("level", "").lower() in ("error", "high") for f in findings):
3864
raise Exit(code=1)
65+
else:
66+
raise Exit(code=0)

devolv/iam/validator/core.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,22 @@
11
import json
22
import yaml
3-
from pathlib import Path
43
from devolv.iam.validator.rules import RULES
54

65
def load_policy(path: str):
76
with open(path, "r") as f:
87
content = f.read()
98
if not content.strip():
109
raise ValueError("Policy file is empty.")
11-
f.seek(0) # reset file pointer
10+
f.seek(0)
1211
if path.endswith((".yaml", ".yml")):
13-
return yaml.safe_load(f)
14-
return json.load(f)
15-
12+
return yaml.safe_load(f), content.splitlines()
13+
return json.load(f), content.splitlines()
1614

1715
def validate_policy_file(path: str):
18-
data = load_policy(path)
16+
data, raw_lines = load_policy(path)
1917
findings = []
2018
for rule in RULES:
21-
result = rule["check"](data)
19+
result = rule["check"](data, raw_lines=raw_lines)
2220
if result:
2321
finding = {
2422
"id": rule["id"],
@@ -27,5 +25,3 @@ def validate_policy_file(path: str):
2725
}
2826
findings.append(finding)
2927
return findings
30-
31-

devolv/iam/validator/rules.py

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,109 @@
1-
def check_wildcard_actions(policy):
1+
import json
2+
3+
def _find_statement_line(stmt, raw_lines):
4+
if not raw_lines:
5+
return None
6+
7+
effect = stmt.get("Effect")
8+
actions = stmt.get("Action", [])
9+
resources = stmt.get("Resource", [])
10+
11+
if isinstance(actions, str):
12+
actions = [actions]
13+
if isinstance(resources, str):
14+
resources = [resources]
15+
16+
# Scan for Action lines directly
17+
for i in range(len(raw_lines)):
18+
if any(a in raw_lines[i] for a in actions):
19+
# Look ahead for Resource in next few lines
20+
block = "\n".join(raw_lines[i:i+5])
21+
if any(r in block for r in resources):
22+
return i + 1
23+
24+
# Fallback: look for Effect + Action in block
25+
for i in range(len(raw_lines)):
26+
if f'"Effect": "{effect}"' in raw_lines[i] or f"'Effect': '{effect}'" in raw_lines[i]:
27+
block = "\n".join(raw_lines[i:i+10])
28+
if any(a in block for a in actions) and any(r in block for r in resources):
29+
return i + 1
30+
31+
return None
32+
33+
def check_wildcard_actions(policy, raw_lines=None):
34+
findings = []
235
statements = policy.get("Statement", [])
336
if not isinstance(statements, list):
437
statements = [statements]
38+
539
for stmt in statements:
640
if stmt.get("Effect", "Allow") != "Allow":
7-
continue # Skip Deny statements
41+
continue
42+
843
actions = stmt.get("Action", [])
44+
resources = stmt.get("Resource", [])
45+
946
if isinstance(actions, str):
1047
actions = [actions]
11-
if any(a == "*" or a.endswith(":*") for a in actions):
12-
return "Policy uses wildcard in Action, which is overly permissive."
13-
return None
48+
if isinstance(resources, str):
49+
resources = [resources]
1450

15-
def check_passrole_wildcard(policy):
51+
for a in actions:
52+
if a == "*" or a.endswith(":*"):
53+
line_num = _find_statement_line(stmt, raw_lines)
54+
findings.append({
55+
"id": "IAM001",
56+
"level": "high",
57+
"message": (
58+
f"Policy uses overly permissive action '{a}' "
59+
+ (f"with resource {resources}" if resources else "without resource scope")
60+
+ (f". Statement starts at line {line_num}." if line_num else "")
61+
)
62+
})
63+
return findings
64+
65+
66+
def check_passrole_wildcard(policy, raw_lines=None):
67+
findings = []
1668
statements = policy.get("Statement", [])
1769
if not isinstance(statements, list):
1870
statements = [statements]
71+
1972
for stmt in statements:
2073
if stmt.get("Effect", "Allow") != "Allow":
21-
continue # Skip Deny statements
74+
continue
75+
2276
actions = stmt.get("Action", [])
2377
resources = stmt.get("Resource", [])
78+
2479
if isinstance(actions, str):
2580
actions = [actions]
2681
if isinstance(resources, str):
2782
resources = [resources]
28-
if "iam:PassRole" in actions and "*" in resources:
29-
return "iam:PassRole with wildcard resource can lead to privilege escalation."
30-
return None
83+
84+
if any(a.lower() == "iam:passrole" for a in actions) and "*" in resources:
85+
line_num = _find_statement_line(stmt, raw_lines)
86+
findings.append({
87+
"id": "IAM002",
88+
"level": "high",
89+
"message": (
90+
f"iam:PassRole with wildcard Resource ('*') can lead to privilege escalation."
91+
+ (f" Statement starts at line {line_num}." if line_num else "")
92+
)
93+
})
94+
return findings
3195

3296
RULES = [
3397
{
3498
"id": "IAM001",
3599
"level": "high",
36-
"description": "Wildcard in Action",
37-
"check": check_wildcard_actions
100+
"description": "Wildcard in Action (e.g. * or service:*) is overly permissive",
101+
"check": check_wildcard_actions,
38102
},
39103
{
40104
"id": "IAM002",
41105
"level": "high",
42106
"description": "PassRole with wildcard Resource",
43-
"check": check_passrole_wildcard
44-
}
107+
"check": check_passrole_wildcard,
108+
},
45109
]

tests/test_folder_validator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ def test_validate_folder_some_invalid(temp_policy_dir):
4545

4646
result = validate_policy_folder(str(temp_policy_dir))
4747
assert any(f["level"] == "high" for f in result)
48-
assert any("wildcard" in f["message"] for f in result)
48+
assert any("overly permissive" in str(f.get("message", "")).lower() for f in result)
49+
4950

5051
def test_validate_folder_empty(temp_policy_dir):
5152
result = validate_policy_folder(str(temp_policy_dir))
@@ -85,3 +86,4 @@ def test_folder_with_good_and_bad_files(tmp_path):
8586
(tmp_path / "broken.json").write_text('INVALID')
8687
result = validate_policy_folder(str(tmp_path))
8788
assert any("failed" in f["message"] for f in result)
89+

0 commit comments

Comments
 (0)