From c9a7c1abd7676ca22e4718a93b487354bc960cb7 Mon Sep 17 00:00:00 2001 From: Gunjan-Katre-Comprinno Date: Mon, 9 Jun 2025 13:55:41 +0000 Subject: [PATCH 1/2] Added Test Case for check inspector_ec2_scan_enabled --- .../test_inspector_ec2_scan_enabled.py | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py diff --git a/library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py b/library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py new file mode 100644 index 00000000..d35f348c --- /dev/null +++ b/library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py @@ -0,0 +1,94 @@ +import pytest +from unittest.mock import MagicMock +from tevico.engine.entities.report.check_model import CheckStatus, CheckMetadata, Remediation, RemediationCode, RemediationRecommendation +from library.aws.checks.inspector.inspector_ec2_scan_enabled import inspector_ec2_scan_enabled + + +class TestInspectorEC2ScanEnabled: + def setup_method(self): + metadata = CheckMetadata( + Provider="aws", + CheckID="inspector_ec2_scan_enabled", + CheckTitle="Ensure Inspector EC2 standard scan is enabled", + CheckType=["security"], + ServiceName="inspector2", + SubServiceName="ec2", + ResourceIdTemplate="", + Severity="high", + ResourceType="account", + Risk="If Inspector EC2 scans are not enabled, you might miss vulnerabilities on your EC2 instances.", + RelatedUrl="https://docs.aws.amazon.com/inspector/latest/user/ec2-scan.html", + Remediation=Remediation( + Code=RemediationCode( + CLI="aws inspector2 enable --resource-types EC2", + Terraform=None, + NativeIaC=None, + Other=None, + ), + Recommendation=RemediationRecommendation( + Text="Enable Inspector2 EC2 scanning.", + Url="https://docs.aws.amazon.com/inspector/latest/user/ec2-scan.html" + ) + ), + Description="Checks whether Inspector EC2 standard scan is enabled.", + Categories=["security", "compliance"] + ) + + self.check = inspector_ec2_scan_enabled(metadata) + self.mock_session = MagicMock() + self.mock_inspector_client = MagicMock() + self.mock_sts_client = MagicMock() + + self.mock_session.client.side_effect = lambda service: { + "inspector2": self.mock_inspector_client, + "sts": self.mock_sts_client + }[service] + + self.mock_sts_client.get_caller_identity.return_value = {"Account": "123456789012"} + + def set_inspector_response(self, status): + self.mock_inspector_client.batch_get_account_status.return_value = { + "accounts": [{ + "resourceState": { + "ec2": { + "status": status + } + } + }] + } + + def test_ec2_scan_enabled(self): + self.set_inspector_response("ENABLED") + report = self.check.execute(self.mock_session) + + assert report.status == CheckStatus.PASSED + assert any(r.summary is not None and "enabled" in r.summary.lower() for r in report.resource_ids_status) + + def test_ec2_scan_disabled(self): + self.set_inspector_response("DISABLED") + report = self.check.execute(self.mock_session) + + assert report.status == CheckStatus.FAILED + assert any(r.summary is not None and "not enabled" in r.summary.lower() for r in report.resource_ids_status) + + def test_ec2_scan_suspended(self): + self.set_inspector_response("SUSPENDED") + report = self.check.execute(self.mock_session) + + assert report.status == CheckStatus.FAILED + assert any(r.summary is not None and "suspended" in r.summary.lower() for r in report.resource_ids_status) + + def test_ec2_scan_transitional(self): + self.set_inspector_response("TRANSITIONING") + report = self.check.execute(self.mock_session) + + assert report.status == CheckStatus.UNKNOWN + assert any(r.summary is not None and "transitional" in r.summary.lower() for r in report.resource_ids_status) + + def test_api_failure(self): + self.mock_inspector_client.batch_get_account_status.side_effect = Exception("Simulated API failure") + + report = self.check.execute(self.mock_session) + + assert report.status == CheckStatus.UNKNOWN + assert any(r.summary is not None and "error checking" in r.summary.lower() for r in report.resource_ids_status) From 61924ad84b12c43dc037e6c4c433548caf35f320 Mon Sep 17 00:00:00 2001 From: Gunjan-Katre-Comprinno Date: Tue, 8 Jul 2025 20:11:00 +0000 Subject: [PATCH 2/2] Addressed the comments on PR Test Case for check inspector_ec2_scan_enabled --- .../test_inspector_ec2_scan_enabled.py | 66 ++++++++++++++----- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py b/library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py index d35f348c..3bb38bd8 100644 --- a/library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py +++ b/library/aws/tests/inspector/test_inspector_ec2_scan_enabled.py @@ -1,6 +1,13 @@ import pytest from unittest.mock import MagicMock -from tevico.engine.entities.report.check_model import CheckStatus, CheckMetadata, Remediation, RemediationCode, RemediationRecommendation +from botocore.exceptions import ClientError +from tevico.engine.entities.report.check_model import ( + CheckStatus, + CheckMetadata, + Remediation, + RemediationCode, + RemediationRecommendation, +) from library.aws.checks.inspector.inspector_ec2_scan_enabled import inspector_ec2_scan_enabled @@ -27,11 +34,11 @@ def setup_method(self): ), Recommendation=RemediationRecommendation( Text="Enable Inspector2 EC2 scanning.", - Url="https://docs.aws.amazon.com/inspector/latest/user/ec2-scan.html" - ) + Url="https://docs.aws.amazon.com/inspector/latest/user/ec2-scan.html", + ), ), Description="Checks whether Inspector EC2 standard scan is enabled.", - Categories=["security", "compliance"] + Categories=["security", "compliance"], ) self.check = inspector_ec2_scan_enabled(metadata) @@ -41,20 +48,24 @@ def setup_method(self): self.mock_session.client.side_effect = lambda service: { "inspector2": self.mock_inspector_client, - "sts": self.mock_sts_client + "sts": self.mock_sts_client, }[service] - self.mock_sts_client.get_caller_identity.return_value = {"Account": "123456789012"} + self.mock_sts_client.get_caller_identity.return_value = { + "Account": "123456789012" + } def set_inspector_response(self, status): self.mock_inspector_client.batch_get_account_status.return_value = { - "accounts": [{ - "resourceState": { - "ec2": { - "status": status + "accounts": [ + { + "resourceState": { + "ec2": { + "status": status + } } } - }] + ] } def test_ec2_scan_enabled(self): @@ -62,33 +73,56 @@ def test_ec2_scan_enabled(self): report = self.check.execute(self.mock_session) assert report.status == CheckStatus.PASSED - assert any(r.summary is not None and "enabled" in r.summary.lower() for r in report.resource_ids_status) + assert any(r.summary and "enabled" in r.summary.lower() for r in report.resource_ids_status) def test_ec2_scan_disabled(self): self.set_inspector_response("DISABLED") report = self.check.execute(self.mock_session) assert report.status == CheckStatus.FAILED - assert any(r.summary is not None and "not enabled" in r.summary.lower() for r in report.resource_ids_status) + assert any(r.summary and "not enabled" in r.summary.lower() for r in report.resource_ids_status) def test_ec2_scan_suspended(self): self.set_inspector_response("SUSPENDED") report = self.check.execute(self.mock_session) assert report.status == CheckStatus.FAILED - assert any(r.summary is not None and "suspended" in r.summary.lower() for r in report.resource_ids_status) + assert any(r.summary and "suspended" in r.summary.lower() for r in report.resource_ids_status) def test_ec2_scan_transitional(self): self.set_inspector_response("TRANSITIONING") report = self.check.execute(self.mock_session) assert report.status == CheckStatus.UNKNOWN - assert any(r.summary is not None and "transitional" in r.summary.lower() for r in report.resource_ids_status) + assert any(r.summary and "transitional" in r.summary.lower() for r in report.resource_ids_status) def test_api_failure(self): self.mock_inspector_client.batch_get_account_status.side_effect = Exception("Simulated API failure") + report = self.check.execute(self.mock_session) + assert report.status == CheckStatus.UNKNOWN + assert any("error checking" in r.summary.lower() for r in report.resource_ids_status) + + def test_ec2_scan_status_missing(self): + """Test when 'ec2' key is completely missing from resourceState.""" + self.mock_inspector_client.batch_get_account_status.return_value = { + "accounts": [{"resourceState": {}}] # ec2 key missing + } + report = self.check.execute(self.mock_session) + + assert report.status == CheckStatus.UNKNOWN + assert any( + "transitional" in r.summary.lower() or r.status == CheckStatus.UNKNOWN + for r in report.resource_ids_status + ) + + def test_client_error_handling(self): + """Test AWS ClientError (e.g., access denied).""" + self.mock_inspector_client.batch_get_account_status.side_effect = ClientError( + error_response={"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, + operation_name="BatchGetAccountStatus" + ) report = self.check.execute(self.mock_session) assert report.status == CheckStatus.UNKNOWN - assert any(r.summary is not None and "error checking" in r.summary.lower() for r in report.resource_ids_status) + assert any("access denied" in r.summary.lower() for r in report.resource_ids_status)