diff --git a/.golangci.yml b/.golangci.yml index 9211a474aadc3f..5bd6c48daa18b4 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -147,6 +147,7 @@ linters: - "!**/comp/dogstatsd/server/float64_list_pool.go" - "!**/comp/dogstatsd/server/serverless.go" - "!**/comp/dogstatsd/serverDebug/serverdebugimpl/debug.go" + - "!**/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go" - "!**/comp/forwarder/connectionsforwarder/impl/connectionsforwarder.go" - "!**/comp/forwarder/defaultforwarder/blocked_endpoints.go" - "!**/comp/forwarder/defaultforwarder/default_forwarder.go" diff --git a/cmd/agent/subcommands/dogstatsdstats/command.go b/cmd/agent/subcommands/dogstatsdstats/command.go index 36b4367c0350cd..425fcb18a4e885 100644 --- a/cmd/agent/subcommands/dogstatsdstats/command.go +++ b/cmd/agent/subcommands/dogstatsdstats/command.go @@ -22,7 +22,7 @@ import ( ipcfx "github.com/DataDog/datadog-agent/comp/core/ipc/fx" ipchttp "github.com/DataDog/datadog-agent/comp/core/ipc/httphelpers" log "github.com/DataDog/datadog-agent/comp/core/log/def" - "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" + serverdebugimpl "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpltopk" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/DataDog/datadog-agent/pkg/util/input" diff --git a/comp/dogstatsd/bundle.go b/comp/dogstatsd/bundle.go index 01c9dfac06e946..6bd3960adc348a 100644 --- a/comp/dogstatsd/bundle.go +++ b/comp/dogstatsd/bundle.go @@ -9,7 +9,7 @@ import ( "github.com/DataDog/datadog-agent/comp/dogstatsd/pidmap/pidmapimpl" replayfx "github.com/DataDog/datadog-agent/comp/dogstatsd/replay/fx" "github.com/DataDog/datadog-agent/comp/dogstatsd/server" - "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" + serverdebugimpl "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpltopk" 
"github.com/DataDog/datadog-agent/comp/dogstatsd/status/statusimpl" "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go new file mode 100644 index 00000000000000..dcc1e2b00aea61 --- /dev/null +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go @@ -0,0 +1,491 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package serverdebugimpltopk implements a component to run the dogstatsd server debug +package serverdebugimpltopk + +import ( + "bytes" + "encoding/json" + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/benbjohnson/clock" + "github.com/twmb/murmur3" + "go.uber.org/atomic" + "go.uber.org/fx" + + configComponent "github.com/DataDog/datadog-agent/comp/core/config" + log "github.com/DataDog/datadog-agent/comp/core/log/def" + logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/impl" + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + "github.com/DataDog/datadog-agent/pkg/aggregator/ckey" + "github.com/DataDog/datadog-agent/pkg/config/model" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" + "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/util/defaultpaths" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + pkglog "github.com/DataDog/datadog-agent/pkg/util/log" + pkglogsetup "github.com/DataDog/datadog-agent/pkg/util/log/setup" +) + +// Module defines the fx options for this component. 
+func Module() fxutil.Module { + return fxutil.Component( + fx.Provide(newServerDebug)) +} + +type dependencies struct { + fx.In + + Log log.Component + Config configComponent.Component +} + +// metricStat holds how many times a metric has been +// processed and when was the last time. +type metricStat struct { + key ckey.ContextKey + Name string `json:"name"` + Tags string `json:"tags"` + Count uint64 `json:"count"` + LastSeen time.Time `json:"last_seen"` + //error uint32 // overestimation bound + heapIndex int // position in minHeap for O(1) lookup +} + +// metricStatsShard holds a subset of metric stats with its own lock +// to allow concurrent access to different shards +type metricStatsShard struct { + sync.RWMutex + stats map[ckey.ContextKey]*metricStat + minHeap []*metricStat + //tagsAccumulator *tagset.HashingTagsAccumulator +} + +const ( + // Defaults to use to pass quality gates + defaultNumShards = uint64(2) // Power of 2 for efficient modulo operation + defaultMaxItems = 50 +) + +type serverDebugImpl struct { + sync.RWMutex + log log.Component + enabled *atomic.Bool + shards []*metricStatsShard + numShards uint64 + // counting number of metrics processed last X seconds + metricsCounts metricsCountBuckets + // keyGen is used to generate hashes of the metrics received by dogstatsd + keyGen *ckey.KeyGenerator + + // clock is used to keep a consistent time state within the debug server whether + // we use a real clock in production code or a mock clock for unit testing + clock clock.Clock + // dogstatsdDebugLogger is an instance of the logger config that can be used to create new logger for dogstatsd-stats metrics + dogstatsdDebugLogger pkglog.LoggerInterface + maxItems int +} + +// NewServerlessServerDebug creates a new instance of serverDebug.Component +func NewServerlessServerDebug() serverdebug.Component { + return newServerDebugCompat(logComponentImpl.NewTemporaryLoggerWithoutInit(), pkgconfigsetup.Datadog()) +} + +// newServerDebug creates a new 
instance of a ServerDebug
+func newServerDebug(deps dependencies) serverdebug.Component {
+	return newServerDebugCompat(deps.Log, deps.Config)
+}
+
+func newServerDebugCompat(l log.Component, cfg model.Reader) serverdebug.Component {
+	numShards := uint64(cfg.GetInt("dogstatsd_metrics_stats_num_shards"))
+	if numShards < 1 {
+		numShards = defaultNumShards
+	}
+	maxItems := cfg.GetInt("dogstatsd_metrics_stats_max_items")
+	if maxItems < 1 {
+		maxItems = defaultMaxItems
+	}
+	sd := &serverDebugImpl{
+		log:     l,
+		enabled: atomic.NewBool(false),
+		metricsCounts: metricsCountBuckets{
+			counts:     [5]uint64{0, 0, 0, 0, 0},
+			metricChan: make(chan struct{}),
+			closeChan:  make(chan struct{}),
+		},
+		keyGen:    ckey.NewKeyGenerator(),
+		clock:     clock.New(),
+		shards:    make([]*metricStatsShard, numShards),
+		numShards: numShards,
+		maxItems:  maxItems,
+	}
+	// Initialize all shards
+	for i := uint64(0); i < sd.numShards; i++ {
+		sd.shards[i] = &metricStatsShard{
+			stats:   make(map[ckey.ContextKey]*metricStat),
+			minHeap: make([]*metricStat, 0, sd.maxItems),
+			//tagsAccumulator: tagset.NewHashingTagsAccumulator(),
+		}
+	}
+
+	sd.dogstatsdDebugLogger = sd.getDogstatsdDebug(cfg)
+
+	return sd
+}
+
+// metricsCountBuckets is counting the amount of metrics received for the last 5 seconds.
+// It is used to detect spikes.
+type metricsCountBuckets struct {
+	counts     [5]uint64
+	bucketIdx  int
+	currentSec time.Time
+	metricChan chan struct{}
+	closeChan  chan struct{}
+}
+
+// FormatDebugStats returns a printable version of debug stats.
+func FormatDebugStats(stats []byte) (string, error) { + var dogStats map[uint64]metricStat + if err := json.Unmarshal(stats, &dogStats); err != nil { + return "", err + } + + // put metrics in order: first is the more frequent + order := make([]uint64, len(dogStats)) + i := 0 + for metric := range dogStats { + order[i] = metric + i++ + } + + sort.Slice(order, func(i, j int) bool { + return dogStats[order[i]].Count > dogStats[order[j]].Count + }) + + // write the response + buf := bytes.NewBuffer(nil) + + header := fmt.Sprintf("%-40s | %-20s | %-10s | %s\n", "Metric", "Tags", "Count", "Last Seen") + buf.Write([]byte(header)) + buf.Write([]byte(strings.Repeat("-", 40) + "-|-" + strings.Repeat("-", 20) + "-|-" + strings.Repeat("-", 10) + "-|-" + strings.Repeat("-", 20) + "\n")) + + for _, key := range order { + dStats := dogStats[key] + buf.Write([]byte(fmt.Sprintf("%-40s | %-20s | %-10d | %-20v\n", dStats.Name, dStats.Tags, dStats.Count, dStats.LastSeen))) + } + + if len(dogStats) == 0 { + buf.Write([]byte("No metrics processed yet.")) + } + + return buf.String(), nil +} + +// StoreMetricStats stores stats on the given metric sample. +// +// It can help troubleshooting clients with bad behaviors. +func (d *serverDebugImpl) StoreMetricStats(sample metrics.MetricSample) { + if !d.enabled.Load() { + return + } + + now := d.clock.Now() + + // Determine which shard to use based on metric name hash + // Using a simple hash function for distribution + hash, tags := MakeKey(sample) + shardIdx := hash % d.numShards + shard := d.shards[shardIdx] + + // Lock only the specific shard, not the entire structure + shard.Lock() + defer shard.Unlock() + defer func() { + // Notify metrics count tracker + select { + case d.metricsCounts.metricChan <- struct{}{}: + default: + // Non-blocking send to avoid deadlock if channel is full + } + }() + + // Reset and populate tags accumulator for this shard + //shard.tagsAccumulator.Reset() + //shard.tagsAccumulator.Append(sample.Tags...) 
+ + // Generate key for this metric + //key := d.keyGen.Generate(sample.Name, "", shard.tagsAccumulator) + key := ckey.ContextKey(hash) + + // Get or create metric stat + if ms, exists := shard.stats[key]; exists { + oldCount := ms.Count + ms.Count++ + ms.LastSeen = now + + // Re-heapify since count changed (moved up in rank) + if ms.Count > oldCount { + shard.heapifyUp(ms.heapIndex) + } + } else { + if len(shard.stats) < d.maxItems { + newMs := &metricStat{ + key: key, + Name: sample.Name, + Count: 1, + //error: 0, + Tags: tags, + LastSeen: now, + } + shard.stats[key] = newMs + shard.minHeap = append(shard.minHeap, newMs) + shard.heapifyUp(len(shard.minHeap) - 1) + } else { + // At capacity - replace minimum (Space-Saving's key insight) + // The new item inherits the min's count + 1 and error = min's count + minEntry := shard.minHeap[0] + oldKey := minEntry.key + inheritedCount := minEntry.Count + + // Remove old key from map + delete(shard.stats, oldKey) + + // Reuse the entry object (update in place) + minEntry.key = key + minEntry.Name = sample.Name + minEntry.Tags = tags + minEntry.Count = inheritedCount + 1 + //minEntry.error = inheritedCount + minEntry.LastSeen = now + // heapIndex stays at 0 + + // Add new key to map + shard.stats[key] = minEntry + + // Re-heapify from root since count increased + shard.heapifyDown(0) + } + } + + // Log if enabled + //if d.dogstatsdDebugLogger != nil { + // logMessage := "Metric Name: %v | Tags: {%v} | Count: %v | Last Seen: %v " + // d.dogstatsdDebugLogger.Infof(logMessage, ms.Name, ms.Tags, ms.Count, ms.LastSeen) + //} +} + +// Min-heap operations for maintaining items by (count - error) +// This keeps the most uncertain/lowest-count items at the root + +func (mss *metricStatsShard) heapifyUp(idx int) { + if idx < 0 || idx >= len(mss.minHeap) { + return + } + for idx > 0 { + parent := (idx - 1) / 2 + // Compare by (count - error) for minimum uncertainty + //if mss.minHeap[idx].Count-mss.minHeap[idx].error >= 
mss.minHeap[parent].Count-mss.minHeap[parent].error { + if mss.minHeap[idx].Count >= mss.minHeap[parent].Count { + break + } + // Swap and update heap indices + mss.minHeap[idx], mss.minHeap[parent] = mss.minHeap[parent], mss.minHeap[idx] + mss.minHeap[idx].heapIndex = idx + mss.minHeap[parent].heapIndex = parent + idx = parent + } +} + +func (mss *metricStatsShard) heapifyDown(idx int) { + n := len(mss.minHeap) + for { + smallest := idx + left := 2*idx + 1 + right := 2*idx + 2 + + //if left < n && mss.minHeap[left].Count-mss.minHeap[left].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + if left < n && mss.minHeap[left].Count < mss.minHeap[smallest].Count { + smallest = left + } + //if right < n && mss.minHeap[right].Count-mss.minHeap[right].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + if right < n && mss.minHeap[right].Count < mss.minHeap[smallest].Count { + smallest = right + } + + if smallest == idx { + break + } + + // Swap and update heap indices + mss.minHeap[idx], mss.minHeap[smallest] = mss.minHeap[smallest], mss.minHeap[idx] + mss.minHeap[idx].heapIndex = idx + mss.minHeap[smallest].heapIndex = smallest + idx = smallest + } +} + +//// GetErrorBound returns the maximum error bound for counts +//// In Space-Saving, the error for any item is at most n/k where n is total items seen +//func (mss *metricStatsShard) GetErrorBound() uint64 { +// mss.RLock() +// defer mss.RUnlock() +// +// if len(mss.minHeap) == 0 { +// return 0 +// } +// +// // The minimum entry's count represents approximately n/k +// return mss.minHeap[0].Count +//} + +// MakeKey creates a simple key from the name and tags +func MakeKey(sample metrics.MetricSample) (key uint64, joinedTags string) { + // Sort tags to ensure consistent key + sortedTags := make([]string, len(sample.Tags)) + copy(sortedTags, sample.Tags) + sort.Strings(sortedTags) + joinedTags = strings.Join(sortedTags, " ") + m := murmur3.New64() + m.Write([]byte(sample.Name)) + 
m.Write([]byte("|")) + m.Write([]byte(joinedTags)) + return m.Sum64(), joinedTags +} + +// SetMetricStatsEnabled enables or disables metric stats +func (d *serverDebugImpl) SetMetricStatsEnabled(enable bool) { + d.Lock() + defer d.Unlock() + + if enable { + d.enableMetricsStats() + } else { + d.disableMetricsStats() + } +} + +// enableMetricsStats enables the debug mode of the DogStatsD server and start +// the debug mainloop collecting the amount of metrics received. +func (d *serverDebugImpl) enableMetricsStats() { + // already enabled? + if d.enabled.Load() { + return + } + + d.enabled.Store(true) + go func() { + ticker := d.clock.Ticker(time.Millisecond * 100) + d.log.Debug("Starting the DogStatsD debug loop.") + defer func() { + d.log.Debug("Stopping the DogStatsD debug loop.") + ticker.Stop() + }() + for { + select { + case <-ticker.C: + sec := d.clock.Now().Truncate(time.Second) + if sec.After(d.metricsCounts.currentSec) { + d.metricsCounts.currentSec = sec + if d.hasSpike() { + d.log.Warnf("A burst of metrics has been detected by DogStatSd: here is the last 5 seconds count of metrics: %v", d.metricsCounts.counts) + } + + d.metricsCounts.bucketIdx++ + + if d.metricsCounts.bucketIdx >= len(d.metricsCounts.counts) { + d.metricsCounts.bucketIdx = 0 + } + + d.metricsCounts.counts[d.metricsCounts.bucketIdx] = 0 + } + case <-d.metricsCounts.metricChan: + d.metricsCounts.counts[d.metricsCounts.bucketIdx]++ + case <-d.metricsCounts.closeChan: + return + } + } + }() +} + +func (d *serverDebugImpl) hasSpike() bool { + // compare this one to the sum of all others + // if the difference is higher than all others sum, consider this + // as an anomaly. + var sum uint64 + for _, v := range d.metricsCounts.counts { + sum += v + } + sum -= d.metricsCounts.counts[d.metricsCounts.bucketIdx] + + return d.metricsCounts.counts[d.metricsCounts.bucketIdx] > sum +} + +// GetJSONDebugStats returns jsonified debug statistics. 
+func (d *serverDebugImpl) GetJSONDebugStats() ([]byte, error) { + // Aggregate stats from all shards + aggregatedStats := make(map[ckey.ContextKey]*metricStat) + + // Unlock after we convert to json + defer func() { + for i := uint64(0); i < d.numShards; i++ { + d.shards[i].RUnlock() + } + }() + + for i := uint64(0); i < d.numShards; i++ { + d.shards[i].RLock() + for key, stat := range d.shards[i].stats { + aggregatedStats[key] = stat + } + } + + return json.Marshal(aggregatedStats) +} + +func (d *serverDebugImpl) IsDebugEnabled() bool { + return d.enabled.Load() +} + +// disableMetricsStats disables the debug mode of the DogStatsD server and +// stops the debug mainloop. + +func (d *serverDebugImpl) disableMetricsStats() { + if d.enabled.Load() { + d.enabled.Store(false) + d.metricsCounts.closeChan <- struct{}{} + } + + d.log.Info("Disabling DogStatsD debug metrics stats.") +} + +// build a local dogstatsd logger and bubbling up any errors +func (d *serverDebugImpl) getDogstatsdDebug(cfg model.Reader) pkglog.LoggerInterface { + + var dogstatsdLogger pkglog.LoggerInterface + + // Configuring the log file path + logFile := cfg.GetString("dogstatsd_log_file") + if logFile == "" { + logFile = defaultpaths.DogstatsDLogFile + } + + // Set up dogstatsdLogger + if cfg.GetBool("dogstatsd_logging_enabled") { + logger, e := pkglogsetup.SetupDogstatsdLogger(logFile, pkgconfigsetup.Datadog()) + if e != nil { + // use component logger instead of global logger. + d.log.Errorf("Unable to set up Dogstatsd logger: %v. 
|| Please reach out to Datadog support at https://docs.datadoghq.com/help/ ", e) + return nil + } + dogstatsdLogger = logger + } + return dogstatsdLogger + +} diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go new file mode 100644 index 00000000000000..e221d1e675e7dd --- /dev/null +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go @@ -0,0 +1,224 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-2021 Datadog, Inc. + +package serverdebugimpltopk + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + + "github.com/DataDog/datadog-agent/comp/core/config" + log "github.com/DataDog/datadog-agent/comp/core/log/def" + logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + "github.com/DataDog/datadog-agent/pkg/aggregator/ckey" + "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" +) + +func fulfillDeps(t testing.TB, overrides map[string]interface{}) serverdebug.Component { + return fxutil.Test[serverdebug.Component](t, fx.Options( + fx.Provide(func() log.Component { return logmock.New(t) }), + fx.Provide(func() config.Component { + return config.NewMockWithOverrides(t, overrides) + }), + Module(), + )) +} + +func TestDebugStatsSpike(t *testing.T) { + t.Skip("flaky") + cfg := make(map[string]interface{}) + cfg["dogstatsd_logging_enabled"] = false + debug := fulfillDeps(t, cfg) + d := debug.(*serverDebugImpl) + + clk := clock.NewMock() + d.clock = clk + + d.SetMetricStatsEnabled(true) + sample := metrics.MetricSample{Name: 
"some.metric1", Tags: make([]string, 0)} + + send := func(count int) { + for i := 0; i < count; i++ { + d.StoreMetricStats(sample) + } + } + + send(10) + + clk.Add(1050 * time.Millisecond) + send(10) + + clk.Add(1050 * time.Millisecond) + send(10) + + clk.Add(1050 * time.Millisecond) + send(10) + + clk.Add(1050 * time.Millisecond) + send(500) + + // stop the debug loop to avoid data race + d.SetMetricStatsEnabled(false) + time.Sleep(500 * time.Millisecond) + + assert.True(t, d.hasSpike()) + + d.SetMetricStatsEnabled(true) + // This sleep is necessary as we need to wait for the goroutine function within 'EnableMetricsStats' to start. + // If we remove the sleep, the debug loop ticker will not be triggered by the clk.Add() call and the 500 samples + // added with 'send(500)' will be considered as if they had been added in the same second as the previous 500 samples. + // This will lead to a spike because we have 1000 samples in 1 second instead of having 500 and 500 in 2 different seconds. 
+ time.Sleep(1050 * time.Millisecond) + + clk.Add(1050 * time.Millisecond) + send(500) + + // stop the debug loop to avoid data race + d.SetMetricStatsEnabled(false) + time.Sleep(500 * time.Millisecond) + + // it is no more considered a spike because we had another second with 500 metrics + assert.False(t, d.hasSpike()) + +} + +func TestDebugStats(t *testing.T) { + cfg := make(map[string]interface{}) + cfg["dogstatsd_logging_enabled"] = false + debug := fulfillDeps(t, cfg) + d := debug.(*serverDebugImpl) + + clk := clock.NewMock() + d.clock = clk + + d.SetMetricStatsEnabled(true) + + // data + sample1 := metrics.MetricSample{Name: "some.metric1", Tags: make([]string, 0)} + sample2 := metrics.MetricSample{Name: "some.metric2", Tags: []string{"a"}} + sample3 := metrics.MetricSample{Name: "some.metric3", Tags: make([]string, 0)} + sample4 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"b", "c"}} + sample5 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"c", "b"}} + hash1, _ := MakeKey(sample1) + hash2, _ := MakeKey(sample2) + hash3, _ := MakeKey(sample3) + hash4, _ := MakeKey(sample4) + hash5, _ := MakeKey(sample5) + + hash1Key := ckey.ContextKey(hash1) + hash2Key := ckey.ContextKey(hash2) + hash3Key := ckey.ContextKey(hash3) + hash4Key := ckey.ContextKey(hash4) + hash5Key := ckey.ContextKey(hash5) + // test ingestion and ingestion time + d.StoreMetricStats(sample1) + d.StoreMetricStats(sample2) + clk.Add(10 * time.Millisecond) + d.StoreMetricStats(sample1) + + data, err := d.GetJSONDebugStats() + require.NoError(t, err, "cannot get debug stats") + require.NotNil(t, data) + require.NotEmpty(t, data) + + var stats map[ckey.ContextKey]metricStat + err = json.Unmarshal(data, &stats) + require.NoError(t, err, "data is not valid") + require.Len(t, stats, 2, "two metrics should have been captured") + + require.True(t, stats[hash1Key].LastSeen.After(stats[hash2Key].LastSeen), "some.metric1 should have appeared again after some.metric2") + + 
d.StoreMetricStats(sample3) + clk.Add(10 * time.Millisecond) + d.StoreMetricStats(sample1) + + d.StoreMetricStats(sample4) + d.StoreMetricStats(sample5) + data, _ = d.GetJSONDebugStats() + err = json.Unmarshal(data, &stats) + require.NoError(t, err, "data is not valid") + require.Len(t, stats, 4, "4 metrics should have been captured") + + // test stats array + metric1 := stats[hash1Key] + metric2 := stats[hash2Key] + metric3 := stats[hash3Key] + metric4 := stats[hash4Key] + metric5 := stats[hash5Key] + + require.True(t, metric1.LastSeen.After(metric2.LastSeen), "some.metric1 should have appeared again after some.metric2") + require.True(t, metric1.LastSeen.After(metric3.LastSeen), "some.metric1 should have appeared again after some.metric3") + require.True(t, metric3.LastSeen.After(metric2.LastSeen), "some.metric3 should have appeared again after some.metric2") + + require.Equal(t, metric1.Count, uint64(3)) + require.Equal(t, metric2.Count, uint64(1)) + require.Equal(t, metric3.Count, uint64(1)) + + // test context correctness + require.Equal(t, metric4.Tags, "b c") + require.Equal(t, metric5.Tags, "b c") + require.Equal(t, hash4, hash5) +} + +func TestFormatDebugStats(t *testing.T) { + // Create test data + stats := map[uint64]metricStat{ + 123: { + Name: "test.metric1", + Count: 10, + LastSeen: time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC), + Tags: "env:prod service:api", + }, + 456: { + Name: "test.metric2", + Count: 15, + LastSeen: time.Date(2025, 1, 1, 11, 0, 0, 0, time.UTC), + Tags: "env:dev", + }, + } + + // Convert test data to JSON (expected input format) + statsJSON, err := json.Marshal(stats) + require.NoError(t, err) + + // Call FormatDebugStats + result, err := FormatDebugStats(statsJSON) + require.NoError(t, err) + + // Expected formatted table + expectedResult := `Metric | Tags | Count | Last Seen +-----------------------------------------|----------------------|------------|--------------------- +test.metric2 | env:dev | 15 | 2025-01-01 11:00:00 
+0000 UTC +test.metric1 | env:prod service:api | 10 | 2025-01-01 12:00:00 +0000 UTC +` + + // Verify the entire formatted result + assert.Equal(t, expectedResult, result) + + // Verify sorting (metrics should be sorted by count in descending order) + assert.True(t, strings.Index(result, "test.metric2") < strings.Index(result, "test.metric1")) + + // Test empty stats + emptyStats := map[uint64]metricStat{} + emptyStatsJSON, err := json.Marshal(emptyStats) + require.NoError(t, err) + + emptyResult, err := FormatDebugStats(emptyStatsJSON) + require.NoError(t, err) + assert.Contains(t, emptyResult, "No metrics processed yet.") + + // Test invalid JSON + _, err = FormatDebugStats([]byte("invalid json")) + assert.Error(t, err) +} diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/mock_debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/mock_debug.go new file mode 100644 index 00000000000000..62b3a70fc4f984 --- /dev/null +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/mock_debug.go @@ -0,0 +1,49 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build test + +package serverdebugimpltopk + +import ( + "sync" + + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + + "go.uber.org/atomic" + "go.uber.org/fx" +) + +// MockModule defines the fx options for the mock component. 
+func MockModule() fxutil.Module { + return fxutil.Component( + fx.Provide(newMockServerDebug)) +} + +type mockServerDebug struct { + sync.Mutex + enabled *atomic.Bool +} + +func newMockServerDebug() serverdebug.Component { + return &mockServerDebug{enabled: atomic.NewBool(false)} +} + +func (d *mockServerDebug) StoreMetricStats(_ metrics.MetricSample) { +} + +func (d *mockServerDebug) SetMetricStatsEnabled(enable bool) { + d.enabled.Store(enable) +} + +func (d *mockServerDebug) GetJSONDebugStats() ([]byte, error) { + return []byte{}, nil +} + +func (d *mockServerDebug) IsDebugEnabled() bool { + return d.enabled.Load() +} diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index b30bb72d4c8deb..560e6988814a3e 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1883,7 +1883,9 @@ func dogstatsd(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("dogstatsd_origin_detection_client", false) config.BindEnvAndSetDefault("dogstatsd_origin_optout_enabled", true) config.BindEnvAndSetDefault("dogstatsd_so_rcvbuf", 0) - config.BindEnvAndSetDefault("dogstatsd_metrics_stats_enable", false) + config.BindEnvAndSetDefault("dogstatsd_metrics_stats_enable", true) + config.BindEnvAndSetDefault("dogstatsd_metrics_stats_num_shards", 2) + config.BindEnvAndSetDefault("dogstatsd_metrics_stats_max_items", 50) config.BindEnvAndSetDefault("dogstatsd_tags", []string{}) config.BindEnvAndSetDefault("dogstatsd_mapper_cache_size", 1000) config.BindEnvAndSetDefault("dogstatsd_string_interner_size", 4096) diff --git a/tasks/components.py b/tasks/components.py index f76325469c2c54..c498c1dfb1cd61 100644 --- a/tasks/components.py +++ b/tasks/components.py @@ -126,6 +126,7 @@ def has_type_component(content) -> bool: 'comp/core/telemetry/noopsimpl', 'comp/dogstatsd/pidmap/pidmapimpl', 'comp/dogstatsd/serverDebug/serverdebugimpl', + 'comp/dogstatsd/serverDebug/serverdebugimpltopk', 'comp/dogstatsd/status/statusimpl', 'comp/etw/impl', 
'comp/forwarder/eventplatform/eventplatformimpl', diff --git a/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml b/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml index 5eb8f41ad25880..568034d3b949b0 100644 --- a/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml +++ b/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml @@ -13,3 +13,5 @@ telemetry.checks: '*' cloud_provider_metadata: [] dogstatsd_socket: '/tmp/dsd.socket' +dogstatsd_metrics_stats_num_shards: 16 +dogstatsd_metrics_stats_max_items: 100 diff --git a/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml b/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml index 5eb8f41ad25880..568034d3b949b0 100644 --- a/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml +++ b/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml @@ -13,3 +13,5 @@ telemetry.checks: '*' cloud_provider_metadata: [] dogstatsd_socket: '/tmp/dsd.socket' +dogstatsd_metrics_stats_num_shards: 16 +dogstatsd_metrics_stats_max_items: 100 diff --git a/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml b/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml index 5eb8f41ad25880..568034d3b949b0 100644 --- a/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml +++ b/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml @@ -13,3 +13,5 @@ telemetry.checks: '*' cloud_provider_metadata: [] dogstatsd_socket: '/tmp/dsd.socket' +dogstatsd_metrics_stats_num_shards: 16 +dogstatsd_metrics_stats_max_items: 100