Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 179 additions & 8 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/metrics"
"github.com/schahal/datadog-agent/pkg/dogstatsd/parser"

"github.com/influxdata/telegraf/plugins/parsers/graphite"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -58,17 +61,27 @@ type Statsd struct {
Percentiles []int
PercentileLimit int

DeleteGauges bool
DeleteCounters bool
DeleteSets bool
DeleteTimings bool
ConvertNames bool
DeleteGauges bool
DeleteCounters bool
DeleteSets bool
DeleteTimings bool
DeleteDSDEvents bool
ConvertNames bool

// MetricSeparator is the separator between parts of the metric name.
MetricSeparator string
// This flag enables parsing of tags in the dogstatsd extension to the
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool
// This flag enables parsing of events in the dogstatsd extension to the
// statsd protocol. 'ParseDataDogTags' must also be set to 'true' for this.
// (https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events)
// Will set the title and text as values, and rest as tags
ParseDataDogEvents bool
// If set, value of this event tag will be used as measurement name
EventMeasurementNameTag string
// Datadog events will fallback/default to this measurement name
DefaultEventMeasurement string

// UDPPacketSize is deprecated, it's only here for legacy support
// we now always create 1 max size buffer and then copy only what we need
Expand Down Expand Up @@ -103,6 +116,9 @@ type Statsd struct {
sets map[string]cachedset
timings map[string]cachedtimings

// needed if dogstatsd (datadog) events are to be aggregated
dsdevents map[string]cacheddsdevent

// bucket -> influx templates
Templates []string

Expand Down Expand Up @@ -154,7 +170,8 @@ type cachedset struct {
}

type cachedgauge struct {
name string
name string

fields map[string]interface{}
tags map[string]string
}
Expand All @@ -171,6 +188,13 @@ type cachedtimings struct {
tags map[string]string
}

// cacheddsdevent is a single dogstatsd event held in Statsd.dsdevents until
// Gather hands it to the accumulator via AddFields.
type cacheddsdevent struct {
// name is the measurement name the event is reported under
name string
// fields are the field values passed to AddFields for this event
fields map[string]interface{}
// tags are the tag key/value pairs passed to AddFields for this event
tags map[string]string
// ts is the timestamp passed to AddFields for this event
ts time.Time
}

// Description returns a short, human-readable summary of the plugin.
func (*Statsd) Description() string {
	const summary = "Statsd UDP/TCP Server"
	return summary
}
Expand Down Expand Up @@ -215,6 +239,17 @@ const sampleConfig = `
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false

## Parses events in the datadog statsd format to statsd protocol,
## by making 'title' and 'text' fields and the rest as tags.
## parse_data_dog_tags must also be set to true for this.
## https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events
parse_data_dog_events = false
## If set, value of this event tag will be used as measurement name
event_measurement_name_tag = "measurement_name"
## Datadog events will fallback/default to this measurement name
## if no value given for event_measurement_name_tag value's tag key
default_event_measurement = "dogstatsd_events"

## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
# templates = [
Expand Down Expand Up @@ -293,6 +328,13 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
s.sets = make(map[string]cachedset)
}

for _, metric := range s.dsdevents {
acc.AddFields(metric.name, metric.fields, metric.tags, metric.ts)
}
if s.DeleteDSDEvents {
s.dsdevents = make(map[string]cacheddsdevent)
}

return nil
}

Expand All @@ -302,6 +344,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)
s.dsdevents = make(map[string]cacheddsdevent)

s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -466,22 +509,52 @@ func (s *Statsd) parser() error {
}
}

// parseStatsdLine will parse the given statsd line, validating it as it goes.
// isDogStatsDEvent reports whether segment begins with the dogstatsd event
// indicator "_e{" as described in
// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#events
//
// The segment must be strictly longer than the indicator itself, so a bare
// "_e{" with nothing after it is not treated as an event.
func (s *Statsd) isDogStatsDEvent(segment string) bool {
	// every dogstatsd event datagram starts with this prefix
	const indicator = "_e{"

	if len(segment) <= len(indicator) {
		return false
	}

	return segment[:len(indicator)] == indicator
}

// parseStatsdLine will parse the given statsd line, validating it as it goes.
// If the line is valid, it will be cached for the next call to Gather()
func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()

lineTags := make(map[string]string)
if s.ParseDataDogTags {

recombinedSegments := make([]string, 0)
// datadog tags look like this:
// users.online:1|c|@0.5|#country:china,environment:production
// users.online:1|c|#sometagwithnovalue
// we will split on the pipe and remove any elements that are datadog
// tags, parse them, and rebuild the line sans the datadog tags
pipesplit := strings.Split(line, "|")
for _, segment := range pipesplit {
for i, segment := range pipesplit {
if s.ParseDataDogEvents && i == 0 && s.isDogStatsDEvent(segment) {
event, err := parser.ParseEventMessage([]byte(line), "")
if err != nil {
log.Printf("Dogstatsd: error parsing event: %s", err)
continue
}

// at this point, take the event and convert to statsd metric
e, eHash := s.transformDSDEvent(event)

s.dsdevents[eHash] = e
return nil
}

if len(segment) > 0 && segment[0] == '#' {
// we have ourselves a tag; they are comma separated
tagstr := segment[1:]
Expand Down Expand Up @@ -690,6 +763,103 @@ func parseKeyValue(keyvalue string) (string, string) {
return key, val
}

// transformDSDEvent takes a dd metrics event and transforms it into the
// canonical cached event struct, returning it together with the key it
// should be cached under.
//
// The event title and text become the "msg_title" and "msg_text" fields.
// Alert type, priority, aggregation key, source type name and event type
// become tags (each only when non-empty). The event's own tags are applied
// last, so they win when a key is repeated.
//
// The measurement name is the value of the tag named by
// s.EventMeasurementNameTag. When that tag is absent or its value is empty,
// s.DefaultEventMeasurement is used instead.
func (s *Statsd) transformDSDEvent(event *metrics.Event) (cacheddsdevent, string) {
	// populate fields
	fields := make(map[string]interface{})
	if event.Title != "" {
		fields["msg_title"] = event.Title
	}
	if event.Text != "" {
		fields["msg_text"] = event.Text
	}

	// populate tags
	tags := make(map[string]string)
	if event.AlertType != "" {
		tags["alert_type"] = string(event.AlertType)
	}
	if event.Priority != "" {
		tags["priority"] = string(event.Priority)
	}
	if event.AggregationKey != "" {
		tags["aggregation_key"] = event.AggregationKey
	}
	if event.SourceTypeName != "" {
		tags["source_type_name"] = event.SourceTypeName
	}
	if event.EventType != "" {
		tags["event_type"] = event.EventType
	}
	// ddTagMap returns an empty map for a nil slice, so no nil check is needed
	for k, v := range s.ddTagMap(event.Tags) {
		tags[k] = v
	}

	// measurement name is determined by the tag value of the key specified
	// in config; a missing tag or a tag without a value (e.g. a bare
	// "#measurement_name") falls back to the default so that the measurement
	// name is never taken from an empty tag value
	name := s.DefaultEventMeasurement
	if measurement, ok := tags[s.EventMeasurementNameTag]; ok && measurement != "" {
		name = measurement
	}

	// set the timestamp (otherwise we may lose events because Gather()
	// will set them all to the same timestamp); if none is given, use
	// "right now"
	ts := time.Now()
	if event.Ts > 0 {
		ts = time.Unix(event.Ts, 0)
	}

	e := cacheddsdevent{
		name:   name,
		fields: fields,
		tags:   tags,
		ts:     ts,
	}

	// Make a unique key from the sorted tags, the measurement name and the
	// timestamp
	tg := make([]string, 0, len(e.tags)+2)
	for k, v := range e.tags {
		tg = append(tg, k+"="+v)
	}
	sort.Strings(tg)
	tg = append(tg, e.name, e.ts.String())
	hash := strings.Join(tg, "")

	return e, hash
}

// ddTagMap takes a slice of dd tags (e.g., ["dc:us-east", "tag2", "state:ny"])
// and returns a map of all tag key-value pairs.
//
// Keys and values are separated by the first colon (:) in each tag; a tag
// without a colon maps to an empty value, and tags with an empty key are
// dropped.
func (s *Statsd) ddTagMap(tags []string) map[string]string {
	pairs := make(map[string]string)

	for _, raw := range tags {
		// default: the whole tag is the key and there is no value
		key, value := raw, ""
		if sep := strings.Index(raw, ":"); sep >= 0 {
			key, value = raw[:sep], raw[sep+1:]
		}

		if key == "" {
			continue
		}
		pairs[key] = value
	}

	return pairs
}

// aggregate takes in a metric. It then
// aggregates and caches the current value(s). It does not deal with the
// Delete* options, because those are dealt with in the Gather function.
Expand Down Expand Up @@ -899,6 +1069,7 @@ func init() {
DeleteGauges: true,
DeleteSets: true,
DeleteTimings: true,
DeleteDSDEvents: true,
}
})
}
44 changes: 44 additions & 0 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func NewTestStatsd() *Statsd {
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)
s.dsdevents = make(map[string]cacheddsdevent)

s.MetricSeparator = "_"

Expand Down Expand Up @@ -918,6 +919,45 @@ func TestParse_DataDogTags(t *testing.T) {
}
}

// TestIsDogStatsDEvent checks that only lines starting with the dogstatsd
// event indicator are reported as events.
func TestIsDogStatsDEvent(t *testing.T) {
	s := NewTestStatsd()
	s.ParseDataDogTags = true
	s.ParseDataDogEvents = true

	cases := []struct {
		line    string
		isEvent bool
	}{
		{"_e{21,36}:An exception occurred|Cannot parse CSV file from 10.0.0.17|t:warning|#err_type:bad_file", true},
		{"_e{4,8}:test|some msg|t:info|#dc:us-west", true},
		{"_ee{21,36}:An exception occurred|Cannot parse CSV file from 10.0.0.17|t:warning|#err_type:bad_file", false},
		{"_z{4,8}:test|some msg|t:info|#dc:us-west", false},
	}

	for _, tc := range cases {
		if got := s.isDogStatsDEvent(tc.line); got != tc.isEvent {
			t.Errorf("Error, this line expected to be a %t dogstatsd event: %s", tc.isEvent, tc.line)
		}
	}
}

// TestDDTagMap checks that a slice of datadog tags is turned into a map of
// tag key-value pairs, with value-less tags mapped to the empty string.
func TestDDTagMap(t *testing.T) {
	s := NewTestStatsd()

	expected := map[string]string{
		"dc":    "us-east",
		"tag2":  "",
		"state": "ny",
	}

	tags := s.ddTagMap([]string{"dc:us-east", "tag2", "state:ny"})

	// same size and the same value under every expected key
	matches := len(tags) == len(expected)
	for _, key := range []string{"dc", "tag2", "state"} {
		if tags[key] != expected[key] {
			matches = false
		}
	}

	if !matches {
		t.Errorf("Expected: %v, got %v", expected, tags)
	}
}

func tagsForItem(m interface{}) map[string]string {
switch m.(type) {
case map[string]cachedcounter:
Expand All @@ -936,6 +976,10 @@ func tagsForItem(m interface{}) map[string]string {
for _, v := range m.(map[string]cachedtimings) {
return v.tags
}
case map[string]cacheddsdevent:
for _, v := range m.(map[string]cacheddsdevent) {
return v.tags
}
}
return nil
}
Expand Down