diff --git a/flusher.go b/flusher.go index 367db725c..24c5d2cb6 100644 --- a/flusher.go +++ b/flusher.go @@ -102,7 +102,7 @@ func (s *Server) FlushLocal(ctx context.Context) { // we cannot do this until we're done using tempMetrics within this function, // since not everything in tempMetrics is safe for sharing - go s.flushForward(span.Attach(ctx), tempMetrics) + go s.flushForward(span.Attach(ctx), s.Tags, tempMetrics) go func() { for _, p := range s.getPlugins() { @@ -345,7 +345,7 @@ func (s *Server) flushPart(ctx context.Context, metricSlice []samplers.DDMetric, }, "flush", true) } -func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { +func (s *Server) flushForward(ctx context.Context, tags []string, wms []WorkerMetrics) { span, _ := trace.StartSpanFromContext(ctx, "") defer span.Finish() jmLength := 0 @@ -368,6 +368,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { }).Error("Could not export metric") continue } + jm.Tags = append(jm.Tags, tags...) jsonMetrics = append(jsonMetrics, jm) } for _, histo := range wm.histograms { @@ -380,6 +381,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { }).Error("Could not export metric") continue } + jm.Tags = append(jm.Tags, tags...) jsonMetrics = append(jsonMetrics, jm) } for _, set := range wm.sets { @@ -392,6 +394,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { }).Error("Could not export metric") continue } + jm.Tags = append(jm.Tags, tags...) jsonMetrics = append(jsonMetrics, jm) } for _, timer := range wm.timers { @@ -406,6 +409,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) { } // the exporter doesn't know that these two are "different" jm.Type = "timer" + jm.Tags = append(jm.Tags, tags...) jsonMetrics = append(jsonMetrics, jm) } } diff --git a/server_test.go b/server_test.go index 5b9dbb6df..031eb7573 100644 --- a/server_test.go +++ b/server_test.go @@ -187,6 +187,7 @@ type fixture struct { api *httptest.Server server Server ddmetrics chan DDMetricsRequest + importmetrics chan []samplers.JSONMetric interval time.Duration flushMaxPerBody int } @@ -196,26 +197,49 @@ func newFixture(t *testing.T, config Config) *fixture { assert.NoError(t, err) // Set up a remote server (the API that we're sending the data to) - // (e.g. Datadog) - f := &fixture{nil, Server{}, make(chan DDMetricsRequest, 10), interval, config.FlushMaxPerBody} + // (e.g. Datadog) and our own /import + f := &fixture{nil, Server{}, make(chan DDMetricsRequest, 10), make(chan []samplers.JSONMetric, 10), interval, config.FlushMaxPerBody} f.api = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - zr, err := zlib.NewReader(r.Body) - if err != nil { - t.Fatal(err) - } - var ddmetrics DDMetricsRequest + if r.URL.Path == "/api/v1/series" { + zr, err := zlib.NewReader(r.Body) + if err != nil { + t.Fatal(err) + } - err = json.NewDecoder(zr).Decode(&ddmetrics) - if err != nil { - t.Fatal(err) - } + var ddmetrics DDMetricsRequest - f.ddmetrics <- ddmetrics + err = json.NewDecoder(zr).Decode(&ddmetrics) + if err != nil { + t.Fatal(err) + } + + f.ddmetrics <- ddmetrics + } else if r.URL.Path == "/import" { + rz, err := zlib.NewReader(r.Body) + if err != nil { + t.Fatal(err) + } + + var importmetrics []samplers.JSONMetric + err = json.NewDecoder(rz).Decode(&importmetrics) + if err != nil { + t.Fatal(err) + } + + f.importmetrics <- importmetrics + } else { + t.Fatalf("Got unexpected HTTP request to %s", r.URL.Path) + } w.WriteHeader(http.StatusAccepted) })) config.APIHostname = f.api.URL + // If the config is aiming to do it's own import, let's replace the config + // with that of our fixture. + if config.ForwardAddress == "http://localhost" { + config.ForwardAddress = f.api.URL + } config.NumWorkers = 1 f.server = setupVeneurServer(t, config, nil) return f @@ -223,7 +247,7 @@ func newFixture(t *testing.T, config Config) *fixture { func (f *fixture) Close() { // make Close safe to call multiple times - if f.ddmetrics == nil { + if f.ddmetrics == nil && f.importmetrics == nil { return } @@ -231,6 +255,7 @@ func (f *fixture) Close() { f.server.Shutdown() close(f.ddmetrics) f.ddmetrics = nil + f.importmetrics = nil } // TestLocalServerUnaggregatedMetrics tests the behavior of @@ -288,6 +313,32 @@ func TestGlobalServerFlush(t *testing.T) { assertMetrics(t, ddmetrics, expectedMetrics) } +func TestLocalAddsImportTags(t *testing.T) { + metricValues, _ := generateMetrics() + config := localConfig() + config.Tags = []string{"foo:bar"} + f := newFixture(t, config) + defer f.Close() + + for _, value := range metricValues { + f.server.Workers[0].ProcessMetric(&samplers.UDPMetric{ + MetricKey: samplers.MetricKey{ + Name: "a.b.c", + Type: "histogram", + }, + Value: value, + Digest: 12345, + SampleRate: 1.0, + }) + } + + f.server.Flush() + + importmetrics := <-f.importmetrics + assert.Equal(t, 1, len(importmetrics), 1, "import received the histogram") + assert.Equal(t, "foo:bar", importmetrics[0].Tags[0], "imported histogram has sender's tags") +} + func TestLocalServerMixedMetrics(t *testing.T) { // The exact gob stream that we will receive might differ, so we can't // test against the bytestream directly. But the two streams should unmarshal