From 2d642c8cfbe2ed8560a5b1fb4839fbffcd6bd73c Mon Sep 17 00:00:00 2001 From: Satbir Chahal Date: Thu, 17 Jan 2019 12:40:17 -0800 Subject: [PATCH] Initial support for dogstatsd events sent to statsd --- plugins/inputs/statsd/statsd.go | 187 +++++++++++++++++++++++++-- plugins/inputs/statsd/statsd_test.go | 44 +++++++ 2 files changed, 223 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 6b0dd0b7883ba..0b50b0d3fb53e 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -13,6 +13,9 @@ import ( "sync" "time" + "github.com/DataDog/datadog-agent/pkg/metrics" + "github.com/schahal/datadog-agent/pkg/dogstatsd/parser" + "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf" @@ -58,17 +61,27 @@ type Statsd struct { Percentiles []int PercentileLimit int - DeleteGauges bool - DeleteCounters bool - DeleteSets bool - DeleteTimings bool - ConvertNames bool + DeleteGauges bool + DeleteCounters bool + DeleteSets bool + DeleteTimings bool + DeleteDSDEvents bool + ConvertNames bool // MetricSeparator is the separator between parts of the metric name. MetricSeparator string // This flag enables parsing of tags in the dogstatsd extension to the // statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/) ParseDataDogTags bool + // This flag enables parsing of events in the dogstatsd extension to the + // statsd protocol. 'ParseDataDogTags' must also be set to 'true for this. + // (https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events) + // Will set the title and text as values, and rest as tags + ParseDataDogEvents bool + // If set, value of this event tag will be used as measurement name + EventMeasurementNameTag string + // Datadog events will fallback/default to this measurement name + DefaultEventMeasurement string // UDPPacketSize is deprecated, it's only here for legacy support // we now always create 1 max size buffer and then copy only what we need @@ -103,6 +116,9 @@ type Statsd struct { sets map[string]cachedset timings map[string]cachedtimings + // needed if dogstatsd (datadog) events are to be aggregated + dsdevents map[string]cacheddsdevent + // bucket -> influx templates Templates []string @@ -154,7 +170,8 @@ type cachedset struct { } type cachedgauge struct { - name string + name string + fields map[string]interface{} tags map[string]string } @@ -171,6 +188,13 @@ type cachedtimings struct { tags map[string]string } +type cacheddsdevent struct { + name string + fields map[string]interface{} + tags map[string]string + ts time.Time +} + func (_ *Statsd) Description() string { return "Statsd UDP/TCP Server" } @@ -215,6 +239,17 @@ const sampleConfig = ` ## http://docs.datadoghq.com/guides/dogstatsd/ parse_data_dog_tags = false + ## Parses events in the datadog statsd format to statsd protocol, + ## by making 'title' and 'text' fields and the rest as tags. + ## parse_data_dog_tags must also be set to true for this. + ## https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events + parse_data_dog_events = false + ## If set, value of this event tag will be used as measurement name + event_measurement_name_tag = "measurement_name" + ## Datadog events will fallback/default to this measurement name + ## if no value given for event_measurement_name_tag value's tag key + default_event_measurement = "dogstatsd_events" + ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md # templates = [ @@ -293,6 +328,13 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { s.sets = make(map[string]cachedset) } + for _, metric := range s.dsdevents { + acc.AddFields(metric.name, metric.fields, metric.tags, metric.ts) + } + if s.DeleteDSDEvents { + s.dsdevents = make(map[string]cacheddsdevent) + } + return nil } @@ -302,6 +344,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) s.timings = make(map[string]cachedtimings) + s.dsdevents = make(map[string]cacheddsdevent) s.Lock() defer s.Unlock() @@ -466,7 +509,22 @@ func (s *Statsd) parser() error { } } -// parseStatsdLine will parse the given statsd line, validating it as it goes. +// isDogStatsDEvent will check the passed in segment to validate +// that it starts with the dogstatsd event indicator as described in +// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events +func (s *Statsd) isDogStatsDEvent(segment string) bool { + // dogstatsd events begin with this + dsdEventIndicator := "_e{" + l := len(dsdEventIndicator) + + if len(segment) > l && segment[:l] == dsdEventIndicator { + return true + } + + return false +} + +// parseStatsdLine will parse the given statsd ine, validating it as it goes. // If the line is valid, it will be cached for the next call to Gather() func (s *Statsd) parseStatsdLine(line string) error { s.Lock() @@ -474,6 +532,7 @@ func (s *Statsd) parseStatsdLine(line string) error { lineTags := make(map[string]string) if s.ParseDataDogTags { + recombinedSegments := make([]string, 0) // datadog tags look like this: // users.online:1|c|@0.5|#country:china,environment:production @@ -481,7 +540,21 @@ func (s *Statsd) parseStatsdLine(line string) error { // we will split on the pipe and remove any elements that are datadog // tags, parse them, and rebuild the line sans the datadog tags pipesplit := strings.Split(line, "|") - for _, segment := range pipesplit { + for i, segment := range pipesplit { + if s.ParseDataDogEvents && i == 0 && s.isDogStatsDEvent(segment) { + event, err := parser.ParseEventMessage([]byte(line), "") + if err != nil { + log.Printf("Dogstatsd: error parsing event: %s", err) + continue + } + + // at this point, take the event and convert to statsd metric + e, eHash := s.transformDSDEvent(event) + + s.dsdevents[eHash] = e + return nil + } + if len(segment) > 0 && segment[0] == '#' { // we have ourselves a tag; they are comma separated tagstr := segment[1:] @@ -690,6 +763,103 @@ func parseKeyValue(keyvalue string) (string, string) { return key, val } +// transformDSDEvent takes a dd metrics event and transforms it into the +// canononical metric struct +func (s *Statsd) transformDSDEvent(event *metrics.Event) (cacheddsdevent, string) { + e := cacheddsdevent{} + + fields := make(map[string]interface{}) + tags := make(map[string]string) + + // populate fields + if event.Title != "" { + fields["msg_title"] = event.Title + } + if event.Text != "" { + fields["msg_text"] = event.Text + } + + // populate tags + if event.AlertType != "" { + tags["alert_type"] = string(event.AlertType) + } + if event.Priority != "" { + tags["priority"] = string(event.Priority) + } + if event.AggregationKey != "" { + tags["aggregation_key"] = event.AggregationKey + } + if event.SourceTypeName != "" { + tags["source_type_name"] = event.SourceTypeName + } + if event.EventType != "" { + tags["event_type"] = event.EventType + } + if event.Tags != nil { + lineTags := s.ddTagMap(event.Tags) + for k, v := range lineTags { + tags[k] = v + } + } + + // measurement name is determinded by tag value of key + // specified in config, otherwise it defaults to fallback value + if measurement, ok := tags[s.EventMeasurementNameTag]; ok { + e.name = measurement + } else { + e.name = s.DefaultEventMeasurement + } + e.tags = tags + e.fields = fields + + // set the timestamp (otherwise we may lose events because Gather() + // will set to same timestamp + if event.Ts > 0 { + e.ts = time.Unix(event.Ts, 0) + } else { + // if none is given, set to "right now" + e.ts = time.Now() + } + + // Make a unique key for the measurement name/tags + var tg []string + for k, v := range e.tags { + tg = append(tg, k+"="+v) + } + sort.Strings(tg) + tg = append(tg, e.name, e.ts.String()) + hash := strings.Join(tg, "") + + return e, hash +} + +// takes a slice of dd tags (e.g., ["dc:us-east", "tag2", "state:ny"), and +// returns a map of all tag key-value pairs +// +// assumes k-v are separated by colon (:) +func (s *Statsd) ddTagMap(tags []string) map[string]string { + lineTags := make(map[string]string) + + for _, tag := range tags { + ts := strings.SplitN(tag, ":", 2) + var k, v string + switch len(ts) { + case 1: + // just a tag + k = ts[0] + v = "" + case 2: + k = ts[0] + v = ts[1] + } + if k != "" { + lineTags[k] = v + } + } + + return lineTags +} + // aggregate takes in a metric. It then // aggregates and caches the current value(s). It does not deal with the // Delete* options, because those are dealt with in the Gather function. @@ -899,6 +1069,7 @@ func init() { DeleteGauges: true, DeleteSets: true, DeleteTimings: true, + DeleteDSDEvents: true, } }) } diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 1e50c8341f7d1..1e9b8b6188e9a 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -40,6 +40,7 @@ func NewTestStatsd() *Statsd { s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) s.timings = make(map[string]cachedtimings) + s.dsdevents = make(map[string]cacheddsdevent) s.MetricSeparator = "_" @@ -918,6 +919,45 @@ func TestParse_DataDogTags(t *testing.T) { } } +// Test string looks like a dogstatsd event +func TestIsDogStatsDEvent(t *testing.T) { + s := NewTestStatsd() + s.ParseDataDogTags = true + s.ParseDataDogEvents = true + + lines := map[string]bool{ + "_e{21,36}:An exception occurred|Cannot parse CSV file from 10.0.0.17|t:warning|#err_type:bad_file": true, + "_e{4,8}:test|some msg|t:info|#dc:us-west": true, + "_ee{21,36}:An exception occurred|Cannot parse CSV file from 10.0.0.17|t:warning|#err_type:bad_file": false, + "_z{4,8}:test|some msg|t:info|#dc:us-west": false, + } + + for line, expected := range lines { + if s.isDogStatsDEvent(line) != expected { + t.Errorf("Error, this line expected to be a %t dogstatsd event: %s", expected, line) + } + } +} + +// Test if slice of datadog tags return a map of all tag key-value pairs +func TestDDTagMap(t *testing.T) { + s := NewTestStatsd() + + ddTags := []string{"dc:us-east", "tag2", "state:ny"} + + tags := s.ddTagMap(ddTags) + + expected := map[string]string{ + "dc": "us-east", + "tag2": "", + "state": "ny", + } + + if len(tags) != len(expected) || tags["dc"] != expected["dc"] || tags["tag2"] != expected["tag2"] || tags["state"] != expected["state"] { + t.Errorf("Expected: %v, got %v", expected, tags) + } +} + func tagsForItem(m interface{}) map[string]string { switch m.(type) { case map[string]cachedcounter: @@ -936,6 +976,10 @@ func tagsForItem(m interface{}) map[string]string { for _, v := range m.(map[string]cachedtimings) { return v.tags } + case map[string]cacheddsdevent: + for _, v := range m.(map[string]cacheddsdevent) { + return v.tags + } } return nil }