diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go index d8387f6be05073..d8ead40b3fe383 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/bundled_events.go @@ -33,7 +33,8 @@ type bundledTransformer struct { func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []error) { var errors []error - bundlesByObject := make(map[bundleID]*kubernetesEventBundle) + // Map bundleID to a slice of kubernetesEventBundles, as we can have several bundles for the same object if the event text length exceeds the maximum allowed length. + bundlesByObject := make(map[bundleID][]*kubernetesEventBundle) for _, event := range events { if event.InvolvedObject.Kind == "" || @@ -59,13 +60,21 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err id := buildBundleID(event) - bundle, found := bundlesByObject[id] + bundles, found := bundlesByObject[id] if !found { - bundle = newKubernetesEventBundler(c.clusterName, event) - bundlesByObject[id] = bundle + bundles = []*kubernetesEventBundle{newKubernetesEventBundler(c.clusterName, event)} + bundlesByObject[id] = bundles } - err := bundle.addEvent(event) + lastBundle := bundles[len(bundles)-1] + _, fits := lastBundle.fitsEvent(event) + if !fits { + lastBundle = newKubernetesEventBundler(c.clusterName, event) + bundles = append(bundles, lastBundle) + bundlesByObject[id] = bundles + } + + err := lastBundle.addEvent(event) if err != nil { errors = append(errors, err) continue @@ -74,21 +83,23 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err datadogEvs := make([]event.Event, 0, len(bundlesByObject)) - for id, bundle := range bundlesByObject { - datadogEv, err := bundle.formatEvents(c.taggerInstance) - if err != nil { - errors = append(errors, err) - continue - } + for id, 
bundles := range bundlesByObject { + for _, bundle := range bundles { + datadogEv, err := bundle.formatEvents(c.taggerInstance) + if err != nil { + errors = append(errors, err) + continue + } - emittedEvents.Inc( - id.kind, - id.evType, - getEventSource(bundle.reportingController, bundle.component), - "true", - ) + emittedEvents.Inc( + id.kind, + id.evType, + getEventSource(bundle.reportingController, bundle.component), + "true", + ) - datadogEvs = append(datadogEvs, datadogEv) + datadogEvs = append(datadogEvs, datadogEv) + } } return datadogEvs, errors diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go index 395d2f268be3d0..8c8c18d669171c 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go @@ -40,11 +40,12 @@ const ( // CheckName is the name of the check CheckName = "kubernetes_apiserver" - KubeControlPaneCheck = "kube_apiserver_controlplane.up" - eventTokenKey = "event" - maxEventCardinality = 300 - defaultResyncPeriodInSecond = 300 - defaultTimeoutEventCollection = 2000 + KubeControlPaneCheck = "kube_apiserver_controlplane.up" + eventTokenKey = "event" + maxEventCardinality = 300 + defaultResyncPeriodInSecond = 300 + defaultTimeoutEventCollection = 2000 + defaultMaxEstimatedEventTextLength = 3750 ) var ( diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go index 8a68d8348b3557..7dd7062b3a29d5 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle.go @@ -21,6 +21,22 @@ import ( "github.com/DataDog/datadog-agent/pkg/metrics/event" ) +const ( + // This is the maximum allowed length for the 
estimated event text. + // The final text isn't known until it's formatted by kubernetesEventBundle.formatEventText(). + // The Events API limits event text to 4000 characters, so we conservatively limit the estimated text to 3750 characters. + // https://docs.datadoghq.com/api/latest/events/#post-an-event-v1 + maxEstimatedEventTextLength = 3750 + + // bundleFixedOverhead is the formatting overhead for each bundle including: + // - Header: "%%% \n" (5 chars) + // - Footer: " \n\n %%%" (7 chars) + // - Middle: " \n _Events emitted by the seen at since _ \n" + // (timestamps: ~60 chars, static text: ~50 chars) + // Conservative estimate: 250 chars + bundleFixedOverhead = 250 +) + type kubernetesEventBundle struct { involvedObject v1.ObjectReference // Parent object for this event bundle component string // Used to identify the Kubernetes component which generated the event @@ -30,6 +46,7 @@ type kubernetesEventBundle struct { countByAction map[string]int // Map of count per action to aggregate several events from the same ObjUid in one event alertType event.AlertType // The Datadog event type hostInfo eventHostInfo // Host information extracted from the event, where applicable + estimatedSize int // Track cumulative estimated bundle size to prevent exceeding API limit } func newKubernetesEventBundler(clusterName string, event *v1.Event) *kubernetesEventBundle { @@ -40,6 +57,7 @@ func newKubernetesEventBundler(clusterName string, event *v1.Event) *kubernetesE countByAction: make(map[string]int), alertType: getDDAlertType(event.Type), hostInfo: getEventHostInfo(clusterName, event), + estimatedSize: bundleFixedOverhead + len(event.Source.Component), } } @@ -48,6 +66,11 @@ func (b *kubernetesEventBundle) addEvent(event *v1.Event) error { return fmt.Errorf("mismatching Object UIDs: %s != %s", event.InvolvedObject.UID, b.involvedObject.UID) } + eventText, fits := b.fitsEvent(event) + if !fits { + return fmt.Errorf("event text length exceeds the maximum allowed length: %d > 
%d", len(eventText), maxEstimatedEventTextLength) + } + // We do not process the events in chronological order necessarily. // We only care about the first time they occurred, the last time and the count. if event.FirstTimestamp.IsZero() { @@ -62,7 +85,13 @@ func (b *kubernetesEventBundle) addEvent(event *v1.Event) error { b.lastTimestamp = math.Max(b.lastTimestamp, float64(event.LastTimestamp.Unix())) } - b.countByAction[fmt.Sprintf("**%s**: %s\n", event.Reason, event.Message)] += int(event.Count) + // If we haven't seen this action before, add the overhead for the new action + previousCount := b.countByAction[eventText] + if previousCount == 0 { + b.estimatedSize += estimateEventOverhead(eventText) + } + + b.countByAction[eventText] += int(event.Count) return nil } @@ -115,6 +144,17 @@ func (b *kubernetesEventBundle) formatEventText() string { return eventText } +func (b *kubernetesEventBundle) fitsEvent(event *v1.Event) (string, bool) { + eventText := "**" + event.Reason + "**: " + event.Message + "\n" + + // If we haven't seen this action before, and adding it would probably exceed the limit, deny it + if b.countByAction[eventText] == 0 && (b.estimatedSize+estimateEventOverhead(eventText) > maxEstimatedEventTextLength) { + return "", false + } + + return eventText, true +} + func formatStringIntMap(input map[string]int) string { parts := make([]string, 0, len(input)) keys := make([]string, 0, len(input)) @@ -132,3 +172,12 @@ func formatStringIntMap(input map[string]int) string { return strings.Join(parts, " ") } + +// estimateEventOverhead calculates the overhead for a single event in the bundle +// including count representation, space, and the event text +func estimateEventOverhead(eventText string) int { + // Count: worst case 10 digits (max int32 ~2 billion) + + // Space separator: 1 char + + // Event text + return 10 + 1 + len(eventText) +} diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go 
b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go index ad1c7eab5cd80b..881bf0210a4477 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_eventbundle_test.go @@ -8,6 +8,7 @@ package kubernetesapiserver import ( "fmt" + "strings" "testing" "time" @@ -254,3 +255,68 @@ func TestEventsTagging(t *testing.T) { }) } } + +func TestKubernetesEventBundle_fitsEvent(t *testing.T) { + reallyLongMessage := strings.Repeat("a", 3700) + mediumMessage1 := strings.Repeat("b", 1500) + mediumMessage2 := strings.Repeat("c", 1500) + mediumMessage3 := strings.Repeat("d", 1500) + + tests := []struct { + name string + events []*v1.Event + expectedEventsFits []bool + }{ + { + name: "event text length does not exceed the maximum allowed length", + events: []*v1.Event{createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", "Scaled up replica set nginx-b49f5958c to 1", "Normal", 709662600)}, + expectedEventsFits: []bool{true}, + }, + { + name: "event text length exceeds the maximum allowed length", + events: []*v1.Event{ + createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", "Scaled up replica set nginx-b49f5958c to 1", "Normal", 709662600), + createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", reallyLongMessage, "Normal", 709662600), + }, + expectedEventsFits: []bool{true, false}, + }, + { + name: "multiple medium events exceed cumulative limit", + events: []*v1.Event{ + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage1, "Normal", 100), + createEvent(1, "default", 
"pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage2, "Normal", 100), + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + }, + expectedEventsFits: []bool{true, true, false}, + }, + { + name: "duplicate events do not exceed cumulative limit", + events: []*v1.Event{ + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100), + }, + expectedEventsFits: []bool{true, true, true}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fmt.Println(tt.name) + assert.Equal(t, len(tt.events), len(tt.expectedEventsFits)) + + bundle := newKubernetesEventBundler("", tt.events[0]) + for i, ev := range tt.events { + _, fits := bundle.fitsEvent(ev) + assert.Equal(t, tt.expectedEventsFits[i], fits) + + if fits { + err := bundle.addEvent(ev) + assert.NoError(t, err) + } + + fmt.Println(bundle.estimatedSize) + } + }) + } +} diff --git a/releasenotes/notes/Limit-K8s-event-bundle-message-by-an-estimated-length-47c1ddaba913680f.yaml b/releasenotes/notes/Limit-K8s-event-bundle-message-by-an-estimated-length-47c1ddaba913680f.yaml new file mode 100644 index 00000000000000..88ffe8102fad63 --- /dev/null +++ b/releasenotes/notes/Limit-K8s-event-bundle-message-by-an-estimated-length-47c1ddaba913680f.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. 
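+# +# Each section note must be formatted as reStructuredText. +--- +fixes: + - | + Fixed a bug where long Kubernetes event bundles were being truncated by dogweb because their text exceeded the 4000-character limit of the Events API. Bundles whose estimated text would exceed the limit are now split into several events for the same object.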