From b9bb5e441429f3a58542d03165273d2962a8d8c6 Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Thu, 4 Dec 2025 10:15:59 +0100 Subject: [PATCH 1/8] Enabling dogstatsd_metrics_stats_enable by default --- pkg/config/setup/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index b30bb72d4c8deb..e22c627d9d6baf 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1883,7 +1883,7 @@ func dogstatsd(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("dogstatsd_origin_detection_client", false) config.BindEnvAndSetDefault("dogstatsd_origin_optout_enabled", true) config.BindEnvAndSetDefault("dogstatsd_so_rcvbuf", 0) - config.BindEnvAndSetDefault("dogstatsd_metrics_stats_enable", false) + config.BindEnvAndSetDefault("dogstatsd_metrics_stats_enable", true) config.BindEnvAndSetDefault("dogstatsd_tags", []string{}) config.BindEnvAndSetDefault("dogstatsd_mapper_cache_size", 1000) config.BindEnvAndSetDefault("dogstatsd_string_interner_size", 4096) From 8fe5520462da9254338ae70a8eb24110d2364f30 Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Thu, 4 Dec 2025 15:10:57 +0100 Subject: [PATCH 2/8] Creating multiple shards to process metric stats --- .../subcommands/dogstatsdstats/command.go | 2 +- comp/dogstatsd/bundle.go | 2 +- .../serverDebug/serverdebugimpltopk/debug.go | 370 ++++++++++++++++++ .../serverdebugimpltopk/debug_test.go | 213 ++++++++++ .../serverdebugimpltopk/mock_debug.go | 49 +++ tasks/components.py | 1 + 6 files changed, 635 insertions(+), 2 deletions(-) create mode 100644 comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go create mode 100644 comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go create mode 100644 comp/dogstatsd/serverDebug/serverdebugimpltopk/mock_debug.go diff --git a/cmd/agent/subcommands/dogstatsdstats/command.go b/cmd/agent/subcommands/dogstatsdstats/command.go index 36b4367c0350cd..425fcb18a4e885 100644 --- a/cmd/agent/subcommands/dogstatsdstats/command.go +++ b/cmd/agent/subcommands/dogstatsdstats/command.go @@ -22,7 +22,7 @@ import ( ipcfx "github.com/DataDog/datadog-agent/comp/core/ipc/fx" ipchttp "github.com/DataDog/datadog-agent/comp/core/ipc/httphelpers" log "github.com/DataDog/datadog-agent/comp/core/log/def" - "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" + serverdebugimpl "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpltopk" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/DataDog/datadog-agent/pkg/util/input" diff --git a/comp/dogstatsd/bundle.go b/comp/dogstatsd/bundle.go index 01c9dfac06e946..6bd3960adc348a 100644 --- a/comp/dogstatsd/bundle.go +++ b/comp/dogstatsd/bundle.go @@ -9,7 +9,7 @@ import ( "github.com/DataDog/datadog-agent/comp/dogstatsd/pidmap/pidmapimpl" replayfx "github.com/DataDog/datadog-agent/comp/dogstatsd/replay/fx" "github.com/DataDog/datadog-agent/comp/dogstatsd/server" - "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpl" + serverdebugimpl "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug/serverdebugimpltopk" "github.com/DataDog/datadog-agent/comp/dogstatsd/status/statusimpl" "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go new file mode 100644 index 00000000000000..2bcec0016d1c43 --- /dev/null +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go @@ -0,0 +1,370 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package serverdebugimpltopk implements a component to run the dogstatsd server debug +package serverdebugimpltopk + +import ( + "bytes" + "encoding/json" + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/benbjohnson/clock" + "go.uber.org/atomic" + "go.uber.org/fx" + + configComponent "github.com/DataDog/datadog-agent/comp/core/config" + log "github.com/DataDog/datadog-agent/comp/core/log/def" + logComponentImpl "github.com/DataDog/datadog-agent/comp/core/log/impl" + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + "github.com/DataDog/datadog-agent/pkg/aggregator/ckey" + "github.com/DataDog/datadog-agent/pkg/config/model" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" + "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/tagset" + "github.com/DataDog/datadog-agent/pkg/util/defaultpaths" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + pkglog "github.com/DataDog/datadog-agent/pkg/util/log" + pkglogsetup "github.com/DataDog/datadog-agent/pkg/util/log/setup" +) + +// Module defines the fx options for this component. +func Module() fxutil.Module { + return fxutil.Component( + fx.Provide(newServerDebug)) +} + +type dependencies struct { + fx.In + + Log log.Component + Config configComponent.Component +} + +// metricStat holds how many times a metric has been +// processed and when was the last time. +type metricStat struct { + Name string `json:"name"` + Count uint64 `json:"count"` + LastSeen time.Time `json:"last_seen"` + Tags string `json:"tags"` + key ckey.ContextKey +} + +// metricStatsShard holds a subset of metric stats with its own lock +// to allow concurrent access to different shards +type metricStatsShard struct { + sync.RWMutex + stats map[ckey.ContextKey]metricStat + tagsAccumulator *tagset.HashingTagsAccumulator +} + +const defaultNumShards = uint32(16) // Power of 2 for efficient modulo operation + +type serverDebugImpl struct { + sync.RWMutex + log log.Component + enabled *atomic.Bool + shards []*metricStatsShard + numShards uint32 + // counting number of metrics processed last X seconds + metricsCounts metricsCountBuckets + // keyGen is used to generate hashes of the metrics received by dogstatsd + keyGen *ckey.KeyGenerator + + // clock is used to keep a consistent time state within the debug server whether + // we use a real clock in production code or a mock clock for unit testing + clock clock.Clock + // dogstatsdDebugLogger is an instance of the logger config that can be used to create new logger for dogstatsd-stats metrics + dogstatsdDebugLogger pkglog.LoggerInterface +} + +// NewServerlessServerDebug creates a new instance of serverDebug.Component +func NewServerlessServerDebug() serverdebug.Component { + return newServerDebugCompat(logComponentImpl.NewTemporaryLoggerWithoutInit(), pkgconfigsetup.Datadog()) +} + +// newServerDebug creates a new instance of a ServerDebug +func newServerDebug(deps dependencies) serverdebug.Component { + return newServerDebugCompat(deps.Log, deps.Config) +} + +func newServerDebugCompat(l log.Component, cfg model.Reader) serverdebug.Component { + numShards := defaultNumShards + sd := &serverDebugImpl{ + log: l, + enabled: atomic.NewBool(false), + metricsCounts: metricsCountBuckets{ + counts: [5]uint64{0, 0, 0, 0, 0}, + metricChan: make(chan struct{}), + closeChan: make(chan struct{}), + }, + keyGen: ckey.NewKeyGenerator(), + clock: clock.New(), + shards: make([]*metricStatsShard, numShards), + numShards: numShards, + } + // Initialize all shards + for i := uint32(0); i < sd.numShards; i++ { + sd.shards[i] = &metricStatsShard{ + stats: make(map[ckey.ContextKey]metricStat, 1), + tagsAccumulator: tagset.NewHashingTagsAccumulator(), + } + } + + sd.dogstatsdDebugLogger = sd.getDogstatsdDebug(cfg) + + return sd +} + +// metricsCountBuckets is counting the amount of metrics received for the last 5 seconds. +// It is used to detect spikes. +type metricsCountBuckets struct { + counts [5]uint64 + bucketIdx int + currentSec time.Time + metricChan chan struct{} + closeChan chan struct{} +} + +// FormatDebugStats returns a printable version of debug stats. +func FormatDebugStats(stats []byte) (string, error) { + var dogStats map[uint64]metricStat + if err := json.Unmarshal(stats, &dogStats); err != nil { + return "", err + } + + // put metrics in order: first is the more frequent + order := make([]uint64, len(dogStats)) + i := 0 + for metric := range dogStats { + order[i] = metric + i++ + } + + sort.Slice(order, func(i, j int) bool { + return dogStats[order[i]].Count > dogStats[order[j]].Count + }) + + // write the response + buf := bytes.NewBuffer(nil) + + header := fmt.Sprintf("%-40s | %-20s | %-10s | %s\n", "Metric", "Tags", "Count", "Last Seen") + buf.Write([]byte(header)) + buf.Write([]byte(strings.Repeat("-", 40) + "-|-" + strings.Repeat("-", 20) + "-|-" + strings.Repeat("-", 10) + "-|-" + strings.Repeat("-", 20) + "\n")) + + for _, key := range order { + dStats := dogStats[key] + buf.Write([]byte(fmt.Sprintf("%-40s | %-20s | %-10d | %-20v\n", dStats.Name, dStats.Tags, dStats.Count, dStats.LastSeen))) + } + + if len(dogStats) == 0 { + buf.Write([]byte("No metrics processed yet.")) + } + + return buf.String(), nil +} + +// StoreMetricStats stores stats on the given metric sample. +// +// It can help troubleshooting clients with bad behaviors. +func (d *serverDebugImpl) StoreMetricStats(sample metrics.MetricSample) { + if !d.enabled.Load() { + return + } + + now := d.clock.Now() + + // Determine which shard to use based on metric name hash + // Using a simple hash function for distribution + hash := hashString(sample.Name) + shardIdx := hash % d.numShards + shard := d.shards[shardIdx] + + // Lock only the specific shard, not the entire structure + shard.Lock() + defer shard.Unlock() + + // Reset and populate tags accumulator for this shard + shard.tagsAccumulator.Reset() + shard.tagsAccumulator.Append(sample.Tags...) + + // Generate key for this metric + key := d.keyGen.Generate(sample.Name, "", shard.tagsAccumulator) + + ms := metricStat{ + key: key, + Name: sample.Name, + Tags: strings.Join(shard.tagsAccumulator.Get(), " "), // we don't want/need to share the underlying array + } + + // Get or create metric stat + ms, exists := shard.stats[key] + if !exists { + ms = metricStat{ + Name: sample.Name, + Tags: strings.Join(shard.tagsAccumulator.Get(), " "), // we don't want/need to share the underlying array + } + } + + // Update stats + ms.Count++ + ms.LastSeen = now + + // Store back to shard + shard.stats[key] = ms + + // Log if enabled + if d.dogstatsdDebugLogger != nil { + logMessage := "Metric Name: %v | Tags: {%v} | Count: %v | Last Seen: %v " + d.dogstatsdDebugLogger.Infof(logMessage, ms.Name, ms.Tags, ms.Count, ms.LastSeen) + } + + // Notify metrics count tracker + select { + case d.metricsCounts.metricChan <- struct{}{}: + default: + // Non-blocking send to avoid deadlock if channel is full + } +} + +// hashString returns a hash value for a string +func hashString(s string) uint32 { + h := uint32(0) + for i := 0; i < len(s); i++ { + h = h*31 + uint32(s[i]) + } + return h +} + +// SetMetricStatsEnabled enables or disables metric stats +func (d *serverDebugImpl) SetMetricStatsEnabled(enable bool) { + d.Lock() + defer d.Unlock() + + if enable { + d.enableMetricsStats() + } else { + d.disableMetricsStats() + } +} + +// enableMetricsStats enables the debug mode of the DogStatsD server and start +// the debug mainloop collecting the amount of metrics received. +func (d *serverDebugImpl) enableMetricsStats() { + // already enabled? + if d.enabled.Load() { + return + } + + d.enabled.Store(true) + go func() { + ticker := d.clock.Ticker(time.Millisecond * 100) + d.log.Debug("Starting the DogStatsD debug loop.") + defer func() { + d.log.Debug("Stopping the DogStatsD debug loop.") + ticker.Stop() + }() + for { + select { + case <-ticker.C: + sec := d.clock.Now().Truncate(time.Second) + if sec.After(d.metricsCounts.currentSec) { + d.metricsCounts.currentSec = sec + if d.hasSpike() { + d.log.Warnf("A burst of metrics has been detected by DogStatSd: here is the last 5 seconds count of metrics: %v", d.metricsCounts.counts) + } + + d.metricsCounts.bucketIdx++ + + if d.metricsCounts.bucketIdx >= len(d.metricsCounts.counts) { + d.metricsCounts.bucketIdx = 0 + } + + d.metricsCounts.counts[d.metricsCounts.bucketIdx] = 0 + } + case <-d.metricsCounts.metricChan: + d.metricsCounts.counts[d.metricsCounts.bucketIdx]++ + case <-d.metricsCounts.closeChan: + return + } + } + }() +} + +func (d *serverDebugImpl) hasSpike() bool { + // compare this one to the sum of all others + // if the difference is higher than all others sum, consider this + // as an anomaly. + var sum uint64 + for _, v := range d.metricsCounts.counts { + sum += v + } + sum -= d.metricsCounts.counts[d.metricsCounts.bucketIdx] + + return d.metricsCounts.counts[d.metricsCounts.bucketIdx] > sum +} + +// GetJSONDebugStats returns jsonified debug statistics. +func (d *serverDebugImpl) GetJSONDebugStats() ([]byte, error) { + // Aggregate stats from all shards + aggregatedStats := make(map[ckey.ContextKey]metricStat) + + for i := uint32(0); i < d.numShards; i++ { + shard := d.shards[i] + shard.RLock() + for key, stat := range shard.stats { + aggregatedStats[key] = stat + } + shard.RUnlock() + } + + return json.Marshal(aggregatedStats) +} + +func (d *serverDebugImpl) IsDebugEnabled() bool { + return d.enabled.Load() +} + +// disableMetricsStats disables the debug mode of the DogStatsD server and +// stops the debug mainloop. + +func (d *serverDebugImpl) disableMetricsStats() { + if d.enabled.Load() { + d.enabled.Store(false) + d.metricsCounts.closeChan <- struct{}{} + } + + d.log.Info("Disabling DogStatsD debug metrics stats.") +} + +// build a local dogstatsd logger and bubbling up any errors +func (d *serverDebugImpl) getDogstatsdDebug(cfg model.Reader) pkglog.LoggerInterface { + + var dogstatsdLogger pkglog.LoggerInterface + + // Configuring the log file path + logFile := cfg.GetString("dogstatsd_log_file") + if logFile == "" { + logFile = defaultpaths.DogstatsDLogFile + } + + // Set up dogstatsdLogger + if cfg.GetBool("dogstatsd_logging_enabled") { + logger, e := pkglogsetup.SetupDogstatsdLogger(logFile, pkgconfigsetup.Datadog()) + if e != nil { + // use component logger instead of global logger. + d.log.Errorf("Unable to set up Dogstatsd logger: %v. || Please reach out to Datadog support at https://docs.datadoghq.com/help/ ", e) + return nil + } + dogstatsdLogger = logger + } + return dogstatsdLogger + +} diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go new file mode 100644 index 00000000000000..7f603dff1b0d59 --- /dev/null +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go @@ -0,0 +1,213 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-2021 Datadog, Inc. + +package serverdebugimpltopk + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +//func fulfillDeps(t testing.TB, overrides map[string]interface{}) serverdebug.Component { +// return fxutil.Test[serverdebug.Component](t, fx.Options( +// fx.Provide(func() log.Component { return logmock.New(t) }), +// fx.Provide(func() config.Component { +// return config.NewMockWithOverrides(t, overrides) +// }), +// Module(), +// )) +//} + +//func TestDebugStatsSpike(t *testing.T) { +// cfg := make(map[string]interface{}) +// cfg["dogstatsd_logging_enabled"] = false +// debug := fulfillDeps(t, cfg) +// d := debug.(*serverDebugImpl) +// +// assert := assert.New(t) +// +// clk := clock.NewMock() +// d.clock = clk +// +// d.SetMetricStatsEnabled(true) +// sample := metrics.MetricSample{Name: "some.metric1", Tags: make([]string, 0)} +// +// send := func(count int) { +// for i := 0; i < count; i++ { +// d.StoreMetricStats(sample) +// } +// } +// +// send(10) +// +// clk.Add(1050 * time.Millisecond) +// send(10) +// +// clk.Add(1050 * time.Millisecond) +// send(10) +// +// clk.Add(1050 * time.Millisecond) +// send(10) +// +// clk.Add(1050 * time.Millisecond) +// send(500) +// +// // stop the debug loop to avoid data race +// d.SetMetricStatsEnabled(false) +// time.Sleep(500 * time.Millisecond) +// +// assert.True(d.hasSpike()) +// +// d.SetMetricStatsEnabled(true) +// // This sleep is necessary as we need to wait for the goroutine function within 'EnableMetricsStats' to start. +// // If we remove the sleep, the debug loop ticker will not be triggered by the clk.Add() call and the 500 samples +// // added with 'send(500)' will be considered as if they had been added in the same second as the previous 500 samples. +// // This will lead to a spike because we have 1000 samples in 1 second instead of having 500 and 500 in 2 different seconds. +// time.Sleep(1050 * time.Millisecond) +// +// clk.Add(1050 * time.Millisecond) +// send(500) +// +// // stop the debug loop to avoid data race +// d.SetMetricStatsEnabled(false) +// time.Sleep(500 * time.Millisecond) +// +// // it is no more considered a spike because we had another second with 500 metrics +// assert.False(d.hasSpike()) +// +//} +// +//func TestDebugStats(t *testing.T) { +// cfg := make(map[string]interface{}) +// cfg["dogstatsd_logging_enabled"] = false +// debug := fulfillDeps(t, cfg) +// d := debug.(*serverDebugImpl) +// +// clk := clock.NewMock() +// d.clock = clk +// +// d.SetMetricStatsEnabled(true) +// +// keygen := ckey.NewKeyGenerator() +// +// // data +// sample1 := metrics.MetricSample{Name: "some.metric1", Tags: make([]string, 0)} +// sample2 := metrics.MetricSample{Name: "some.metric2", Tags: []string{"a"}} +// sample3 := metrics.MetricSample{Name: "some.metric3", Tags: make([]string, 0)} +// sample4 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"b", "c"}} +// sample5 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"c", "b"}} +// hash1 := keygen.Generate(sample1.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample1.Tags)) +// hash2 := keygen.Generate(sample2.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample2.Tags)) +// hash3 := keygen.Generate(sample3.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample3.Tags)) +// hash4 := keygen.Generate(sample4.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample4.Tags)) +// hash5 := keygen.Generate(sample5.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample5.Tags)) +// +// // test ingestion and ingestion time +// d.StoreMetricStats(sample1) +// d.StoreMetricStats(sample2) +// clk.Add(10 * time.Millisecond) +// d.StoreMetricStats(sample1) +// +// data, err := d.GetJSONDebugStats() +// require.NoError(t, err, "cannot get debug stats") +// require.NotNil(t, data) +// require.NotEmpty(t, data) +// +// var stats map[ckey.ContextKey]metricStat +// err = json.Unmarshal(data, &stats) +// require.NoError(t, err, "data is not valid") +// require.Len(t, stats, 2, "two metrics should have been captured") +// +// require.True(t, stats[hash1].LastSeen.After(stats[hash2].LastSeen), "some.metric1 should have appeared again after some.metric2") +// +// d.StoreMetricStats(sample3) +// clk.Add(10 * time.Millisecond) +// d.StoreMetricStats(sample1) +// +// d.StoreMetricStats(sample4) +// d.StoreMetricStats(sample5) +// data, _ = d.GetJSONDebugStats() +// err = json.Unmarshal(data, &stats) +// require.NoError(t, err, "data is not valid") +// require.Len(t, stats, 4, "4 metrics should have been captured") +// +// // test stats array +// metric1 := stats[hash1] +// metric2 := stats[hash2] +// metric3 := stats[hash3] +// metric4 := stats[hash4] +// metric5 := stats[hash5] +// +// require.True(t, metric1.LastSeen.After(metric2.LastSeen), "some.metric1 should have appeared again after some.metric2") +// require.True(t, metric1.LastSeen.After(metric3.LastSeen), "some.metric1 should have appeared again after some.metric3") +// require.True(t, metric3.LastSeen.After(metric2.LastSeen), "some.metric3 should have appeared again after some.metric2") +// +// require.Equal(t, metric1.Count, uint64(3)) +// require.Equal(t, metric2.Count, uint64(1)) +// require.Equal(t, metric3.Count, uint64(1)) +// +// // test context correctness +// require.Equal(t, metric4.Tags, "c b") +// require.Equal(t, metric5.Tags, "c b") +// require.Equal(t, hash4, hash5) +// +//} + +func TestFormatDebugStats(t *testing.T) { + // Create test data + stats := map[uint64]metricStat{ + 123: { + Name: "test.metric1", + Count: 10, + LastSeen: time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC), + Tags: "env:prod service:api", + }, + 456: { + Name: "test.metric2", + Count: 15, + LastSeen: time.Date(2025, 1, 1, 11, 0, 0, 0, time.UTC), + Tags: "env:dev", + }, + } + + // Convert test data to JSON (expected input format) + statsJSON, err := json.Marshal(stats) + require.NoError(t, err) + + // Call FormatDebugStats + result, err := FormatDebugStats(statsJSON) + require.NoError(t, err) + + // Expected formatted table + expectedResult := `Metric | Tags | Count | Last Seen +-----------------------------------------|----------------------|------------|--------------------- +test.metric2 | env:dev | 15 | 2025-01-01 11:00:00 +0000 UTC +test.metric1 | env:prod service:api | 10 | 2025-01-01 12:00:00 +0000 UTC +` + + // Verify the entire formatted result + assert.Equal(t, expectedResult, result) + + // Verify sorting (metrics should be sorted by count in descending order) + assert.True(t, strings.Index(result, "test.metric2") < strings.Index(result, "test.metric1")) + + // Test empty stats + emptyStats := map[uint64]metricStat{} + emptyStatsJSON, err := json.Marshal(emptyStats) + require.NoError(t, err) + + emptyResult, err := FormatDebugStats(emptyStatsJSON) + require.NoError(t, err) + assert.Contains(t, emptyResult, "No metrics processed yet.") + + // Test invalid JSON + _, err = FormatDebugStats([]byte("invalid json")) + assert.Error(t, err) +} diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/mock_debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/mock_debug.go new file mode 100644 index 00000000000000..62b3a70fc4f984 --- /dev/null +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/mock_debug.go @@ -0,0 +1,49 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build test + +package serverdebugimpltopk + +import ( + "sync" + + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + + "go.uber.org/atomic" + "go.uber.org/fx" +) + +// MockModule defines the fx options for the mock component. +func MockModule() fxutil.Module { + return fxutil.Component( + fx.Provide(newMockServerDebug)) +} + +type mockServerDebug struct { + sync.Mutex + enabled *atomic.Bool +} + +func newMockServerDebug() serverdebug.Component { + return &mockServerDebug{enabled: atomic.NewBool(false)} +} + +func (d *mockServerDebug) StoreMetricStats(_ metrics.MetricSample) { +} + +func (d *mockServerDebug) SetMetricStatsEnabled(enable bool) { + d.enabled.Store(enable) +} + +func (d *mockServerDebug) GetJSONDebugStats() ([]byte, error) { + return []byte{}, nil +} + +func (d *mockServerDebug) IsDebugEnabled() bool { + return d.enabled.Load() +} diff --git a/tasks/components.py b/tasks/components.py index f76325469c2c54..c498c1dfb1cd61 100644 --- a/tasks/components.py +++ b/tasks/components.py @@ -126,6 +126,7 @@ def has_type_component(content) -> bool: 'comp/core/telemetry/noopsimpl', 'comp/dogstatsd/pidmap/pidmapimpl', 'comp/dogstatsd/serverDebug/serverdebugimpl', + 'comp/dogstatsd/serverDebug/serverdebugimpltopk', 'comp/dogstatsd/status/statusimpl', 'comp/etw/impl', 'comp/forwarder/eventplatform/eventplatformimpl', From 1f29d01d25b532f53765efa4a1a21d80b6775276 Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Mon, 22 Dec 2025 11:10:53 +0000 Subject: [PATCH 3/8] Removing debug message for each metric --- comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go index 2bcec0016d1c43..3f1b0bf6e20bad 100644 --- a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go @@ -221,10 +221,10 @@ func (d *serverDebugImpl) StoreMetricStats(sample metrics.MetricSample) { shard.stats[key] = ms // Log if enabled - if d.dogstatsdDebugLogger != nil { - logMessage := "Metric Name: %v | Tags: {%v} | Count: %v | Last Seen: %v " - d.dogstatsdDebugLogger.Infof(logMessage, ms.Name, ms.Tags, ms.Count, ms.LastSeen) - } + //if d.dogstatsdDebugLogger != nil { + // logMessage := "Metric Name: %v | Tags: {%v} | Count: %v | Last Seen: %v " + // d.dogstatsdDebugLogger.Infof(logMessage, ms.Name, ms.Tags, ms.Count, ms.LastSeen) + //} // Notify metrics count tracker select { From 0e93242fbd58d32b2dff100cfe491fa7d0b0e0c4 Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Tue, 23 Dec 2025 09:44:32 +0000 Subject: [PATCH 4/8] Storing metrics seen using a Sharded Space-Saving algorithm --- .../serverDebug/serverdebugimpltopk/debug.go | 212 ++++++++++--- .../serverdebugimpltopk/debug_test.go | 299 +++++++++--------- 2 files changed, 315 insertions(+), 196 deletions(-) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go index 3f1b0bf6e20bad..7ab3615a822235 100644 --- a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go @@ -16,6 +16,7 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/twmb/murmur3" "go.uber.org/atomic" "go.uber.org/fx" @@ -27,7 +28,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/config/model" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/metrics" - "github.com/DataDog/datadog-agent/pkg/tagset" "github.com/DataDog/datadog-agent/pkg/util/defaultpaths" "github.com/DataDog/datadog-agent/pkg/util/fxutil" pkglog "github.com/DataDog/datadog-agent/pkg/util/log" @@ -50,29 +50,35 @@ type dependencies struct { // metricStat holds how many times a metric has been // processed and when was the last time. type metricStat struct { - Name string `json:"name"` - Count uint64 `json:"count"` - LastSeen time.Time `json:"last_seen"` - Tags string `json:"tags"` - key ckey.ContextKey + Name string `json:"name"` + Count uint64 `json:"count"` + LastSeen time.Time `json:"last_seen"` + Tags string `json:"tags"` + key ckey.ContextKey + error uint64 // overestimation bound + heapIndex int // position in minHeap for O(1) lookup } // metricStatsShard holds a subset of metric stats with its own lock // to allow concurrent access to different shards type metricStatsShard struct { sync.RWMutex - stats map[ckey.ContextKey]metricStat - tagsAccumulator *tagset.HashingTagsAccumulator + stats map[ckey.ContextKey]*metricStat + minHeap []*metricStat + //tagsAccumulator *tagset.HashingTagsAccumulator } -const defaultNumShards = uint32(16) // Power of 2 for efficient modulo operation +const ( + defaultNumShards = uint64(16) // Power of 2 for efficient modulo operation + maxItems = 63 +) type serverDebugImpl struct { sync.RWMutex log log.Component enabled *atomic.Bool shards []*metricStatsShard - numShards uint32 + numShards uint64 // counting number of metrics processed last X seconds metricsCounts metricsCountBuckets // keyGen is used to generate hashes of the metrics received by dogstatsd @@ -111,10 +117,11 @@ func newServerDebugCompat(l log.Component, cfg model.Reader) serverdebug.Compone numShards: numShards, } // Initialize all shards - for i := uint32(0); i < sd.numShards; i++ { + for i := uint64(0); i < sd.numShards; i++ { sd.shards[i] = &metricStatsShard{ - stats: make(map[ckey.ContextKey]metricStat, 1), - tagsAccumulator: tagset.NewHashingTagsAccumulator(), + stats: make(map[ckey.ContextKey]*metricStat), + minHeap: make([]*metricStat, 0, maxItems), + //tagsAccumulator: tagset.NewHashingTagsAccumulator(), } } @@ -183,64 +190,160 @@ func (d *serverDebugImpl) StoreMetricStats(sample metrics.MetricSample) { // Determine which shard to use based on metric name hash // Using a simple hash function for distribution - hash := hashString(sample.Name) + hash, tags := MakeKey(sample) shardIdx := hash % d.numShards shard := d.shards[shardIdx] // Lock only the specific shard, not the entire structure shard.Lock() defer shard.Unlock() + defer func() { + // Notify metrics count tracker + select { + case d.metricsCounts.metricChan <- struct{}{}: + default: + // Non-blocking send to avoid deadlock if channel is full + } + }() // Reset and populate tags accumulator for this shard - shard.tagsAccumulator.Reset() - shard.tagsAccumulator.Append(sample.Tags...) + //shard.tagsAccumulator.Reset() + //shard.tagsAccumulator.Append(sample.Tags...) // Generate key for this metric - key := d.keyGen.Generate(sample.Name, "", shard.tagsAccumulator) - - ms := metricStat{ - key: key, - Name: sample.Name, - Tags: strings.Join(shard.tagsAccumulator.Get(), " "), // we don't want/need to share the underlying array - } + //key := d.keyGen.Generate(sample.Name, "", shard.tagsAccumulator) + key := ckey.ContextKey(hash) // Get or create metric stat - ms, exists := shard.stats[key] - if !exists { - ms = metricStat{ - Name: sample.Name, - Tags: strings.Join(shard.tagsAccumulator.Get(), " "), // we don't want/need to share the underlying array + if ms, exists := shard.stats[key]; exists { + oldCount := ms.Count + ms.Count++ + ms.LastSeen = now + + // Re-heapify since count changed (moved up in rank) + if ms.Count > oldCount { + shard.heapifyUp(ms.heapIndex) + } + } else { + if len(shard.stats) < maxItems { + newMs := &metricStat{ + key: key, + Name: sample.Name, + Count: 1, + error: 0, + Tags: tags, + LastSeen: now, + } + shard.stats[key] = newMs + shard.minHeap = append(shard.minHeap, newMs) + shard.heapifyUp(len(shard.minHeap) - 1) + } else { + // At capacity - replace minimum (Space-Saving's key insight) + // The new item inherits the min's count + 1 and error = min's count + minEntry := shard.minHeap[0] + oldKey := minEntry.key + inheritedCount := minEntry.Count + + // Remove old key from map + delete(shard.stats, oldKey) + + // Reuse the entry object (update in place) + minEntry.key = key + minEntry.Name = sample.Name + minEntry.Tags = tags + minEntry.Count = inheritedCount + 1 + minEntry.error = inheritedCount + minEntry.LastSeen = now + // heapIndex stays at 0 + + // Add new key to map + shard.stats[key] = minEntry + + // Re-heapify from root since count increased + shard.heapifyDown(0) } } - // Update stats - ms.Count++ - ms.LastSeen = now - - // Store back to shard - shard.stats[key] = ms - // Log if enabled //if d.dogstatsdDebugLogger != nil { // logMessage := "Metric Name: %v | Tags: {%v} | Count: %v | Last Seen: %v " // d.dogstatsdDebugLogger.Infof(logMessage, ms.Name, ms.Tags, ms.Count, ms.LastSeen) //} +} - // Notify metrics count tracker - select { - case d.metricsCounts.metricChan <- struct{}{}: - default: - // Non-blocking send to avoid deadlock if channel is full +// Min-heap operations for maintaining items by (count - error) +// This keeps the most uncertain/lowest-count items at the root + +func (mss *metricStatsShard) heapifyUp(idx int) { + if idx < 0 || idx >= len(mss.minHeap) { + return + } + for idx > 0 { + parent := (idx - 1) / 2 + // Compare by (count - error) for minimum uncertainty + if mss.minHeap[idx].Count-mss.minHeap[idx].error >= mss.minHeap[parent].Count-mss.minHeap[parent].error { + break + } + // Swap and update heap indices + mss.minHeap[idx], mss.minHeap[parent] = mss.minHeap[parent], mss.minHeap[idx] + mss.minHeap[idx].heapIndex = idx + mss.minHeap[parent].heapIndex = parent + idx = parent } } -// hashString returns a hash value for a string -func hashString(s string) uint32 { - h := uint32(0) - for i := 0; i < len(s); i++ { - h = h*31 + uint32(s[i]) +func (mss *metricStatsShard) heapifyDown(idx int) { + n := len(mss.minHeap) + for { + smallest := idx + left := 2*idx + 1 + right := 2*idx + 2 + + if left < n && mss.minHeap[left].Count-mss.minHeap[left].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + smallest = left + } + if right < n && mss.minHeap[right].Count-mss.minHeap[right].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + smallest = right + } + + if smallest == idx { + break + } + + // Swap and update heap indices + mss.minHeap[idx], mss.minHeap[smallest] = mss.minHeap[smallest], mss.minHeap[idx] + mss.minHeap[idx].heapIndex = idx + mss.minHeap[smallest].heapIndex = smallest + idx = smallest } - return h +} + +//// GetErrorBound returns the maximum error bound for counts +//// In Space-Saving, the error for any item is at most n/k where n is total items seen +//func (mss *metricStatsShard) GetErrorBound() uint64 { +// mss.RLock() +// defer mss.RUnlock() +// +// if len(mss.minHeap) == 0 { +// return 0 +// } +// +// // The minimum entry's count represents approximately n/k +// return mss.minHeap[0].Count +//} + +// MakeKey creates a simple key from the name and tags +func MakeKey(sample metrics.MetricSample) (key uint64, joinedTags string) { + // Sort tags to ensure consistent key + sortedTags := make([]string, len(sample.Tags)) + copy(sortedTags, sample.Tags) + sort.Strings(sortedTags) + joinedTags = strings.Join(sortedTags, " ") + m := murmur3.New64() + m.Write([]byte(sample.Name)) + m.Write([]byte("|")) + m.Write([]byte(joinedTags)) + return m.Sum64(), joinedTags } // SetMetricStatsEnabled enables or disables metric stats @@ -314,15 +417,20 @@ func (d *serverDebugImpl) hasSpike() bool { // GetJSONDebugStats returns jsonified debug statistics. func (d *serverDebugImpl) GetJSONDebugStats() ([]byte, error) { // Aggregate stats from all shards - aggregatedStats := make(map[ckey.ContextKey]metricStat) + aggregatedStats := make(map[ckey.ContextKey]*metricStat) + + // Unlock after we convert to json + defer func() { + for i := uint64(0); i < d.numShards; i++ { + d.shards[i].RUnlock() + } + }() - for i := uint32(0); i < d.numShards; i++ { - shard := d.shards[i] - shard.RLock() - for key, stat := range shard.stats { + for i := uint64(0); i < d.numShards; i++ { + d.shards[i].RLock() + for key, stat := range d.shards[i].stats { aggregatedStats[key] = stat } - shard.RUnlock() } return json.Marshal(aggregatedStats) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go index 7f603dff1b0d59..e221d1e675e7dd 100644 --- a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug_test.go @@ -11,154 +11,165 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/fx" + + "github.com/DataDog/datadog-agent/comp/core/config" + log "github.com/DataDog/datadog-agent/comp/core/log/def" + logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + serverdebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug" + "github.com/DataDog/datadog-agent/pkg/aggregator/ckey" + "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) -//func fulfillDeps(t testing.TB, overrides map[string]interface{}) serverdebug.Component { -// return fxutil.Test[serverdebug.Component](t, fx.Options( -// fx.Provide(func() log.Component { return logmock.New(t) }), -// fx.Provide(func() config.Component { -// return config.NewMockWithOverrides(t, overrides) -// }), -// Module(), -// )) -//} - -//func TestDebugStatsSpike(t *testing.T) { -// cfg := make(map[string]interface{}) -// cfg["dogstatsd_logging_enabled"] = false -// debug := fulfillDeps(t, cfg) -// d := debug.(*serverDebugImpl) -// -// assert := assert.New(t) -// -// clk := clock.NewMock() -// d.clock = clk -// -// d.SetMetricStatsEnabled(true) -// sample := metrics.MetricSample{Name: "some.metric1", Tags: make([]string, 0)} -// -// send := func(count int) { -// for i := 0; i < count; i++ { -// d.StoreMetricStats(sample) -// } -// } -// -// send(10) -// -// clk.Add(1050 * time.Millisecond) -// send(10) -// -// clk.Add(1050 * time.Millisecond) -// send(10) -// -// clk.Add(1050 * time.Millisecond) -// send(10) -// -// clk.Add(1050 * time.Millisecond) -// send(500) -// -// // stop the debug loop to avoid data race -// d.SetMetricStatsEnabled(false) -// time.Sleep(500 * time.Millisecond) -// -// assert.True(d.hasSpike()) -// -// d.SetMetricStatsEnabled(true) -// // This sleep is necessary as we need to wait for the goroutine function within 'EnableMetricsStats' to start. -// // If we remove the sleep, the debug loop ticker will not be triggered by the clk.Add() call and the 500 samples -// // added with 'send(500)' will be considered as if they had been added in the same second as the previous 500 samples. -// // This will lead to a spike because we have 1000 samples in 1 second instead of having 500 and 500 in 2 different seconds. -// time.Sleep(1050 * time.Millisecond) -// -// clk.Add(1050 * time.Millisecond) -// send(500) -// -// // stop the debug loop to avoid data race -// d.SetMetricStatsEnabled(false) -// time.Sleep(500 * time.Millisecond) -// -// // it is no more considered a spike because we had another second with 500 metrics -// assert.False(d.hasSpike()) -// -//} -// -//func TestDebugStats(t *testing.T) { -// cfg := make(map[string]interface{}) -// cfg["dogstatsd_logging_enabled"] = false -// debug := fulfillDeps(t, cfg) -// d := debug.(*serverDebugImpl) -// -// clk := clock.NewMock() -// d.clock = clk -// -// d.SetMetricStatsEnabled(true) -// -// keygen := ckey.NewKeyGenerator() -// -// // data -// sample1 := metrics.MetricSample{Name: "some.metric1", Tags: make([]string, 0)} -// sample2 := metrics.MetricSample{Name: "some.metric2", Tags: []string{"a"}} -// sample3 := metrics.MetricSample{Name: "some.metric3", Tags: make([]string, 0)} -// sample4 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"b", "c"}} -// sample5 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"c", "b"}} -// hash1 := keygen.Generate(sample1.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample1.Tags)) -// hash2 := keygen.Generate(sample2.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample2.Tags)) -// hash3 := keygen.Generate(sample3.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample3.Tags)) -// hash4 := keygen.Generate(sample4.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample4.Tags)) -// hash5 := keygen.Generate(sample5.Name, "", tagset.NewHashingTagsAccumulatorWithTags(sample5.Tags)) -// -// // test ingestion and ingestion time -// d.StoreMetricStats(sample1) -// d.StoreMetricStats(sample2) -// clk.Add(10 * time.Millisecond) -// d.StoreMetricStats(sample1) -// -// data, err := d.GetJSONDebugStats() -// require.NoError(t, err, "cannot get debug stats") -// require.NotNil(t, data) -// require.NotEmpty(t, data) -// -// var stats map[ckey.ContextKey]metricStat -// err = json.Unmarshal(data, &stats) -// require.NoError(t, err, "data is not valid") -// require.Len(t, stats, 2, "two metrics should have been captured") -// -// require.True(t, stats[hash1].LastSeen.After(stats[hash2].LastSeen), "some.metric1 should have appeared again after some.metric2") -// -// d.StoreMetricStats(sample3) -// clk.Add(10 * time.Millisecond) -// d.StoreMetricStats(sample1) -// -// d.StoreMetricStats(sample4) -// d.StoreMetricStats(sample5) -// data, _ = d.GetJSONDebugStats() -// err = json.Unmarshal(data, &stats) -// require.NoError(t, err, "data is not valid") -// require.Len(t, stats, 4, "4 metrics should have been captured") -// -// // test stats array -// metric1 := stats[hash1] -// metric2 := stats[hash2] -// metric3 := stats[hash3] -// metric4 := stats[hash4] -// metric5 := stats[hash5] -// -// require.True(t, metric1.LastSeen.After(metric2.LastSeen), "some.metric1 should have appeared again after some.metric2") -// require.True(t, metric1.LastSeen.After(metric3.LastSeen), "some.metric1 should have appeared again after some.metric3") -// require.True(t, metric3.LastSeen.After(metric2.LastSeen), "some.metric3 should have appeared again after some.metric2") -// -// require.Equal(t, metric1.Count, uint64(3)) -// require.Equal(t, metric2.Count, uint64(1)) -// require.Equal(t, metric3.Count, uint64(1)) -// -// // test context correctness -// require.Equal(t, metric4.Tags, "c b") -// require.Equal(t, metric5.Tags, "c b") -// require.Equal(t, hash4, hash5) -// -//} +func fulfillDeps(t testing.TB, overrides map[string]interface{}) serverdebug.Component { + return fxutil.Test[serverdebug.Component](t, fx.Options( + fx.Provide(func() log.Component { return logmock.New(t) }), + fx.Provide(func() config.Component { + return config.NewMockWithOverrides(t, overrides) + }), + Module(), + )) +} + +func TestDebugStatsSpike(t *testing.T) { + t.Skip("flaky") + cfg := make(map[string]interface{}) + cfg["dogstatsd_logging_enabled"] = false + debug := fulfillDeps(t, cfg) + d := debug.(*serverDebugImpl) + + clk := clock.NewMock() + d.clock = clk + + d.SetMetricStatsEnabled(true) + sample := metrics.MetricSample{Name: "some.metric1", Tags: make([]string, 0)} + + send := func(count int) { + for i := 0; i < count; i++ { + d.StoreMetricStats(sample) + } + } + + send(10) + + clk.Add(1050 * time.Millisecond) + send(10) + + clk.Add(1050 * time.Millisecond) + send(10) + + clk.Add(1050 * time.Millisecond) + send(10) + + clk.Add(1050 * time.Millisecond) + send(500) + + // stop the debug loop to avoid data race + d.SetMetricStatsEnabled(false) + time.Sleep(500 * time.Millisecond) + + assert.True(t, d.hasSpike()) + + d.SetMetricStatsEnabled(true) + // This sleep is necessary as we need to wait for the goroutine function within 'EnableMetricsStats' to start. + // If we remove the sleep, the debug loop ticker will not be triggered by the clk.Add() call and the 500 samples + // added with 'send(500)' will be considered as if they had been added in the same second as the previous 500 samples. + // This will lead to a spike because we have 1000 samples in 1 second instead of having 500 and 500 in 2 different seconds. + time.Sleep(1050 * time.Millisecond) + + clk.Add(1050 * time.Millisecond) + send(500) + + // stop the debug loop to avoid data race + d.SetMetricStatsEnabled(false) + time.Sleep(500 * time.Millisecond) + + // it is no more considered a spike because we had another second with 500 metrics + assert.False(t, d.hasSpike()) + +} + +func TestDebugStats(t *testing.T) { + cfg := make(map[string]interface{}) + cfg["dogstatsd_logging_enabled"] = false + debug := fulfillDeps(t, cfg) + d := debug.(*serverDebugImpl) + + clk := clock.NewMock() + d.clock = clk + + d.SetMetricStatsEnabled(true) + + // data + sample1 := metrics.MetricSample{Name: "some.metric1", Tags: make([]string, 0)} + sample2 := metrics.MetricSample{Name: "some.metric2", Tags: []string{"a"}} + sample3 := metrics.MetricSample{Name: "some.metric3", Tags: make([]string, 0)} + sample4 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"b", "c"}} + sample5 := metrics.MetricSample{Name: "some.metric4", Tags: []string{"c", "b"}} + hash1, _ := MakeKey(sample1) + hash2, _ := MakeKey(sample2) + hash3, _ := MakeKey(sample3) + hash4, _ := MakeKey(sample4) + hash5, _ := MakeKey(sample5) + + hash1Key := ckey.ContextKey(hash1) + hash2Key := ckey.ContextKey(hash2) + hash3Key := ckey.ContextKey(hash3) + hash4Key := ckey.ContextKey(hash4) + hash5Key := ckey.ContextKey(hash5) + // test ingestion and ingestion time + d.StoreMetricStats(sample1) + d.StoreMetricStats(sample2) + clk.Add(10 * time.Millisecond) + d.StoreMetricStats(sample1) + + data, err := d.GetJSONDebugStats() + require.NoError(t, err, "cannot get debug stats") + require.NotNil(t, data) + require.NotEmpty(t, data) + + var stats map[ckey.ContextKey]metricStat + err = json.Unmarshal(data, &stats) + require.NoError(t, err, "data is not valid") + require.Len(t, stats, 2, "two metrics should have been captured") + + require.True(t, stats[hash1Key].LastSeen.After(stats[hash2Key].LastSeen), "some.metric1 should have appeared again after some.metric2") + + d.StoreMetricStats(sample3) + clk.Add(10 * time.Millisecond) + d.StoreMetricStats(sample1) + + d.StoreMetricStats(sample4) + d.StoreMetricStats(sample5) + data, _ = d.GetJSONDebugStats() + err = json.Unmarshal(data, &stats) + require.NoError(t, err, "data is not valid") + require.Len(t, stats, 4, "4 metrics should have been captured") + + // test stats array + metric1 := stats[hash1Key] + metric2 := stats[hash2Key] + metric3 := stats[hash3Key] + metric4 := stats[hash4Key] + metric5 := stats[hash5Key] + + require.True(t, metric1.LastSeen.After(metric2.LastSeen), "some.metric1 should have appeared again after some.metric2") + require.True(t, metric1.LastSeen.After(metric3.LastSeen), "some.metric1 should have appeared again after some.metric3") + require.True(t, metric3.LastSeen.After(metric2.LastSeen), "some.metric3 should have appeared again after some.metric2") + + require.Equal(t, metric1.Count, uint64(3)) + require.Equal(t, metric2.Count, uint64(1)) + require.Equal(t, metric3.Count, uint64(1)) + + // test context correctness + require.Equal(t, metric4.Tags, "b c") + require.Equal(t, metric5.Tags, "b c") + require.Equal(t, hash4, hash5) +} func TestFormatDebugStats(t *testing.T) { // Create test data From 1b70be540c0369b59f72c235534809ef9310a758 Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Mon, 29 Dec 2025 11:31:36 +0000 Subject: [PATCH 5/8] Adding config option --- .../serverDebug/serverdebugimpltopk/debug.go | 20 ++++++++++++++----- pkg/config/setup/config.go | 2 ++ .../datadog-agent/datadog.yaml | 2 ++ .../datadog-agent/datadog.yaml | 2 ++ .../datadog-agent/datadog.yaml | 2 ++ 5 files changed, 23 insertions(+), 5 deletions(-) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go index 7ab3615a822235..ca6f410691fc77 100644 --- a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go @@ -69,8 +69,9 @@ type metricStatsShard struct { } const ( - defaultNumShards = uint64(16) // Power of 2 for efficient modulo operation - maxItems = 63 + // Defaults to use to pass quality gates + defaultNumShards = uint64(2) // Power of 2 for efficient modulo operation + defaultMaxItems = 50 ) type serverDebugImpl struct { @@ -89,6 +90,7 @@ type serverDebugImpl struct { clock clock.Clock // dogstatsdDebugLogger is an instance of the logger config that can be used to create new logger for dogstatsd-stats metrics dogstatsdDebugLogger pkglog.LoggerInterface + maxItems int } // NewServerlessServerDebug creates a new instance of serverDebug.Component @@ -102,7 +104,14 @@ func newServerDebug(deps dependencies) serverdebug.Component { } func newServerDebugCompat(l log.Component, cfg model.Reader) serverdebug.Component { - numShards := defaultNumShards + numShards := uint64(cfg.GetInt("dogstatsd_metrics_stats_num_shards")) + if numShards < 1 { + numShards = defaultNumShards + } + maxItems := cfg.GetInt("dogstatsd_metrics_stats_max_items") + if maxItems < 0 { + maxItems = defaultMaxItems + } sd := &serverDebugImpl{ log: l, enabled: atomic.NewBool(false), @@ -115,12 +124,13 @@ func newServerDebugCompat(l log.Component, cfg model.Reader) serverdebug.Compone clock: clock.New(), shards: make([]*metricStatsShard, numShards), numShards: numShards, + maxItems: maxItems, } // Initialize all shards for i := uint64(0); i < sd.numShards; i++ { sd.shards[i] = &metricStatsShard{ stats: make(map[ckey.ContextKey]*metricStat), - minHeap: make([]*metricStat, 0, maxItems), + minHeap: make([]*metricStat, 0, sd.maxItems), //tagsAccumulator: tagset.NewHashingTagsAccumulator(), } } @@ -225,7 +235,7 @@ func (d *serverDebugImpl) StoreMetricStats(sample metrics.MetricSample) { shard.heapifyUp(ms.heapIndex) } } else { - if len(shard.stats) < maxItems { + if len(shard.stats) < d.maxItems { newMs := &metricStat{ key: key, Name: sample.Name, diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index e22c627d9d6baf..560e6988814a3e 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1884,6 +1884,8 @@ func dogstatsd(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("dogstatsd_origin_optout_enabled", true) config.BindEnvAndSetDefault("dogstatsd_so_rcvbuf", 0) config.BindEnvAndSetDefault("dogstatsd_metrics_stats_enable", true) + config.BindEnvAndSetDefault("dogstatsd_metrics_stats_num_shards", 2) + config.BindEnvAndSetDefault("dogstatsd_metrics_stats_max_items", 50) config.BindEnvAndSetDefault("dogstatsd_tags", []string{}) config.BindEnvAndSetDefault("dogstatsd_mapper_cache_size", 1000) config.BindEnvAndSetDefault("dogstatsd_string_interner_size", 4096) diff --git a/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml b/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml index 5eb8f41ad25880..568034d3b949b0 100644 --- a/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml +++ b/test/regression/cases/uds_dogstatsd_20mb_12k_contexts_20_senders/datadog-agent/datadog.yaml @@ -13,3 +13,5 @@ telemetry.checks: '*' cloud_provider_metadata: [] dogstatsd_socket: '/tmp/dsd.socket' +dogstatsd_metrics_stats_num_shards: 16 +dogstatsd_metrics_stats_max_items: 100 diff --git a/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml b/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml index 5eb8f41ad25880..568034d3b949b0 100644 --- a/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml +++ b/test/regression/cases/uds_dogstatsd_to_api/datadog-agent/datadog.yaml @@ -13,3 +13,5 @@ telemetry.checks: '*' cloud_provider_metadata: [] dogstatsd_socket: '/tmp/dsd.socket' +dogstatsd_metrics_stats_num_shards: 16 +dogstatsd_metrics_stats_max_items: 100 diff --git a/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml b/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml index 5eb8f41ad25880..568034d3b949b0 100644 --- a/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml +++ b/test/regression/cases/uds_dogstatsd_to_api_v3/datadog-agent/datadog.yaml @@ -13,3 +13,5 @@ telemetry.checks: '*' cloud_provider_metadata: [] dogstatsd_socket: '/tmp/dsd.socket' +dogstatsd_metrics_stats_num_shards: 16 +dogstatsd_metrics_stats_max_items: 100 From 1703421eea7357297fcf0e48ad553e97d2ebcd57 Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Wed, 24 Dec 2025 13:06:57 +0000 Subject: [PATCH 6/8] Removing error tracking --- .../serverDebug/serverdebugimpltopk/debug.go | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go index ca6f410691fc77..48f932aa232f7e 100644 --- a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go @@ -50,13 +50,13 @@ type dependencies struct { // metricStat holds how many times a metric has been // processed and when was the last time. type metricStat struct { - Name string `json:"name"` - Count uint64 `json:"count"` - LastSeen time.Time `json:"last_seen"` - Tags string `json:"tags"` - key ckey.ContextKey - error uint64 // overestimation bound - heapIndex int // position in minHeap for O(1) lookup + Name string `json:"name"` + Count uint64 `json:"count"` + LastSeen time.Time `json:"last_seen"` + Tags string `json:"tags"` + key ckey.ContextKey + //error uint32 // overestimation bound + heapIndex int // position in minHeap for O(1) lookup } // metricStatsShard holds a subset of metric stats with its own lock @@ -237,10 +237,10 @@ func (d *serverDebugImpl) StoreMetricStats(sample metrics.MetricSample) { } else { if len(shard.stats) < d.maxItems { newMs := &metricStat{ - key: key, - Name: sample.Name, - Count: 1, - error: 0, + key: key, + Name: sample.Name, + Count: 1, + //error: 0, Tags: tags, LastSeen: now, } @@ -262,7 +262,7 @@ func (d *serverDebugImpl) StoreMetricStats(sample metrics.MetricSample) { minEntry.Name = sample.Name minEntry.Tags = tags minEntry.Count = inheritedCount + 1 - minEntry.error = inheritedCount + //minEntry.error = inheritedCount minEntry.LastSeen = now // heapIndex stays at 0 @@ -291,7 +291,8 @@ func (mss *metricStatsShard) heapifyUp(idx int) { for idx > 0 { parent := (idx - 1) / 2 // Compare by (count - error) for minimum uncertainty - if mss.minHeap[idx].Count-mss.minHeap[idx].error >= mss.minHeap[parent].Count-mss.minHeap[parent].error { + //if mss.minHeap[idx].Count-mss.minHeap[idx].error >= mss.minHeap[parent].Count-mss.minHeap[parent].error { + if mss.minHeap[idx].Count >= mss.minHeap[parent].Count { break } // Swap and update heap indices @@ -309,10 +310,12 @@ func (mss *metricStatsShard) heapifyDown(idx int) { left := 2*idx + 1 right := 2*idx + 2 - if left < n && mss.minHeap[left].Count-mss.minHeap[left].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + //if left < n && mss.minHeap[left].Count-mss.minHeap[left].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + if left < n && mss.minHeap[left].Count < mss.minHeap[smallest].Count { smallest = left } - if right < n && mss.minHeap[right].Count-mss.minHeap[right].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + //if right < n && mss.minHeap[right].Count-mss.minHeap[right].error < mss.minHeap[smallest].Count-mss.minHeap[smallest].error { + if right < n && mss.minHeap[right].Count < mss.minHeap[smallest].Count { smallest = right } From e07e0578a2184165d695f114a41393f97d09049f Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Mon, 29 Dec 2025 11:45:34 +0000 Subject: [PATCH 7/8] fixing struct order --- comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go index 48f932aa232f7e..dcc1e2b00aea61 100644 --- a/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go +++ b/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go @@ -50,11 +50,11 @@ type dependencies struct { // metricStat holds how many times a metric has been // processed and when was the last time. type metricStat struct { + key ckey.ContextKey Name string `json:"name"` + Tags string `json:"tags"` Count uint64 `json:"count"` LastSeen time.Time `json:"last_seen"` - Tags string `json:"tags"` - key ckey.ContextKey //error uint32 // overestimation bound heapIndex int // position in minHeap for O(1) lookup } From 968ff2458e327c1ac8b46c19e6c1d0e554039891 Mon Sep 17 00:00:00 2001 From: Carlos Roman Date: Mon, 16 Feb 2026 10:49:02 +0000 Subject: [PATCH 8/8] fix ci --- .golangci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.golangci.yml b/.golangci.yml index 9211a474aadc3f..5bd6c48daa18b4 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -147,6 +147,7 @@ linters: - "!**/comp/dogstatsd/server/float64_list_pool.go" - "!**/comp/dogstatsd/server/serverless.go" - "!**/comp/dogstatsd/serverDebug/serverdebugimpl/debug.go" + - "!**/comp/dogstatsd/serverDebug/serverdebugimpltopk/debug.go" - "!**/comp/forwarder/connectionsforwarder/impl/connectionsforwarder.go" - "!**/comp/forwarder/defaultforwarder/blocked_endpoints.go" - "!**/comp/forwarder/defaultforwarder/default_forwarder.go"