From a2430d54d0da3b648f62e3ed6714cc367753054b Mon Sep 17 00:00:00 2001 From: Jon Rosario Date: Wed, 10 Dec 2025 16:14:56 -0500 Subject: [PATCH 1/6] Limit K8s event bundle messages to an estimated length of maxEstimatedEventTextLength = 3750 --- .../kubernetesapiserver/bundled_events.go | 47 ++++++++++++------- .../kubernetes_apiserver.go | 11 +++-- .../kubernetes_eventbundle.go | 23 ++++++++- .../kubernetes_eventbundle_test.go | 35 ++++++++++++++ 4 files changed, 92 insertions(+), 24 deletions(-) diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go index d8387f6be05073..84ea757310df36 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go @@ -33,7 +33,8 @@ type bundledTransformer struct { func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []error) { var errors []error - bundlesByObject := make(map[bundleID]*kubernetesEventBundle) + // Map bundleID to a slice of kubernetesEventBundles, as we can have several bundles for the same object if the event text length exceeds the maximum allowed length. + bundlesByObject := make(map[bundleID][]*kubernetesEventBundle) for _, event := range events { if event.InvolvedObject.Kind == "" || @@ -59,13 +60,21 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err id := buildBundleID(event) - bundle, found := bundlesByObject[id] + bundles, found := bundlesByObject[id] if !found { - bundle = newKubernetesEventBundler(c.clusterName, event) - bundlesByObject[id] = bundle + bundles = []*kubernetesEventBundle{newKubernetesEventBundler(c.clusterName, event)} + bundlesByObject[id] = bundles } - err := bundle.addEvent(event) + lastBundle := bundles[len(bundles)-1] + _, fits := lastBundle.fitsEvent(event) + if !fits { + bundles = append(bundles, newKubernetesEventBundler(c.clusterName, event)) + lastBundle = bundles[len(bundles)-1] + bundlesByObject[id] = bundles + } + + err := lastBundle.addEvent(event) if err != nil { errors = append(errors, err) continue @@ -74,21 +83,23 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err datadogEvs := make([]event.Event, 0, len(bundlesByObject)) - for id, bundle := range bundlesByObject { - datadogEv, err := bundle.formatEvents(c.taggerInstance) - if err != nil { - errors = append(errors, err) - continue - } + for id, bundles := range bundlesByObject { + for _, bundle := range bundles { + datadogEv, err := bundle.formatEvents(c.taggerInstance) + if err != nil { + errors = append(errors, err) + continue + } - emittedEvents.Inc( - id.kind, - id.evType, - getEventSource(bundle.reportingController, bundle.component), - "true", - ) + emittedEvents.Inc( + id.kind, + id.evType, + getEventSource(bundle.reportingController, bundle.component), + "true", + ) - datadogEvs = append(datadogEvs, datadogEv) + datadogEvs = append(datadogEvs, datadogEv) + } } return datadogEvs, errors diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go index 395d2f268be3d0..8c8c18d669171c 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go @@ -40,11 +40,12 @@ const ( // CheckName is the name of the check CheckName = "kubernetes_apiserver" - KubeControlPaneCheck = "kube_apiserver_controlplane.up" - eventTokenKey = "event" - maxEventCardinality = 300 - defaultResyncPeriodInSecond = 300 - defaultTimeoutEventCollection = 2000 + KubeControlPaneCheck = "kube_apiserver_controlplane.up" + eventTokenKey = "event" + maxEventCardinality = 300 + defaultResyncPeriodInSecond = 300 + defaultTimeoutEventCollection = 2000 + defaultMaxEstimatedEventTextLength = 3750 ) var ( diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go index 8a68d8348b3557..d21ebe4c4da94c 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go @@ -21,6 +21,14 @@ import ( "github.com/DataDog/datadog-agent/pkg/metrics/event" ) +const ( + // This is the maximum allowed length for the estimated event text. + // It's an estimate because the actual length of the event text is not known until the event is formatted by kubernetesEventBundle.formatEventText() + // Since dogweb enforced a limit of 4000 characters, we conservatively limit the estimated event text length to 3750 characters. + // See: https://github.com/DataDog/dogweb/blob/ddaf4c0ac06839f45edc19e056b2eccdc012edbc/dogweb/model/event/event.py#L41 + maxEstimatedEventTextLength = 3750 +) + type kubernetesEventBundle struct { involvedObject v1.ObjectReference // Parent object for this event bundle component string // Used to identify the Kubernetes component which generated the event @@ -48,6 +56,11 @@ func (b *kubernetesEventBundle) addEvent(event *v1.Event) error { return fmt.Errorf("mismatching Object UIDs: %s != %s", event.InvolvedObject.UID, b.involvedObject.UID) } + eventText, fits := b.fitsEvent(event) + if !fits { + return fmt.Errorf("event text length exceeds the maximum allowed length: %d > %d", len(eventText), maxEstimatedEventTextLength) + } + // We do not process the events in chronological order necessarily. // We only care about the first time they occurred, the last time and the count. if event.FirstTimestamp.IsZero() { @@ -62,7 +75,7 @@ func (b *kubernetesEventBundle) addEvent(event *v1.Event) error { b.lastTimestamp = math.Max(b.lastTimestamp, float64(event.LastTimestamp.Unix())) } - b.countByAction[fmt.Sprintf("**%s**: %s\n", event.Reason, event.Message)] += int(event.Count) + b.countByAction[eventText] += int(event.Count) return nil } @@ -115,6 +128,14 @@ func (b *kubernetesEventBundle) formatEventText() string { return eventText } +func (b *kubernetesEventBundle) fitsEvent(event *v1.Event) (string, bool) { + eventText := fmt.Sprintf("**%s**: %s\n", event.Reason, event.Message) + if len(eventText) > maxEstimatedEventTextLength { + return "", false + } + return eventText, true +} + func formatStringIntMap(input map[string]int) string { parts := make([]string, 0, len(input)) keys := make([]string, 0, len(input)) diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go index ad1c7eab5cd80b..3810b8f23f9a52 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go @@ -8,6 +8,7 @@ package kubernetesapiserver import ( "fmt" + "strings" "testing" "time" @@ -254,3 +255,37 @@ func TestEventsTagging(t *testing.T) { }) } } + +func TestKubernetesEventBundle_fitsEvent(t *testing.T) { + tests := []struct { + name string + events []*v1.Event + expectedEventsFits []bool + }{ + { + name: "event text length does not exceed the maximum allowed length", + events: []*v1.Event{createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", "Scaled up replica set nginx-b49f5958c to 1", "Normal", 709662600)}, + expectedEventsFits: []bool{true}, + }, + { + name: "event text length exceeds the maximum allowed length", + events: []*v1.Event{ + createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", "Scaled up replica set nginx-b49f5958c to 1", "Normal", 709662600), + createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", strings.Repeat("a", maxEstimatedEventTextLength), "Normal", 709662600), + }, + expectedEventsFits: []bool{true, false}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, len(tt.events), len(tt.expectedEventsFits)) + + bundle := newKubernetesEventBundler("", tt.events[0]) + for i, ev := range tt.events { + _, fits := bundle.fitsEvent(ev) + assert.Equal(t, tt.expectedEventsFits[i], fits) + } + }) + } +} From 2cd1a392a5d019ff5b54f5c8883c9e31618e6495 Mon Sep 17 00:00:00 2001 From: Jon Rosario Date: Thu, 11 Dec 2025 09:55:04 -0500 Subject: [PATCH 2/6] Add release note for limiting K8s event bundle messages by estimated length --- ...ssage-by-an-estimated-length-47c1ddaba913680f.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 releasenotes/notes/Limit-K8s-event-bundle-message-by-an-estimated-length-47c1ddaba913680f.yaml diff --git a/releasenotes/notes/Limit-K8s-event-bundle-message-by-an-estimated-length-47c1ddaba913680f.yaml b/releasenotes/notes/Limit-K8s-event-bundle-message-by-an-estimated-length-47c1ddaba913680f.yaml new file mode 100644 index 00000000000000..88ffe8102fad63 --- /dev/null +++ b/releasenotes/notes/Limit-K8s-event-bundle-message-by-an-estimated-length-47c1ddaba913680f.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +fixes: + - | + Fixed a bug where long Kubernetes event bundles were being truncated by dogweb. From 980f9ed83dc8c46e345fdd5d235229efff011ab4 Mon Sep 17 00:00:00 2001 From: Jon Rosario Date: Fri, 12 Dec 2025 09:01:53 -0500 Subject: [PATCH 3/6] Update maxEstimatedEventTextLength comment --- .../cluster/kubernetesapiserver/kubernetes_eventbundle.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go index d21ebe4c4da94c..7eff1635fcd8e9 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go @@ -23,9 +23,9 @@ import ( const ( // This is the maximum allowed length for the estimated event text. - // It's an estimate because the actual length of the event text is not known until the event is formatted by kubernetesEventBundle.formatEventText() - // Since dogweb enforced a limit of 4000 characters, we conservatively limit the estimated event text length to 3750 characters. - // See: https://github.com/DataDog/dogweb/blob/ddaf4c0ac06839f45edc19e056b2eccdc012edbc/dogweb/model/event/event.py#L41 + // The final text isn't known until it's formatted by kubernetesEventBundle.formatEventText(). + // The Events API limits event text to 4000 characters, so we conservatively limit the estimated text to 3750 characters. + // https://docs.datadoghq.com/api/latest/events/#post-an-event-v1 maxEstimatedEventTextLength = 3750 ) From d85c02f17e2e175fdc35d65cbd8c17c3718a9c28 Mon Sep 17 00:00:00 2001 From: Jon Rosario Date: Fri, 12 Dec 2025 10:45:14 -0500 Subject: [PATCH 4/6] Update bundled_events.go --- .../corechecks/cluster/kubernetesapiserver/bundled_events.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go index 84ea757310df36..d8ead40b3fe383 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go @@ -69,8 +69,8 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err lastBundle := bundles[len(bundles)-1] _, fits := lastBundle.fitsEvent(event) if !fits { - bundles = append(bundles, newKubernetesEventBundler(c.clusterName, event)) - lastBundle = bundles[len(bundles)-1] + lastBundle = newKubernetesEventBundler(c.clusterName, event) + bundles = append(bundles, lastBundle) bundlesByObject[id] = bundles } From f0fef0b357d5527335762d39a91b0925e0bce33c Mon Sep 17 00:00:00 2001 From: Jon Rosario Date: Fri, 2 Jan 2026 10:52:39 -0500 Subject: [PATCH 5/6] Replace fmt.Sprintf usage --- .../cluster/kubernetesapiserver/kubernetes_eventbundle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go index d21ebe4c4da94c..d7bd9727339cb6 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go @@ -129,7 +129,7 @@ func (b *kubernetesEventBundle) formatEventText() string { } func (b *kubernetesEventBundle) fitsEvent(event *v1.Event) (string, bool) { - eventText := fmt.Sprintf("**%s**: %s\n", event.Reason, event.Message) + eventText := "**" + event.Reason + "**: " + event.Message + "\n" if len(eventText) > maxEstimatedEventTextLength { return "", false } From d6d6757359ab136769c5352b80e3dc9b21fab1b6 Mon Sep 17 00:00:00 2001 From: Jon Rosario Date: Wed, 7 Jan 2026 16:47:38 -0500 Subject: [PATCH 6/6] Accumulate estimated text length --- .../kubernetes_eventbundle.go | 30 ++++++++++++++++- .../kubernetes_eventbundle_test.go | 33 ++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go index 79792728e532f7..7dd7062b3a29d5 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go @@ -27,6 +27,14 @@ const ( // The Events API limits event text to 4000 characters, so we conservatively limit the estimated text to 3750 characters. // https://docs.datadoghq.com/api/latest/events/#post-an-event-v1 maxEstimatedEventTextLength = 3750 + + // bundleFixedOverhead is the formatting overhead for each bundle including: + // - Header: "%%% \n" (5 chars) + // - Footer: " \n\n %%%" (7 chars) + // - Middle: " \n _Events emitted by the seen at since _ \n" + // (timestamps: ~60 chars, static text: ~50 chars) + // Conservative estimate: 250 chars + bundleFixedOverhead = 250 ) type kubernetesEventBundle struct { @@ -38,6 +46,7 @@ type kubernetesEventBundle struct { countByAction map[string]int // Map of count per action to aggregate several events from the same ObjUid in one event alertType event.AlertType // The Datadog event type hostInfo eventHostInfo // Host information extracted from the event, where applicable + estimatedSize int // Track cumulative estimated bundle size to prevent exceeding API limit } func newKubernetesEventBundler(clusterName string, event *v1.Event) *kubernetesEventBundle { @@ -48,6 +57,7 @@ func newKubernetesEventBundler(clusterName string, event *v1.Event) *kubernetesE countByAction: make(map[string]int), alertType: getDDAlertType(event.Type), hostInfo: getEventHostInfo(clusterName, event), + estimatedSize: bundleFixedOverhead + len(event.Source.Component), } } @@ -75,6 +85,12 @@ func (b *kubernetesEventBundle) addEvent(event *v1.Event) error { b.lastTimestamp = math.Max(b.lastTimestamp, float64(event.LastTimestamp.Unix())) } + // If we haven't seen this action before, add the overhead for the new action + previousCount := b.countByAction[eventText] + if previousCount == 0 { + b.estimatedSize += estimateEventOverhead(eventText) + } + b.countByAction[eventText] += int(event.Count) return nil @@ -130,9 +146,12 @@ func (b *kubernetesEventBundle) formatEventText() string { func (b *kubernetesEventBundle) fitsEvent(event *v1.Event) (string, bool) { eventText := "**" + event.Reason + "**: " + event.Message + "\n" - if len(eventText) > maxEstimatedEventTextLength { + + // If we haven't seen this action before, and adding it would probably exceed the limit, deny it + if b.countByAction[eventText] == 0 && (b.estimatedSize+estimateEventOverhead(eventText) > maxEstimatedEventTextLength) { return "", false } + return eventText, true } @@ -153,3 +172,12 @@ func formatStringIntMap(input map[string]int) string { return strings.Join(parts, " ") } + +// estimateEventOverhead calculates the overhead for a single event in the bundle +// including count representation, space, and the event text +func estimateEventOverhead(eventText string) int { + // Count: worst case 10 digits (max int32 ~2 billion) + + // Space separator: 1 char + + // Event text + return 10 + 1 + len(eventText) +} diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go index 3810b8f23f9a52..881bf0210a4477 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go @@ -257,6 +257,11 @@ func TestEventsTagging(t *testing.T) { } func TestKubernetesEventBundle_fitsEvent(t *testing.T) { + reallyLongMessage := strings.Repeat("a", 3700) + mediumMessage1 := strings.Repeat("b", 1500) + mediumMessage2 := strings.Repeat("c", 1500) + mediumMessage3 := strings.Repeat("d", 1500) + tests := []struct { name string events []*v1.Event @@ -271,20 +276,46 @@ func TestKubernetesEventBundle_fitsEvent(t *testing.T) { name: "event text length exceeds the maximum allowed length", events: []*v1.Event{ createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", "Scaled up replica set nginx-b49f5958c to 1", "Normal", 709662600), - createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", strings.Repeat("a", maxEstimatedEventTextLength), "Normal", 709662600), + createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", reallyLongMessage, "Normal", 709662600), }, expectedEventsFits: []bool{true, false}, }, + { + name: "multiple medium events exceed cumulative limit", + events: []*v1.Event{ + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage1, "Normal", 100), + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage2, "Normal", 100), + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + }, + expectedEventsFits: []bool{true, true, false}, + }, + { + name: "duplicate events do not exceed cumulative limit", + events: []*v1.Event{ + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + }, + expectedEventsFits: []bool{true, true, true}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + fmt.Println(tt.name) assert.Equal(t, len(tt.events), len(tt.expectedEventsFits)) bundle := newKubernetesEventBundler("", tt.events[0]) for i, ev := range tt.events { _, fits := bundle.fitsEvent(ev) assert.Equal(t, tt.expectedEventsFits[i], fits) + + if fits { + err := bundle.addEvent(ev) + assert.NoError(t, err) + } + + fmt.Println(bundle.estimatedSize) } }) }