Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import pytest
from unittest.mock import MagicMock
from botocore.exceptions import ClientError
from tevico.engine.entities.report.check_model import (
CheckStatus,
CheckMetadata,
Remediation,
RemediationCode,
RemediationRecommendation,
)
from library.aws.checks.inspector.inspector_ec2_scan_enabled import inspector_ec2_scan_enabled


class TestInspectorEC2ScanEnabled:
def setup_method(self):
metadata = CheckMetadata(
Provider="aws",
CheckID="inspector_ec2_scan_enabled",
CheckTitle="Ensure Inspector EC2 standard scan is enabled",
CheckType=["security"],
ServiceName="inspector2",
SubServiceName="ec2",
ResourceIdTemplate="",
Severity="high",
ResourceType="account",
Risk="If Inspector EC2 scans are not enabled, you might miss vulnerabilities on your EC2 instances.",
RelatedUrl="https://docs.aws.amazon.com/inspector/latest/user/ec2-scan.html",
Remediation=Remediation(
Code=RemediationCode(
CLI="aws inspector2 enable --resource-types EC2",
Terraform=None,
NativeIaC=None,
Other=None,
),
Recommendation=RemediationRecommendation(
Text="Enable Inspector2 EC2 scanning.",
Url="https://docs.aws.amazon.com/inspector/latest/user/ec2-scan.html",
),
),
Description="Checks whether Inspector EC2 standard scan is enabled.",
Categories=["security", "compliance"],
)

self.check = inspector_ec2_scan_enabled(metadata)
self.mock_session = MagicMock()
self.mock_inspector_client = MagicMock()
self.mock_sts_client = MagicMock()

self.mock_session.client.side_effect = lambda service: {
"inspector2": self.mock_inspector_client,
"sts": self.mock_sts_client,
}[service]

self.mock_sts_client.get_caller_identity.return_value = {
"Account": "123456789012"
}

def set_inspector_response(self, status):
self.mock_inspector_client.batch_get_account_status.return_value = {
"accounts": [
{
"resourceState": {
"ec2": {
"status": status
}
}
}
]
}

def test_ec2_scan_enabled(self):
self.set_inspector_response("ENABLED")
report = self.check.execute(self.mock_session)

assert report.status == CheckStatus.PASSED
assert any(r.summary and "enabled" in r.summary.lower() for r in report.resource_ids_status)

def test_ec2_scan_disabled(self):
self.set_inspector_response("DISABLED")
report = self.check.execute(self.mock_session)

assert report.status == CheckStatus.FAILED
assert any(r.summary and "not enabled" in r.summary.lower() for r in report.resource_ids_status)

def test_ec2_scan_suspended(self):
self.set_inspector_response("SUSPENDED")
report = self.check.execute(self.mock_session)

assert report.status == CheckStatus.FAILED
assert any(r.summary and "suspended" in r.summary.lower() for r in report.resource_ids_status)

def test_ec2_scan_transitional(self):
self.set_inspector_response("TRANSITIONING")
report = self.check.execute(self.mock_session)

assert report.status == CheckStatus.UNKNOWN
assert any(r.summary and "transitional" in r.summary.lower() for r in report.resource_ids_status)

def test_api_failure(self):
self.mock_inspector_client.batch_get_account_status.side_effect = Exception("Simulated API failure")
report = self.check.execute(self.mock_session)

assert report.status == CheckStatus.UNKNOWN
assert any("error checking" in r.summary.lower() for r in report.resource_ids_status)

def test_ec2_scan_status_missing(self):
"""Test when 'ec2' key is completely missing from resourceState."""
self.mock_inspector_client.batch_get_account_status.return_value = {
"accounts": [{"resourceState": {}}] # ec2 key missing
}
report = self.check.execute(self.mock_session)

assert report.status == CheckStatus.UNKNOWN
assert any(
"transitional" in r.summary.lower() or r.status == CheckStatus.UNKNOWN
for r in report.resource_ids_status
)

def test_client_error_handling(self):
"""Test AWS ClientError (e.g., access denied)."""
self.mock_inspector_client.batch_get_account_status.side_effect = ClientError(
error_response={"Error": {"Code": "AccessDenied", "Message": "Access denied"}},
operation_name="BatchGetAccountStatus"
)
report = self.check.execute(self.mock_session)

assert report.status == CheckStatus.UNKNOWN
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠 Areas for Improvement

⚠️ 1. Missing a Case Where No ec2 Key Exists in resourceState

In real-world scenarios, it’s possible that the ec2 key is not present at all (e.g., misconfigured Inspector2 response or service not onboarded yet).

Recommendation:
Add a test like this:

def test_ec2_scan_status_missing(self):
    self.mock_inspector_client.batch_get_account_status.return_value = {
        "accounts": [{}]
    }

    report = self.check.execute(self.mock_session)
    assert report.status == CheckStatus.UNKNOWN
    assert any("transitional" in r.summary.lower() or "unknown" in r.status.name.lower()
               for r in report.resource_ids_status)

This would ensure the check handles partial or malformed responses gracefully.


⚠️ 2. ClientError Not Explicitly Tested

While you do simulate a generic Exception, testing for AWS-specific ClientError would align with how other AWS SDKs (like IAM or S3) often fail.

Why it matters: It helps ensure your except Exception block is robust enough to catch and format structured errors cleanly.

Suggestion:

from botocore.exceptions import ClientError

def test_client_error_handling(self):
    self.mock_inspector_client.batch_get_account_status.side_effect = ClientError(
        error_response={"Error": {"Code": "AccessDenied", "Message": "Access denied"}},
        operation_name="BatchGetAccountStatus"
    )
    report = self.check.execute(self.mock_session)
    assert report.status == CheckStatus.UNKNOWN
    assert any("access denied" in r.summary.lower() for r in report.resource_ids_status)

⚠️ 3. Account ID Not Used in Summary

The check fetches account_id from STS, but that value is never surfaced in the summary. Including it might help users correlate findings, especially in org-level scanning setups.

Not mandatory, but worth considering for clarity.

assert any("access denied" in r.summary.lower() for r in report.resource_ids_status)