diff --git a/CHANGELOG.md b/CHANGELOG.md index ef01bf952..1294dfc1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ -# 1.8, pending +# 1.8.0, pending + +## Improvements +* Veneur no longer **requires** the use of Datadog as a target for flushes. Veneur can now use one or more of any of its supported sinks as a backend. This realizes our desire for Veneur to be fully vendor agnostic. Thanks [gphat](https://github.com/gphat)! ## Bugfixes * Fix a panic when using `veneur-emit` to emit metrics via `-ssf` when no tags are specified. Thanks [myndzi](https://github.com/myndzi) diff --git a/README.md b/README.md index 015ca599d..a6fb49a7d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![Build Status](https://travis-ci.org/stripe/veneur.svg?branch=master)](https://travis-ci.org/stripe/veneur) [![GoDoc](https://godoc.org/github.com/stripe/veneur?status.svg)](http://godoc.org/github.com/stripe/veneur) -Veneur (venn-urr) is a distributed, fault-tolerant pipeline for runtime data. It provides a server implementation of the [DogStatsD protocol](http://docs.datadoghq.com/guides/dogstatsd/#datagram-format) for aggregating metrics and sending them to downstream storage, typically [Datadog](http://datadoghq.com). It can also act as a [global aggregator](#global-aggregation) for histograms, sets and counters. +Veneur (venn-urr) is a distributed, fault-tolerant pipeline for runtime data. It provides a server implementation of the [DogStatsD protocol](http://docs.datadoghq.com/guides/dogstatsd/#datagram-format) or [SSF](https://github.com/stripe/veneur/tree/master/ssf) for aggregating metrics and sending them to one or more supported downstream sinks. It can also act as a [global aggregator](#global-aggregation) for histograms, sets and counters. More generically, Veneur is a convenient sink for various observability primitives. 
diff --git a/flusher.go b/flusher.go index 7871e2a67..bc4b15568 100644 --- a/flusher.go +++ b/flusher.go @@ -11,6 +11,7 @@ import ( "net/http" "net/url" "runtime" + "sync" "time" "github.com/DataDog/datadog-go/statsd" @@ -19,10 +20,8 @@ import ( "github.com/stripe/veneur/trace" ) -const DatadogResourceKey = "resource" - // Flush collects sampler's metrics and passes them to sinks. -func (s *Server) Flush() { +func (s *Server) Flush(ctx context.Context) { span := tracer.StartSpan("flush").(*trace.Span) defer span.Finish() @@ -33,93 +32,57 @@ func (s *Server) Flush() { s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0) s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0) - // right now we have only one destination plugin - // but eventually, this is where we would loop over our supported - // destinations - if s.IsLocal() { - s.FlushLocal(span.Attach(context.Background())) - } else { - s.FlushGlobal(span.Attach(context.Background())) - } -} - -// FlushGlobal sends any global metrics to their destination. -func (s *Server) FlushGlobal(ctx context.Context) { - span, _ := trace.StartSpanFromContext(ctx, "") - defer span.Finish() - + // TODO Why is this not in the worker the way the trace worker is set up? events, checks := s.EventWorker.Flush() s.Statsd.Count("worker.events_flushed_total", int64(len(events)), nil, 1.0) s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0) - go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately - go s.flushTraces(span.Attach(ctx)) // this too! 
- - percentiles := s.HistogramPercentiles - - tempMetrics, ms := s.tallyMetrics(percentiles) - - // the global veneur instance is also responsible for reporting the sets - // and global counters - ms.totalLength += ms.totalSets - ms.totalLength += ms.totalGlobalCounters - - finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) - - s.reportMetricsFlushCounts(ms) - - s.reportGlobalMetricsFlushCounts(ms) - - go func() { - for _, p := range s.getPlugins() { - start := time.Now() - err := p.Flush(span.Attach(ctx), finalMetrics) - s.Statsd.TimeInMilliseconds(fmt.Sprintf("flush.plugins.%s.total_duration_ns", p.Name()), float64(time.Since(start).Nanoseconds()), []string{"part:post"}, 1.0) - if err != nil { - countName := fmt.Sprintf("flush.plugins.%s.error_total", p.Name()) - s.Statsd.Count(countName, 1, []string{}, 1.0) - } - s.Statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0) - } - }() - - // TODO Don't hardcode this - s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics) -} - -// FlushLocal takes the slices of metrics, combines then and marshals them to json -// for posting to Datadog. 
-func (s *Server) FlushLocal(ctx context.Context) { - span, _ := trace.StartSpanFromContext(ctx, "") - defer span.Finish() - - events, checks := s.EventWorker.Flush() - s.Statsd.Count("worker.checks_flushed_total", int64(len(checks)), nil, 1.0) + // TODO Concurrency + for _, sink := range s.metricSinks { + sink.FlushEventsChecks(span.Attach(ctx), events, checks) + } - go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately go s.flushTraces(span.Attach(ctx)) // don't publish percentiles if we're a local veneur; that's the global // veneur's job var percentiles []float64 + var finalMetrics []samplers.InterMetric + + if !s.IsLocal() { + percentiles = s.HistogramPercentiles + } tempMetrics, ms := s.tallyMetrics(percentiles) - finalMetrics := s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) + finalMetrics = s.generateInterMetrics(span.Attach(ctx), percentiles, tempMetrics, ms) s.reportMetricsFlushCounts(ms) - // we don't report totalHistograms, totalSets, or totalTimers for local veneur instances - - // we cannot do this until we're done using tempMetrics within this function, - // since not everything in tempMetrics is safe for sharing - go s.flushForward(span.Attach(ctx), tempMetrics) + if s.IsLocal() { + go s.flushForward(span.Attach(ctx), tempMetrics) + } else { + s.reportGlobalMetricsFlushCounts(ms) + } // If there's nothing to flush, don't bother calling the plugins and stuff. 
if len(finalMetrics) == 0 { return } + wg := sync.WaitGroup{} + for _, sink := range s.metricSinks { + wg.Add(1) + go func(ms metricSink) { + err := ms.Flush(span.Attach(ctx), finalMetrics) + if err != nil { + log.WithError(err).WithField("sink", ms.Name()).Warn("Error flushing sink") + } + wg.Done() + }(sink) + } + wg.Wait() + go func() { for _, p := range s.getPlugins() { start := time.Now() @@ -132,9 +95,6 @@ func (s *Server) FlushLocal(ctx context.Context) { s.Statsd.Gauge(fmt.Sprintf("flush.plugins.%s.post_metrics_total", p.Name()), float64(len(finalMetrics)), nil, 1.0) } }() - - // TODO Don't harcode this - s.metricSinks[0].Flush(span.Attach(ctx), finalMetrics) } type metricsSummary struct { @@ -195,6 +155,13 @@ func (s *Server) tallyMetrics(percentiles []float64) ([]WorkerMetrics, metricsSu // 'local-only' histograms. ms.totalLocalSets + (ms.totalLocalTimers+ms.totalLocalHistograms)*(s.HistogramAggregates.Count+len(s.HistogramPercentiles)) + // Global instances also flush sets and global counters, so be sure and add + // them to the total size + if !s.IsLocal() { + ms.totalLength += ms.totalSets + ms.totalLength += ms.totalGlobalCounters + } + return tempMetrics, ms } diff --git a/flusher_test.go b/flusher_test.go index 0f63f0b22..da1fab50d 100644 --- a/flusher_test.go +++ b/flusher_test.go @@ -1,6 +1,7 @@ package veneur import ( + "context" "encoding/json" "fmt" "io" @@ -174,7 +175,7 @@ func testFlushTraceDatadog(t *testing.T, protobuf, jsn io.Reader) { assert.NoError(t, err) server.HandleTracePacket(packet) - server.Flush() + server.Flush(context.Background()) // wait for remoteServer to process the POST select { @@ -208,5 +209,5 @@ func testFlushTraceLightstep(t *testing.T, protobuf, jsn io.Reader) { server.HandleTracePacket(packet) assert.NoError(t, err) - server.Flush() + server.Flush(context.Background()) } diff --git a/metric_sink.go b/metric_sink.go index 810f575b6..bab064e58 100644 --- a/metric_sink.go +++ b/metric_sink.go @@ -14,8 +14,13 @@ import 
( "github.com/stripe/veneur/trace" ) +const DatadogResourceKey = "resource" + type metricSink interface { Name() string + // Flush receives `InterMetric`s from Veneur and is responsible for "sinking" + // these metrics to whatever its backend wants. Note that the sink must + // **not** mutate the incoming metrics as they are shared with other sinks. Flush(context.Context, []samplers.InterMetric) error // This one is temporary? FlushEventsChecks(ctx context.Context, events []samplers.UDPEvent, checks []samplers.UDPServiceCheck) diff --git a/plugin_test.go b/plugin_test.go index 65614850b..adf36197b 100644 @@ -82,7 +82,7 @@ func TestGlobalServerPluginFlush(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.Background()) } // TestLocalFilePluginRegister tests that we are able to register @@ -164,7 +164,7 @@ func TestGlobalServerS3PluginFlush(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.Background()) } func parseGzipTSV(r io.Reader) ([][]string, error) { diff --git a/server.go b/server.go index 2d2c783d9..97ed3a70b 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package veneur import ( "bufio" "bytes" + "context" "crypto/tls" "crypto/x509" "crypto/x509/pkix" @@ -442,7 +443,7 @@ func (s *Server) Start() { ticker.Stop() return case <-ticker.C: - s.Flush() + s.Flush(context.TODO()) } } }() diff --git a/server_test.go b/server_test.go index 0cc7d61d6..cfb48d243 100644 --- a/server_test.go +++ b/server_test.go @@ -3,6 +3,7 @@ package veneur import ( "bytes" "compress/zlib" + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -252,7 +253,7 @@ func TestLocalServerUnaggregatedMetrics(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.TODO()) ddmetrics := <-f.ddmetrics assert.Equal(t, 6, len(ddmetrics.Series), "incorrect number of elements in the flushed series on the remote server") @@ -280,7 +281,7 @@ func TestGlobalServerFlush(t *testing.T) { }) } - f.server.Flush() + 
f.server.Flush(context.TODO()) ddmetrics := <-f.ddmetrics assert.Equal(t, len(expectedMetrics), len(ddmetrics.Series), "incorrect number of elements in the flushed series on the remote server") @@ -390,7 +391,7 @@ func TestLocalServerMixedMetrics(t *testing.T) { }) } - f.server.Flush() + f.server.Flush(context.TODO()) // the global veneur instance should get valid data td := <-globalTD @@ -540,7 +541,7 @@ func sendTCPMetrics(addr string, tlsConfig *tls.Config, f *fixture) error { // check that the server received the stats; HACK: sleep to ensure workers process before flush time.Sleep(20 * time.Millisecond) - f.server.Flush() + f.server.Flush(context.TODO()) select { case ddmetrics := <-f.ddmetrics: if len(ddmetrics.Series) != 1 {