From 6c35433e453ca393cb4d462c1ef4d909ddad8690 Mon Sep 17 00:00:00 2001 From: Akansha Agarwal Date: Wed, 31 Dec 2025 04:35:25 +0000 Subject: [PATCH] Reapply "Add default log publishing concurrency (#1770)" (#1819) This reverts commit 562f735d6744bf9e9ef2b633a6b43938736f244b. --- .../sampleConfig/compass_linux_config.json | 1 + .../sampleConfig/complete_darwin_config.json | 1 + .../sampleConfig/complete_linux_config.json | 1 + .../sampleConfig/complete_windows_config.json | 1 + .../sampleConfig/config_with_env.json | 1 + .../tocwconfig/sampleConfig/log_filter.json | 2 +- .../sampleConfig/log_only_config_windows.json | 2 +- .../logs_and_kubernetes_config.json | 1 + .../sampleConfig/no_skip_log_timestamp.json | 1 + .../no_skip_log_timestamp_windows.json | 2 +- .../sampleConfig/skip_log_timestamp.json | 1 + .../skip_log_timestamp_default.json | 1 + .../skip_log_timestamp_default_windows.json | 1 + .../skip_log_timestamp_windows.json | 1 + .../windows_eventlog_only_config.json | 1 + .../translate/logs/constants/constants.go | 24 +++ .../files/collect_list/collect_list.go | 14 +- .../files/collect_list/ruleTimestampFormat.go | 13 +- .../logs/logs_collected/files/files.go | 3 +- .../logs/logs_collected/logs_collected.go | 15 +- translator/translate/logs/logs_test.go | 58 +++++++ translator/translate/logs/ruleConcurrency.go | 78 ++++++++- .../default_concurrency/logfile_only.json | 29 ++++ .../missing_timestamp.json | 36 ++++ .../windows_events_only.json | 29 ++++ translator/translate/util/visit.go | 124 ++++++++++++++ translator/translate/util/visit_test.go | 159 ++++++++++++++++++ 27 files changed, 567 insertions(+), 33 deletions(-) create mode 100644 translator/translate/logs/constants/constants.go create mode 100644 translator/translate/logs/testdata/default_concurrency/logfile_only.json create mode 100644 translator/translate/logs/testdata/default_concurrency/missing_timestamp.json create mode 100644 translator/translate/logs/testdata/default_concurrency/windows_events_only.json create mode 100644 translator/translate/util/visit.go create mode 100644 translator/translate/util/visit_test.go diff --git a/translator/tocwconfig/sampleConfig/compass_linux_config.json b/translator/tocwconfig/sampleConfig/compass_linux_config.json index 8dc32109b8..08e6732499 100755 --- a/translator/tocwconfig/sampleConfig/compass_linux_config.json +++ b/translator/tocwconfig/sampleConfig/compass_linux_config.json @@ -15,6 +15,7 @@ "deployment.environment": "agent-level-environment" }, "logs": { + "concurrency": 1, "metrics_collected": { "otlp": { "service.name": "otlp-level-service", diff --git a/translator/tocwconfig/sampleConfig/complete_darwin_config.json b/translator/tocwconfig/sampleConfig/complete_darwin_config.json index e01cf72349..ababbd1037 100755 --- a/translator/tocwconfig/sampleConfig/complete_darwin_config.json +++ b/translator/tocwconfig/sampleConfig/complete_darwin_config.json @@ -143,6 +143,7 @@ "endpoint_override": "https://monitoring-fips.us-west-2.amazonaws.com" }, "logs": { + "concurrency": 1, "metrics_collected": { "emf": { "service_address": "udp://127.0.0.1:25888" diff --git a/translator/tocwconfig/sampleConfig/complete_linux_config.json b/translator/tocwconfig/sampleConfig/complete_linux_config.json index 0513768c04..b5b7b6f530 100755 --- a/translator/tocwconfig/sampleConfig/complete_linux_config.json +++ b/translator/tocwconfig/sampleConfig/complete_linux_config.json @@ -246,6 +246,7 @@ "endpoint_override": "https://monitoring-fips.us-west-2.amazonaws.com" }, "logs": { + "concurrency": 1, "metrics_collected": { "emf": { "service_address": "udp://127.0.0.1:25888" diff --git a/translator/tocwconfig/sampleConfig/complete_windows_config.json b/translator/tocwconfig/sampleConfig/complete_windows_config.json index f528ca3428..ea1067fd07 100755 --- a/translator/tocwconfig/sampleConfig/complete_windows_config.json +++ b/translator/tocwconfig/sampleConfig/complete_windows_config.json @@ -150,6 +150,7 @@ "endpoint_override": "https://monitoring-fips.us-west-2.amazonaws.com" }, "logs": { + "concurrency": 1, "metrics_collected": { "emf": { "service_address": "udp://127.0.0.1:25888" diff --git a/translator/tocwconfig/sampleConfig/config_with_env.json b/translator/tocwconfig/sampleConfig/config_with_env.json index 097108921a..395c5c45a2 100644 --- a/translator/tocwconfig/sampleConfig/config_with_env.json +++ b/translator/tocwconfig/sampleConfig/config_with_env.json @@ -3,6 +3,7 @@ "region": "${ENV_REGION}" }, "logs": { + "concurrency": 1, "credentials": { "role_arn": "${ENV_CREDENTIALS_ROLE_ARN}" }, diff --git a/translator/tocwconfig/sampleConfig/log_filter.json b/translator/tocwconfig/sampleConfig/log_filter.json index 8ce5494f3b..126948bc46 100644 --- a/translator/tocwconfig/sampleConfig/log_filter.json +++ b/translator/tocwconfig/sampleConfig/log_filter.json @@ -3,6 +3,7 @@ "region": "us-east-1" }, "logs": { + "concurrency": 10, "logs_collected": { "files": { "collect_list": [ @@ -31,7 +32,6 @@ ] } }, - "concurrency": 10, "log_stream_name": "LOG_STREAM_NAME" } } diff --git a/translator/tocwconfig/sampleConfig/log_only_config_windows.json b/translator/tocwconfig/sampleConfig/log_only_config_windows.json index 549f3a9215..a01e9c45b7 100644 --- a/translator/tocwconfig/sampleConfig/log_only_config_windows.json +++ b/translator/tocwconfig/sampleConfig/log_only_config_windows.json @@ -1,5 +1,6 @@ { "logs": { + "concurrency": 10, "logs_collected": { "files": { "collect_list": [ @@ -36,7 +37,6 @@ ] } }, - "concurrency": 10, "log_stream_name": "LOG_STREAM_NAME" } } \ No newline at end of file diff --git a/translator/tocwconfig/sampleConfig/logs_and_kubernetes_config.json b/translator/tocwconfig/sampleConfig/logs_and_kubernetes_config.json index 5c536cc48b..a004c944d2 100644 --- a/translator/tocwconfig/sampleConfig/logs_and_kubernetes_config.json +++ b/translator/tocwconfig/sampleConfig/logs_and_kubernetes_config.json @@ -3,6 +3,7 @@ "region": "us-east-1" }, "logs": { + "concurrency": 1, "metrics_collected": { "emf": { }, diff --git a/translator/tocwconfig/sampleConfig/no_skip_log_timestamp.json b/translator/tocwconfig/sampleConfig/no_skip_log_timestamp.json index 4e04deb3de..1583e5f1bc 100644 --- a/translator/tocwconfig/sampleConfig/no_skip_log_timestamp.json +++ b/translator/tocwconfig/sampleConfig/no_skip_log_timestamp.json @@ -1,5 +1,6 @@ { "logs": { + "concurrency": 1, "logs_collected": { "files": { "collect_list": [ diff --git a/translator/tocwconfig/sampleConfig/no_skip_log_timestamp_windows.json b/translator/tocwconfig/sampleConfig/no_skip_log_timestamp_windows.json index fb7bf89559..68cdb07052 100644 --- a/translator/tocwconfig/sampleConfig/no_skip_log_timestamp_windows.json +++ b/translator/tocwconfig/sampleConfig/no_skip_log_timestamp_windows.json @@ -1,6 +1,6 @@ { - "logs": { + "concurrency": 1, "logs_collected": { "files": { "collect_list": [ diff --git a/translator/tocwconfig/sampleConfig/skip_log_timestamp.json b/translator/tocwconfig/sampleConfig/skip_log_timestamp.json index db134e261e..fdfdfdf4dc 100644 --- a/translator/tocwconfig/sampleConfig/skip_log_timestamp.json +++ b/translator/tocwconfig/sampleConfig/skip_log_timestamp.json @@ -3,6 +3,7 @@ "logfile": "/opt/tmp/a.log" }, "logs": { + "concurrency": 1, "logs_collected": { "files": { "collect_list": [ diff --git a/translator/tocwconfig/sampleConfig/skip_log_timestamp_default.json b/translator/tocwconfig/sampleConfig/skip_log_timestamp_default.json index 734df33a1b..62b2c14368 100644 --- a/translator/tocwconfig/sampleConfig/skip_log_timestamp_default.json +++ b/translator/tocwconfig/sampleConfig/skip_log_timestamp_default.json @@ -1,5 +1,6 @@ { "logs": { + "concurrency": 1, "logs_collected": { "files": { "collect_list": [ diff --git a/translator/tocwconfig/sampleConfig/skip_log_timestamp_default_windows.json b/translator/tocwconfig/sampleConfig/skip_log_timestamp_default_windows.json index 0ba8b224fb..a1ecca69cf 100644 --- a/translator/tocwconfig/sampleConfig/skip_log_timestamp_default_windows.json +++ b/translator/tocwconfig/sampleConfig/skip_log_timestamp_default_windows.json @@ -1,5 +1,6 @@ { "logs": { + "concurrency": 1, "logs_collected": { "files": { "collect_list": [ diff --git a/translator/tocwconfig/sampleConfig/skip_log_timestamp_windows.json b/translator/tocwconfig/sampleConfig/skip_log_timestamp_windows.json index 66394e40c7..751490ee62 100644 --- a/translator/tocwconfig/sampleConfig/skip_log_timestamp_windows.json +++ b/translator/tocwconfig/sampleConfig/skip_log_timestamp_windows.json @@ -3,6 +3,7 @@ "logfile": "c:\\tmp\\am.log" }, "logs": { + "concurrency": 1, "logs_collected": { "files": { "collect_list": [ diff --git a/translator/tocwconfig/sampleConfig/windows_eventlog_only_config.json b/translator/tocwconfig/sampleConfig/windows_eventlog_only_config.json index cc37ebf35a..647b82a37c 100644 --- a/translator/tocwconfig/sampleConfig/windows_eventlog_only_config.json +++ b/translator/tocwconfig/sampleConfig/windows_eventlog_only_config.json @@ -1,5 +1,6 @@ { "logs": { + "concurrency": 1, "logs_collected": { "windows_events": { "collect_list": [ diff --git a/translator/translate/logs/constants/constants.go b/translator/translate/logs/constants/constants.go new file mode 100644 index 0000000000..4a49313964 --- /dev/null +++ b/translator/translate/logs/constants/constants.go @@ -0,0 +1,24 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package constants + +const ( + SectionKeyLogsCollected = "logs_collected" +) + +// children of logs_collected +const ( + SectionKeyFiles = "files" +) + +// child of files +const ( + SectionKeyCollectList = "collect_list" +) + +// children of collect_list +const ( + SectionKeyFilePath = "file_path" + SectionKeyTimestampFormat = "timestamp_format" +) diff --git a/translator/translate/logs/logs_collected/files/collect_list/collect_list.go b/translator/translate/logs/logs_collected/files/collect_list/collect_list.go index aab1fb962c..ef7a8a19df 100644 --- a/translator/translate/logs/logs_collected/files/collect_list/collect_list.go +++ b/translator/translate/logs/logs_collected/files/collect_list/collect_list.go @@ -14,6 +14,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/jsonconfig/mergeJsonRule" "github.com/aws/amazon-cloudwatch-agent/translator/jsonconfig/mergeJsonUtil" "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/constants" parent "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/logs_collected/files" logUtil "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util" ) @@ -21,12 +22,11 @@ import ( type Rule translator.Rule const ( - SectionKey = "collect_list" logConfigOutputFileName = "log-config.json" ) func GetCurPath() string { - curPath := parent.GetCurPath() + SectionKey + "/" + curPath := parent.GetCurPath() + constants.SectionKeyCollectList + "/" return curPath } @@ -48,8 +48,8 @@ type FileConfig struct { func (f *FileConfig) ApplyRule(input interface{}) (returnKey string, returnVal interface{}) { m := input.(map[string]interface{}) res := []interface{}{} - if translator.IsValid(input, SectionKey, GetCurPath()) { - configArr := m[SectionKey].([]interface{}) + if translator.IsValid(input, constants.SectionKeyCollectList, GetCurPath()) { + configArr := m[constants.SectionKeyCollectList].([]interface{}) for i := 0; i < len(configArr); i++ { Index += 1 result := map[string]interface{}{} @@ -122,11 +122,11 @@ func outputLogConfig(logConfigs []interface{}) { var MergeRuleMap = map[string]mergeJsonRule.MergeRule{} func (f *FileConfig) Merge(source map[string]interface{}, result map[string]interface{}) { - mergeJsonUtil.MergeList(source, result, SectionKey) + mergeJsonUtil.MergeList(source, result, constants.SectionKeyCollectList) } func init() { f := new(FileConfig) - parent.RegisterRule(SectionKey, f) - parent.MergeRuleMap[SectionKey] = f + parent.RegisterRule(constants.SectionKeyCollectList, f) + parent.MergeRuleMap[constants.SectionKeyCollectList] = f } diff --git a/translator/translate/logs/logs_collected/files/collect_list/ruleTimestampFormat.go b/translator/translate/logs/logs_collected/files/collect_list/ruleTimestampFormat.go index 34690e88c9..8abc912022 100644 --- a/translator/translate/logs/logs_collected/files/collect_list/ruleTimestampFormat.go +++ b/translator/translate/logs/logs_collected/files/collect_list/ruleTimestampFormat.go @@ -10,6 +10,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator" "github.com/aws/amazon-cloudwatch-agent/translator/context" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/constants" ) /* @@ -145,9 +146,9 @@ func (t *TimestampRegex) ApplyRule(input interface{}) (returnKey string, returnV //Convert the input string into []rune and iterate the map and build the output []rune m := input.(map[string]interface{}) //If user not specify the timestamp_format, then no config entry for "timestamp_layout" in TOML - if val, ok := m["timestamp_format"]; !ok { + if val, ok := m[constants.SectionKeyTimestampFormat]; !ok { return "", "" - } else if m["file_path"] == context.CurrentContext().GetAgentLogFile() { + } else if m[constants.SectionKeyFilePath] == context.CurrentContext().GetAgentLogFile() { fmt.Printf("timestamp_format set file_path : %s is the same as agent log file %s thus do not use timestamp_regex \n", m["file_path"], context.CurrentContext().GetAgentLogFile()) return "", "" } else { @@ -164,7 +165,7 @@ func (t *TimestampRegex) ApplyRule(input interface{}) (returnKey string, returnV res = "(" + res + ")" returnKey = "timestamp_regex" if _, err := regexp.Compile(res); err != nil { - translator.AddErrorMessages(GetCurPath()+"timestamp_format", fmt.Sprintf("Timestamp format %s is invalid", val)) + translator.AddErrorMessages(GetCurPath()+constants.SectionKeyTimestampFormat, fmt.Sprintf("Timestamp format %s is invalid", val)) return } returnVal = res @@ -181,9 +182,9 @@ func (t *TimestampLayout) ApplyRule(input interface{}) (returnKey string, return //Convert the input string into []rune and iterate the map and build the output []rune m := input.(map[string]interface{}) //If user not specify the timestamp_format, then no config entry for "timestamp_layout" in TOML - if val, ok := m["timestamp_format"]; !ok { + if val, ok := m[constants.SectionKeyTimestampFormat]; !ok { return "", "" - } else if m["file_path"] == context.CurrentContext().GetAgentLogFile() { + } else if m[constants.SectionKeyFilePath] == context.CurrentContext().GetAgentLogFile() { fmt.Printf("timestamp_format set file_path : %s is the same as agent log file %s thus do not use timestamp_layout \n", m["file_path"], context.CurrentContext().GetAgentLogFile()) return "", "" } else { @@ -232,5 +233,5 @@ func init() { t2 := new(TimestampRegex) t3 := new(Timezone) r := []Rule{t1, t2, t3} - RegisterRule("timestamp_format", r) + RegisterRule(constants.SectionKeyTimestampFormat, r) } diff --git a/translator/translate/logs/logs_collected/files/files.go b/translator/translate/logs/logs_collected/files/files.go index df2537aedc..6720a0c294 100644 --- a/translator/translate/logs/logs_collected/files/files.go +++ b/translator/translate/logs/logs_collected/files/files.go @@ -7,6 +7,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator" "github.com/aws/amazon-cloudwatch-agent/translator/jsonconfig/mergeJsonRule" "github.com/aws/amazon-cloudwatch-agent/translator/jsonconfig/mergeJsonUtil" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/constants" parent "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/logs_collected" ) @@ -16,7 +17,7 @@ type Files struct { } const ( - SectionKey = "files" + SectionKey = constants.SectionKeyFiles SectionMappedKey = "logfile" ) diff --git a/translator/translate/logs/logs_collected/logs_collected.go b/translator/translate/logs/logs_collected/logs_collected.go index 2f6f9f9c33..8b63c8d1eb 100644 --- a/translator/translate/logs/logs_collected/logs_collected.go +++ b/translator/translate/logs/logs_collected/logs_collected.go @@ -11,6 +11,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/jsonconfig/mergeJsonRule" "github.com/aws/amazon-cloudwatch-agent/translator/jsonconfig/mergeJsonUtil" parent "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/constants" ) type Rule translator.Rule @@ -25,10 +26,8 @@ var ( darwinMetricCollectRule = map[string]Rule{} ) -const SectionKey = "logs_collected" - func GetCurPath() string { - curPath := parent.GetCurPath() + SectionKey + "/" + curPath := parent.GetCurPath() + constants.SectionKeyLogsCollected + "/" return curPath } @@ -60,12 +59,12 @@ func (l *LogsCollected) ApplyRule(input interface{}) (returnKey string, returnVa log.Panicf("E! Unknown target platform: %s ", translator.GetTargetPlatform()) } - if _, ok := im[SectionKey]; !ok { + if _, ok := im[constants.SectionKeyLogsCollected]; !ok { returnKey = "" returnVal = "" } else { for _, rule := range targetRuleMap { - key, val := rule.ApplyRule(im[SectionKey]) + key, val := rule.ApplyRule(im[constants.SectionKeyLogsCollected]) if key == "inputs" { result = translator.MergeTwoUniqueMaps(result, val.(map[string]interface{})) } @@ -79,11 +78,11 @@ func (l *LogsCollected) ApplyRule(input interface{}) (returnKey string, returnVa var MergeRuleMap = map[string]mergeJsonRule.MergeRule{} func (l *LogsCollected) Merge(source map[string]interface{}, result map[string]interface{}) { - mergeJsonUtil.MergeMap(source, result, SectionKey, MergeRuleMap, GetCurPath()) + mergeJsonUtil.MergeMap(source, result, constants.SectionKeyLogsCollected, MergeRuleMap, GetCurPath()) } func init() { obj := new(LogsCollected) - parent.RegisterRule(SectionKey, obj) - parent.MergeRuleMap[SectionKey] = obj + parent.RegisterRule(constants.SectionKeyLogsCollected, obj) + parent.MergeRuleMap[constants.SectionKeyLogsCollected] = obj } diff --git a/translator/translate/logs/logs_test.go b/translator/translate/logs/logs_test.go index fe1dc0583d..35f0234224 100644 --- a/translator/translate/logs/logs_test.go +++ b/translator/translate/logs/logs_test.go @@ -6,10 +6,12 @@ package logs import ( "encoding/json" "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "github.com/aws/amazon-cloudwatch-agent/internal/util/testutil" "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" @@ -239,3 +241,59 @@ func TestLogs_ServiceAndEnvironmentMissing(t *testing.T) { assert.Equal(t, "my-service", GlobalLogConfig.ServiceName) assert.Equal(t, "ec2:group", GlobalLogConfig.DeploymentEnvironment) } + +func TestLogs_Concurrency(t *testing.T) { + l := new(Logs) + agent.Global_Config.Region = "us-east-1" + agent.Global_Config.RegionType = "any" + context.ResetContext() + + var input interface{} + err := json.Unmarshal([]byte(`{"logs":{"concurrency":10}}`), &input) + if err != nil { + assert.Fail(t, err.Error()) + } + _, _ = l.ApplyRule(input) + + assert.Equal(t, 10, GlobalLogConfig.Concurrency) +} + +func TestLogs_Concurrency_Default(t *testing.T) { + l := new(Logs) + agent.Global_Config.Region = "us-east-1" + agent.Global_Config.RegionType = "any" + context.ResetContext() + context.CurrentContext().SetAgentLogFile("/tmp/amazon-cloudwatch-agent.log") + + testCases := map[string]struct { + input map[string]any + want int + }{ + "WithMissingLogsCollected": { + input: map[string]any{ + "logs": map[string]any{}, + }, + want: -1, + }, + "WithLogFileOnly": { + input: testutil.GetJson(t, filepath.Join("testdata", "default_concurrency", "logfile_only.json")), + want: defaultConcurrency, + }, + "WithWindowsEventsOnly": { + input: testutil.GetJson(t, filepath.Join("testdata", "default_concurrency", "windows_events_only.json")), + want: defaultConcurrency, + }, + "WithMissingTimestampFormat/LogFile": { + input: testutil.GetJson(t, filepath.Join("testdata", "default_concurrency", "missing_timestamp.json")), + want: -1, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + _, _ = l.ApplyRule(testCase.input) + + assert.Equal(t, testCase.want, GlobalLogConfig.Concurrency) + }) + } +} diff --git a/translator/translate/logs/ruleConcurrency.go b/translator/translate/logs/ruleConcurrency.go index 04264ac5e1..fa680f3271 100644 --- a/translator/translate/logs/ruleConcurrency.go +++ b/translator/translate/logs/ruleConcurrency.go @@ -3,25 +3,87 @@ package logs -import "github.com/aws/amazon-cloudwatch-agent/translator" +import ( + "errors" + "runtime" -const ConcurrencySectionKey = "concurrency" + "github.com/aws/amazon-cloudwatch-agent/translator/context" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/constants" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/util" +) + +const ( + ConcurrencySectionKey = "concurrency" + disableConcurrency = -1 +) + +var ( + logFileCollectListPath = util.Path(constants.SectionKeyLogsCollected, constants.SectionKeyFiles, constants.SectionKeyCollectList) + checkTimestampFormatVisitor = util.NewSliceVisitor(util.NewVisitor(isMissingTimestampFormat)) + // based on AWS Java SDK default + defaultConcurrency = max(runtime.NumCPU(), 8) +) type Concurrency struct { } func (c *Concurrency) ApplyRule(input any) (string, any) { - result := map[string]interface{}{} - _, val := translator.DefaultCase(ConcurrencySectionKey, float64(0), input) - var concurrency int - if v, ok := val.(float64); ok && v > 1 { - concurrency = int(v) + result := map[string]any{} + concurrency := getConcurrency(input) + if concurrency > 1 { result[ConcurrencySectionKey] = concurrency + GlobalLogConfig.Concurrency = concurrency + } else { + GlobalLogConfig.Concurrency = disableConcurrency } - GlobalLogConfig.Concurrency = concurrency return Output_Cloudwatch_Logs, result } +func getConcurrency(input any) int { + m, ok := input.(map[string]any) + if !ok { + return disableConcurrency + } + v, ok := m[ConcurrencySectionKey].(float64) + if ok { + return int(v) + } + if _, ok = m[constants.SectionKeyLogsCollected]; !ok { + return disableConcurrency + } + return determineDefault(m) +} + +// determineDefault determines the default concurrency if not set. Will not set a default if timestamp_format is +// missing in the configuration for the files being collected. +func determineDefault(input any) int { + if isMissingAnyTimestampFormat(input, logFileCollectListPath) { + return disableConcurrency + } + return defaultConcurrency +} + +func isMissingAnyTimestampFormat(input any, path string) bool { + return errors.Is(util.Visit(input, path, checkTimestampFormatVisitor), util.ErrTargetNotFound) +} + +func isMissingTimestampFormat(input any) error { + m, ok := input.(map[string]any) + if !ok { + return util.ErrTargetNotFound + } + filePath, ok := m[constants.SectionKeyFilePath] + // skip the agent log file if configured as timestamp format is not supported https://github.com/aws/amazon-cloudwatch-agent/pull/885 + if ok && filePath == context.CurrentContext().GetAgentLogFile() { + return nil + } + _, ok = m[constants.SectionKeyTimestampFormat] + if !ok { + return util.ErrTargetNotFound + } + return nil +} + func init() { RegisterRule(ConcurrencySectionKey, new(Concurrency)) } diff --git a/translator/translate/logs/testdata/default_concurrency/logfile_only.json b/translator/translate/logs/testdata/default_concurrency/logfile_only.json new file mode 100644 index 0000000000..2c8e166eac --- /dev/null +++ b/translator/translate/logs/testdata/default_concurrency/logfile_only.json @@ -0,0 +1,29 @@ +{ + "logs": { + "logs_collected": { + "files": { + "collect_list": [ + { + "file_path": "/tmp/test1.log", + "log_group_name": "test.log", + "log_stream_name": "test.log", + "timezone": "UTC", + "timestamp_format": "%H:%M:%S %y %b %d" + }, + { + "file_path": "/tmp/amazon-cloudwatch-agent.log", + "log_group_name": "amazon-cloudwatch-agent.log", + "log_stream_name": "amazon-cloudwatch-agent.log", + "timezone": "UTC" + }, + { + "file_path": "/tmp/test3.log", + "log_group_name": "test.log", + "log_stream_name": "test.log", + "timestamp_format": "%H:%M:%S %y %b %d" + } + ] + } + } + } +} diff --git a/translator/translate/logs/testdata/default_concurrency/missing_timestamp.json b/translator/translate/logs/testdata/default_concurrency/missing_timestamp.json new file mode 100644 index 0000000000..cffa69f3e8 --- /dev/null +++ b/translator/translate/logs/testdata/default_concurrency/missing_timestamp.json @@ -0,0 +1,36 @@ +{ + "agent": { + "region": "us-east-1" + }, + "logs": { + "logs_collected": { + "files": { + "collect_list": [ + { + "file_path": "/tmp/amazon-cloudwatch-agent.log", + "log_group_name": "amazon-cloudwatch-agent.log", + "log_stream_name": "amazon-cloudwatch-agent.log" + }, + { + "file_path": "/tmp/test1.log", + "log_group_name": "test.log", + "log_stream_name": "test.log", + "timezone": "UTC", + "timestamp_format": "%H:%M:%S %y %b %d" + }, + { + "file_path": "/tmp/test2.log", + "log_group_name": "test.log", + "log_stream_name": "test.log" + }, + { + "file_path": "/tmp/test3.log", + "log_group_name": "test.log", + "log_stream_name": "test.log", + "timestamp_format": "%H:%M:%S %y %b %d" + } + ] + } + } + } +} diff --git a/translator/translate/logs/testdata/default_concurrency/windows_events_only.json b/translator/translate/logs/testdata/default_concurrency/windows_events_only.json new file mode 100644 index 0000000000..cc37ebf35a --- /dev/null +++ b/translator/translate/logs/testdata/default_concurrency/windows_events_only.json @@ -0,0 +1,29 @@ +{ + "logs": { + "logs_collected": { + "windows_events": { + "collect_list": [ + { + "event_name": "System", + "event_levels": [ + "INFORMATION", + "ERROR" + ], + "log_group_name": "System", + "log_stream_name": "System" + }, + { + "event_name": "Application", + "event_levels": [ + "INFORMATION", + "ERROR" + ], + "log_group_name": "Application", + "log_stream_name": "Application" + } + ] + } + }, + "log_stream_name": "log_stream_name" + } +} diff --git a/translator/translate/util/visit.go b/translator/translate/util/visit.go new file mode 100644 index 0000000000..e72033ba8d --- /dev/null +++ b/translator/translate/util/visit.go @@ -0,0 +1,124 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package util + +import ( + "errors" + "fmt" + "strings" +) + +const ( + Delimiter = "/" +) + +var ( + ErrNoKeysFound = errors.New("no keys found") + ErrUnsupportedType = errors.New("unsupported type") + ErrPathNotFound = errors.New("path element not found") + ErrTargetNotFound = errors.New("target element not found") +) + +type Visitor interface { + Visit(value any) error +} + +type funcVisitor func(value any) error + +var _ Visitor = (funcVisitor)(nil) + +func NewVisitor(fn func(value any) error) Visitor { + return funcVisitor(fn) +} + +func (v funcVisitor) Visit(value any) error { + return v(value) +} + +// SliceVisitor visits each element of a slice with the next visitor +type SliceVisitor struct { + next Visitor +} + +var _ Visitor = (*SliceVisitor)(nil) + +func NewSliceVisitor(next Visitor) *SliceVisitor { + return &SliceVisitor{next: next} +} + +func (v *SliceVisitor) Visit(value any) error { + s, ok := value.([]any) + if !ok { + return fmt.Errorf("%w: %T", ErrUnsupportedType, value) + } + var errs []error + for _, element := range s { + if err := v.next.Visit(element); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil +} + +// VisitPath represents a path through a nested map structure with each key separated by a Delimiter. +type VisitPath = string + +// Path creates a VisitPath based on the keys. +func Path(keys ...string) VisitPath { + return strings.Join(keys, Delimiter) +} + +// Visit traverses the input following the path and calls the visitor with the result at the end of the path. If the +// path contains a slice, traverses each branch. +func Visit(input any, path VisitPath, visitor Visitor) error { + if path == "" { + return ErrNoKeysFound + } + keys := strings.Split(path, Delimiter) + return visit(input, keys, visitor) +} + +func visit(input any, keys []string, visitor Visitor) error { + if len(keys) == 0 { + if visitor == nil { + return nil + } + return visitor.Visit(input) + } + switch current := input.(type) { + case map[string]any: + key := keys[0] + value, ok := current[keys[0]] + if !ok { + baseErr := ErrPathNotFound + if len(keys) == 1 { + baseErr = ErrTargetNotFound + } + return fmt.Errorf("%w: %s", baseErr, key) + } + return visit(value, keys[1:], visitor) + case []any: + var errs []error + for _, element := range current { + if err := visit(element, keys, visitor); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + default: + return fmt.Errorf("%w: %T", ErrUnsupportedType, current) + } + return nil +} + +// IsSet checks if the path exists in the input. +func IsSet(input any, path VisitPath) bool { + err := Visit(input, path, nil) + return err == nil +} diff --git a/translator/translate/util/visit_test.go b/translator/translate/util/visit_test.go new file mode 100644 index 0000000000..7756597ba6 --- /dev/null +++ b/translator/translate/util/visit_test.go @@ -0,0 +1,159 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +var testInput = map[string]any{ + "1": map[string]any{ + "1": []any{ + map[string]any{ + "key": "v1", + }, + map[string]any{ + "key": "v2", + "options": []any{"o1", "o2"}, + }, + }, + "2": map[string]any{ + "key": "v3", + }, + }, + "2": []any{"v4", "v5"}, +} + +type collectVisitor struct { + got []any +} + +var _ Visitor = (*collectVisitor)(nil) + +func (v *collectVisitor) Visit(input any) error { + v.got = append(v.got, input) + return nil +} + +func TestVisit(t *testing.T) { + testCases := map[string]struct { + path string + visitor Visitor + want []any + wantErr error + }{ + "EmptyPath": { + path: Path(""), + wantErr: ErrNoKeysFound, + }, + "InvalidPath": { + path: Path("1", "invalid", "2"), + wantErr: ErrPathNotFound, + }, + "InvalidTarget": { + path: Path("1", "invalid"), + wantErr: ErrTargetNotFound, + }, + "UnsupportedType": { + path: Path("1", "2", "key", "invalid"), + wantErr: ErrUnsupportedType, + }, + "ValidTarget": { + path: Path("1", "2"), + want: []any{ + map[string]any{ + "key": "v3", + }, + }, + }, + "SliceInPath": { + path: Path("1", "1", "key"), + want: []any{"v1", "v2"}, + }, + "PartialTargetMatch": { + path: Path("1", "1", "options"), + want: []any{ + []any{"o1", "o2"}, + }, + wantErr: ErrTargetNotFound, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + v := &collectVisitor{} + assert.ErrorIs(t, Visit(testInput, testCase.path, v), testCase.wantErr) + assert.Equal(t, testCase.want, v.got) + }) + } +} + +func TestIsSet(t *testing.T) { + testCases := map[string]struct { + path string + want bool + }{ + "EmptyPath": { + path: "", + want: false, + }, + "ExistingTarget": { + path: Path("1", "2", "key"), + want: true, + }, + "NonExisting/Target": { + path: Path("1", "2", "missing"), + want: false, + }, + "NonExisting/Path": { + path: Path("1", "missing", "2"), + want: false, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + assert.Equal(t, testCase.want, IsSet(testInput, testCase.path)) + }) + } +} + +func TestSliceVisitor(t *testing.T) { + cv := &collectVisitor{} + v := NewSliceVisitor(cv) + assert.NoError(t, Visit(testInput, Path("2"), v)) + assert.Equal(t, []any{"v4", "v5"}, cv.got) + + assert.ErrorIs(t, Visit(testInput, Path("1"), v), ErrUnsupportedType) + + v = NewSliceVisitor(NewVisitor(func(any) error { + return assert.AnError + })) + assert.ErrorIs(t, Visit(testInput, Path("2"), v), assert.AnError) +} + +func TestPath(t *testing.T) { + testCases := map[string]struct { + keys []string + want string + }{ + "Empty": { + keys: nil, + want: "", + }, + "SingleKey": { + keys: []string{"key"}, + want: "key", + }, + "MultipleKeys": { + keys: []string{"path", "to", "key"}, + want: "path/to/key", + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert.Equal(t, tc.want, Path(tc.keys...)) + }) + } +}