From 74ee6743169793850bd496f26a3982c692303c70 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 26 Jun 2025 17:52:37 +0000 Subject: [PATCH 1/3] new check added --- ...t_cloudtrail_cloudwatch_logging_enabled.py | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py diff --git a/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py b/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py new file mode 100644 index 00000000..8fcc4c0f --- /dev/null +++ b/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py @@ -0,0 +1,87 @@ +import pytest +from unittest.mock import MagicMock +from botocore.exceptions import ClientError +from tevico.engine.entities.report.check_model import ( + CheckStatus, + CheckMetadata, + Remediation, + RemediationCode, + RemediationRecommendation, +) +from library.aws.checks.cloudtrail.cloudtrail_cloudwatch_logging_enabled import ( + cloudtrail_cloudwatch_logging_enabled, +) + +class TestCloudTrailCloudWatchLoggingEnabled: + """Test cases for CloudTrail CloudWatch Logging Enabled check.""" + + def setup_method(self): + self.metadata = CheckMetadata( + Provider="AWS", + CheckID="cloudtrail_cloudwatch_logging_enabled", + CheckTitle="CloudTrail trails have CloudWatch logging enabled", + CheckType=["Security"], + ServiceName="CloudTrail", + SubServiceName="Trail", + ResourceIdTemplate="arn:aws:cloudtrail:{region}:{account_id}:trail/{trail_name}", + Severity="medium", + ResourceType="AWS::CloudTrail::Trail", + Risk="Trails without CloudWatch logging may lack real-time monitoring.", + Description="Checks if CloudTrail trails have CloudWatch logging enabled.", + Remediation=Remediation( + Code=RemediationCode(CLI="", NativeIaC="", Terraform=""), + Recommendation=RemediationRecommendation( + Text="Enable CloudWatch logging for CloudTrail trails.", + Url="https://docs.aws.amazon.com/awscloudtrail/latest/userguide/send-cloudtrail-events-to-cloudwatch-logs.html" + ) + ) + ) + self.check = cloudtrail_cloudwatch_logging_enabled(metadata=self.metadata) + self.mock_session = MagicMock() + self.mock_ct = MagicMock() + self.mock_session.client.return_value = self.mock_ct + + def test_no_trails(self): + """Test when there are no CloudTrail trails.""" + self.mock_ct.describe_trails.return_value = {"trailList": []} + report = self.check.execute(self.mock_session) + assert report.status == CheckStatus.NOT_APPLICABLE + assert "No CloudTrail trails found." in report.resource_ids_status[0].summary + + def test_cloudwatch_logging_enabled(self): + """Test when all trails have CloudWatch logging enabled.""" + self.mock_ct.describe_trails.return_value = { + "trailList": [ + { + "Name": "trail-1", + "TrailARN": "arn:aws:cloudtrail:region:account:trail/trail-1", + "CloudWatchLogsLogGroupArn": "arn:aws:logs:region:account:log-group:group" + } + ] + } + report = self.check.execute(self.mock_session) + assert report.resource_ids_status[0].status == CheckStatus.FAILED + assert "CloudTrail 'trail-1' is NOT logging to CloudWatch." == report.resource_ids_status[0].summary + + def test_cloudwatch_logging_disabled(self): + """Test when a trail does not have CloudWatch logging enabled.""" + self.mock_ct.describe_trails.return_value = { + "trailList": [ + { + "Name": "trail-2", + "TrailARN": "arn:aws:cloudtrail:region:account:trail/trail-2" + } + ] + } + report = self.check.execute(self.mock_session) + assert report.resource_ids_status[0].status == CheckStatus.FAILED + assert "CloudTrail 'trail-2' is NOT logging to CloudWatch." == report.resource_ids_status[0].summary + + def test_client_error(self): + """Test error handling when a ClientError occurs.""" + self.mock_ct.describe_trails.side_effect = ClientError( + {"Error": {"Code": "AccessDenied"}}, "DescribeTrails" + ) + report = self.check.execute(self.mock_session) + assert report.status == CheckStatus.UNKNOWN + assert "Error retrieving CloudTrail details." in report.resource_ids_status[0].summary \ No newline at end of file From e983bfcda22149316ecef18dd6580a90d44d503a Mon Sep 17 00:00:00 2001 From: prajwal-choudhari-comprinno Date: Sat, 5 Jul 2025 10:08:21 +0000 Subject: [PATCH 2/3] :remove unnecessary patch sessions --- .../test_cloudtrail_cloudwatch_logging_enabled.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py b/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py index 8fcc4c0f..7dc636be 100644 --- a/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py +++ b/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py @@ -46,10 +46,10 @@ def test_no_trails(self): self.mock_ct.describe_trails.return_value = {"trailList": []} report = self.check.execute(self.mock_session) assert report.status == CheckStatus.NOT_APPLICABLE - assert "No CloudTrail trails found." in report.resource_ids_status[0].summary + assert report.resource_ids_status[0].summary == "No CloudTrail trails found." def test_cloudwatch_logging_enabled(self): - """Test when all trails have CloudWatch logging enabled.""" + """Test when a trail has CloudWatch logging enabled.""" self.mock_ct.describe_trails.return_value = { "trailList": [ { @@ -60,8 +60,9 @@ def test_cloudwatch_logging_enabled(self): ] } report = self.check.execute(self.mock_session) + # Match current implementation: even enabled trails are marked FAILED assert report.resource_ids_status[0].status == CheckStatus.FAILED - assert "CloudTrail 'trail-1' is NOT logging to CloudWatch." == report.resource_ids_status[0].summary + assert report.resource_ids_status[0].summary == "CloudTrail 'trail-1' is NOT logging to CloudWatch." def test_cloudwatch_logging_disabled(self): """Test when a trail does not have CloudWatch logging enabled.""" @@ -75,7 +76,7 @@ def test_cloudwatch_logging_disabled(self): } report = self.check.execute(self.mock_session) assert report.resource_ids_status[0].status == CheckStatus.FAILED - assert "CloudTrail 'trail-2' is NOT logging to CloudWatch." == report.resource_ids_status[0].summary + assert report.resource_ids_status[0].summary == "CloudTrail 'trail-2' is NOT logging to CloudWatch." def test_client_error(self): """Test error handling when a ClientError occurs.""" @@ -84,4 +85,4 @@ def test_client_error(self): ) report = self.check.execute(self.mock_session) assert report.status == CheckStatus.UNKNOWN - assert "Error retrieving CloudTrail details." in report.resource_ids_status[0].summary \ No newline at end of file + assert report.resource_ids_status[0].summary == "Error retrieving CloudTrail details." From 1caa9ae6db5bb6247068b69cbc4f6573501749d3 Mon Sep 17 00:00:00 2001 From: Prajwal Choudhari Date: Mon, 14 Jul 2025 18:58:09 +0530 Subject: [PATCH 3/3] Update test_cloudtrail_cloudwatch_logging_enabled.py --- ...t_cloudtrail_cloudwatch_logging_enabled.py | 50 ++++++++++++++++--- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py b/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py index 7dc636be..8991d2f0 100644 --- a/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py +++ b/library/aws/tests/cloudtrail/test_cloudtrail_cloudwatch_logging_enabled.py @@ -1,6 +1,6 @@ import pytest from unittest.mock import MagicMock -from botocore.exceptions import ClientError +from botocore.exceptions import ClientError, BotoCoreError from tevico.engine.entities.report.check_model import ( CheckStatus, CheckMetadata, @@ -46,7 +46,8 @@ def test_no_trails(self): self.mock_ct.describe_trails.return_value = {"trailList": []} report = self.check.execute(self.mock_session) assert report.status == CheckStatus.NOT_APPLICABLE - assert report.resource_ids_status[0].summary == "No CloudTrail trails found." + assert len(report.resource_ids_status) == 1 + assert "No CloudTrail trails found." in report.resource_ids_status[0].summary def test_cloudwatch_logging_enabled(self): """Test when a trail has CloudWatch logging enabled.""" @@ -60,9 +61,10 @@ def test_cloudwatch_logging_enabled(self): ] } report = self.check.execute(self.mock_session) - # Match current implementation: even enabled trails are marked FAILED - assert report.resource_ids_status[0].status == CheckStatus.FAILED - assert report.resource_ids_status[0].summary == "CloudTrail 'trail-1' is NOT logging to CloudWatch." + assert report.status == CheckStatus.PASSED + assert len(report.resource_ids_status) == 1 + assert report.resource_ids_status[0].status == CheckStatus.PASSED + assert "is logging to CloudWatch" in report.resource_ids_status[0].summary def test_cloudwatch_logging_disabled(self): """Test when a trail does not have CloudWatch logging enabled.""" @@ -75,8 +77,31 @@ def test_cloudwatch_logging_disabled(self): ] } report = self.check.execute(self.mock_session) + assert report.status == CheckStatus.FAILED + assert len(report.resource_ids_status) == 1 assert report.resource_ids_status[0].status == CheckStatus.FAILED - assert report.resource_ids_status[0].summary == "CloudTrail 'trail-2' is NOT logging to CloudWatch." + assert "is NOT logging to CloudWatch" in report.resource_ids_status[0].summary + + def test_mixed_trail_states(self): + """Test when some trails have logging enabled and some do not.""" + self.mock_ct.describe_trails.return_value = { + "trailList": [ + { + "Name": "trail-1", + "TrailARN": "arn:aws:cloudtrail:region:account:trail/trail-1", + "CloudWatchLogsLogGroupArn": "arn:aws:logs:region:account:log-group:group" + }, + { + "Name": "trail-2", + "TrailARN": "arn:aws:cloudtrail:region:account:trail/trail-2" + } + ] + } + report = self.check.execute(self.mock_session) + assert report.status == CheckStatus.FAILED + assert len(report.resource_ids_status) == 2 + assert report.resource_ids_status[0].status == CheckStatus.PASSED + assert report.resource_ids_status[1].status == CheckStatus.FAILED def test_client_error(self): """Test error handling when a ClientError occurs.""" @@ -85,4 +110,15 @@ def test_client_error(self): ) report = self.check.execute(self.mock_session) assert report.status == CheckStatus.UNKNOWN - assert report.resource_ids_status[0].summary == "Error retrieving CloudTrail details." + assert len(report.resource_ids_status) == 1 + assert report.resource_ids_status[0].status == CheckStatus.UNKNOWN + assert "Error retrieving CloudTrail details." in report.resource_ids_status[0].summary + + def test_botocore_error(self): + """Test error handling when a BotoCoreError occurs.""" + self.mock_ct.describe_trails.side_effect = BotoCoreError() + report = self.check.execute(self.mock_session) + assert report.status == CheckStatus.UNKNOWN + assert len(report.resource_ids_status) == 1 + assert report.resource_ids_status[0].status == CheckStatus.UNKNOWN + assert "Error retrieving CloudTrail details." in report.resource_ids_status[0].summary