Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ type bundledTransformer struct {
func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []error) {
var errors []error

bundlesByObject := make(map[bundleID]*kubernetesEventBundle)
// Map bundleID to a slice of kubernetesEventBundles, as we can have several bundles for the same object if the event text length exceeds the maximum allowed length.
bundlesByObject := make(map[bundleID][]*kubernetesEventBundle)

for _, event := range events {
if event.InvolvedObject.Kind == "" ||
Expand All @@ -59,13 +60,21 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err

id := buildBundleID(event)

bundle, found := bundlesByObject[id]
bundles, found := bundlesByObject[id]
if !found {
bundle = newKubernetesEventBundler(c.clusterName, event)
bundlesByObject[id] = bundle
bundles = []*kubernetesEventBundle{newKubernetesEventBundler(c.clusterName, event)}
bundlesByObject[id] = bundles
}

err := bundle.addEvent(event)
lastBundle := bundles[len(bundles)-1]
_, fits := lastBundle.fitsEvent(event)
if !fits {
lastBundle = newKubernetesEventBundler(c.clusterName, event)
bundles = append(bundles, lastBundle)
bundlesByObject[id] = bundles
}

err := lastBundle.addEvent(event)
if err != nil {
errors = append(errors, err)
continue
Expand All @@ -74,21 +83,23 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err

datadogEvs := make([]event.Event, 0, len(bundlesByObject))

for id, bundle := range bundlesByObject {
datadogEv, err := bundle.formatEvents(c.taggerInstance)
if err != nil {
errors = append(errors, err)
continue
}
for id, bundles := range bundlesByObject {
for _, bundle := range bundles {
datadogEv, err := bundle.formatEvents(c.taggerInstance)
if err != nil {
errors = append(errors, err)
continue
}

emittedEvents.Inc(
id.kind,
id.evType,
getEventSource(bundle.reportingController, bundle.component),
"true",
)
emittedEvents.Inc(
id.kind,
id.evType,
getEventSource(bundle.reportingController, bundle.component),
"true",
)

datadogEvs = append(datadogEvs, datadogEv)
datadogEvs = append(datadogEvs, datadogEv)
}
}

return datadogEvs, errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ const (
// CheckName is the name of the check
CheckName = "kubernetes_apiserver"

KubeControlPaneCheck = "kube_apiserver_controlplane.up"
eventTokenKey = "event"
maxEventCardinality = 300
defaultResyncPeriodInSecond = 300
defaultTimeoutEventCollection = 2000
KubeControlPaneCheck = "kube_apiserver_controlplane.up"
eventTokenKey = "event"
maxEventCardinality = 300
defaultResyncPeriodInSecond = 300
defaultTimeoutEventCollection = 2000
defaultMaxEstimatedEventTextLength = 3750
)

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,22 @@ import (
"github.com/DataDog/datadog-agent/pkg/metrics/event"
)

const (
// This is the maximum allowed length for the estimated event text.
// The final text isn't known until it's formatted by kubernetesEventBundle.formatEventText().
// The Events API limits event text to 4000 characters, so we conservatively limit the estimated text to 3750 characters.
// https://docs.datadoghq.com/api/latest/events/#post-an-event-v1
maxEstimatedEventTextLength = 3750

// bundleFixedOverhead is the formatting overhead for each bundle including:
// - Header: "%%% \n" (5 chars)
// - Footer: " \n\n %%%" (7 chars)
// - Middle: " \n _Events emitted by the <component> seen at <timestamp> since <timestamp>_ \n"
// (timestamps: ~60 chars, static text: ~50 chars)
// Conservative estimate: 250 chars
bundleFixedOverhead = 250
)

type kubernetesEventBundle struct {
involvedObject v1.ObjectReference // Parent object for this event bundle
component string // Used to identify the Kubernetes component which generated the event
Expand All @@ -30,6 +46,7 @@ type kubernetesEventBundle struct {
countByAction map[string]int // Map of count per action to aggregate several events from the same ObjUid in one event
alertType event.AlertType // The Datadog event type
hostInfo eventHostInfo // Host information extracted from the event, where applicable
estimatedSize int // Track cumulative estimated bundle size to prevent exceeding API limit
}

func newKubernetesEventBundler(clusterName string, event *v1.Event) *kubernetesEventBundle {
Expand All @@ -40,6 +57,7 @@ func newKubernetesEventBundler(clusterName string, event *v1.Event) *kubernetesE
countByAction: make(map[string]int),
alertType: getDDAlertType(event.Type),
hostInfo: getEventHostInfo(clusterName, event),
estimatedSize: bundleFixedOverhead + len(event.Source.Component),
}
}

Expand All @@ -48,6 +66,11 @@ func (b *kubernetesEventBundle) addEvent(event *v1.Event) error {
return fmt.Errorf("mismatching Object UIDs: %s != %s", event.InvolvedObject.UID, b.involvedObject.UID)
}

eventText, fits := b.fitsEvent(event)
if !fits {
return fmt.Errorf("event text length exceeds the maximum allowed length: %d > %d", len(eventText), maxEstimatedEventTextLength)
}

// We do not process the events in chronological order necessarily.
// We only care about the first time they occurred, the last time and the count.
if event.FirstTimestamp.IsZero() {
Expand All @@ -62,7 +85,13 @@ func (b *kubernetesEventBundle) addEvent(event *v1.Event) error {
b.lastTimestamp = math.Max(b.lastTimestamp, float64(event.LastTimestamp.Unix()))
}

b.countByAction[fmt.Sprintf("**%s**: %s\n", event.Reason, event.Message)] += int(event.Count)
// If we haven't seen this action before, add the overhead for the new action
previousCount := b.countByAction[eventText]
if previousCount == 0 {
b.estimatedSize += estimateEventOverhead(eventText)
}

b.countByAction[eventText] += int(event.Count)

return nil
}
Expand Down Expand Up @@ -115,6 +144,17 @@ func (b *kubernetesEventBundle) formatEventText() string {
return eventText
}

func (b *kubernetesEventBundle) fitsEvent(event *v1.Event) (string, bool) {
eventText := "**" + event.Reason + "**: " + event.Message + "\n"

// If we haven't seen this action before, and adding it would probably exceed the limit, deny it
if b.countByAction[eventText] == 0 && (b.estimatedSize+estimateEventOverhead(eventText) > maxEstimatedEventTextLength) {
return "", false
}

return eventText, true
}

func formatStringIntMap(input map[string]int) string {
parts := make([]string, 0, len(input))
keys := make([]string, 0, len(input))
Expand All @@ -132,3 +172,12 @@ func formatStringIntMap(input map[string]int) string {

return strings.Join(parts, " ")
}

// estimateEventOverhead calculates the overhead for a single event in the bundle
// including count representation, space, and the event text
func estimateEventOverhead(eventText string) int {
// Count: worst case 10 digits (max int32 ~2 billion) +
// Space separator: 1 char +
// Event text
return 10 + 1 + len(eventText)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kubernetesapiserver

import (
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -254,3 +255,68 @@ func TestEventsTagging(t *testing.T) {
})
}
}

func TestKubernetesEventBundle_fitsEvent(t *testing.T) {
reallyLongMessage := strings.Repeat("a", 3700)
mediumMessage1 := strings.Repeat("b", 1500)
mediumMessage2 := strings.Repeat("c", 1500)
mediumMessage3 := strings.Repeat("d", 1500)

tests := []struct {
name string
events []*v1.Event
expectedEventsFits []bool
}{
{
name: "event text length does not exceed the maximum allowed length",
events: []*v1.Event{createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", "Scaled up replica set nginx-b49f5958c to 1", "Normal", 709662600)},
expectedEventsFits: []bool{true},
},
{
name: "event text length exceeds the maximum allowed length",
events: []*v1.Event{
createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", "Scaled up replica set nginx-b49f5958c to 1", "Normal", 709662600),
createEvent(1, "default", "nginx", "Deployment", "b85978f5-2bf2-413f-9611-0b433d2cbf30", "deployment-controller", "deployment-controller", "", "ScalingReplicaSet", reallyLongMessage, "Normal", 709662600),
},
expectedEventsFits: []bool{true, false},
},
{
name: "multiple medium events exceed cumulative limit",
events: []*v1.Event{
createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage1, "Normal", 100),
createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage2, "Normal", 100),
createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100),
},
expectedEventsFits: []bool{true, true, false},
},
{
name: "duplicate events do not exceed cumulative limit",
events: []*v1.Event{
createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100),
createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100),
createEvent(1, "default", "pod", "Pod", "uid1", "scheduler", "scheduler", "", "Reason", mediumMessage3, "Normal", 100),
},
expectedEventsFits: []bool{true, true, true},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fmt.Println(tt.name)
assert.Equal(t, len(tt.events), len(tt.expectedEventsFits))

bundle := newKubernetesEventBundler("", tt.events[0])
for i, ev := range tt.events {
_, fits := bundle.fitsEvent(ev)
assert.Equal(t, tt.expectedEventsFits[i], fits)

if fits {
err := bundle.addEvent(ev)
assert.NoError(t, err)
}

fmt.Println(bundle.estimatedSize)
}
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
fixes:
- |
Fixed a bug where long Kubernetes event bundles were being truncated by dogweb.
Loading