From 3d4f7221978f95faa1ca47c92faf4326b8383ea5 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Mon, 13 Jan 2025 10:48:28 -0500 Subject: [PATCH 1/7] [promotel] Implement scraper from prometheus.Gatherer --- scrape/manager_test.go | 46 ++++++++-------- scrape/scrape.go | 116 ++++++++++++++++++++++++++++++++++++++--- scrape/scrape_test.go | 70 ++++++++++++++++--------- 3 files changed, 179 insertions(+), 53 deletions(-) diff --git a/scrape/manager_test.go b/scrape/manager_test.go index c8d9bd6980d..495e76ac482 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "net/http" - "net/http/httptest" "net/url" "os" "strconv" @@ -780,29 +779,30 @@ func TestManagerCTZeroIngestion(t *testing.T) { once := sync.Once{} // Start fake HTTP target to that allow one scrape only. - server := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fail := true - once.Do(func() { - fail = false - w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) - - ctrType := dto.MetricType_COUNTER - w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ - Name: proto.String(mName), - Type: &ctrType, - Metric: []*dto.Metric{{Counter: tc.counterSample}}, - })) - }) - - if fail { - w.WriteHeader(http.StatusInternalServerError) - } - }), - ) - defer server.Close() - serverURL, err := url.Parse(server.URL) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fail := true + once.Do(func() { + fail = false + w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) + + ctrType := dto.MetricType_COUNTER + w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ + Name: proto.String(mName), + Type: &ctrType, + Metric: []*dto.Metric{{Counter: tc.counterSample}}, + })) + }) + + if fail { + w.WriteHeader(http.StatusInternalServerError) + } + }) + // This enables scraper to read metrics from the handler directly without making HTTP request + SetDefaultGathererHandler(handler) + defer SetDefaultGathererHandler(nil) + + serverURL, err := url.Parse("http://not-started:8080") require.NoError(t, err) // Add fake target directly into tsets + reload. Normally users would use diff --git a/scrape/scrape.go b/scrape/scrape.go index ccb068b6805..6687bd66838 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -27,6 +27,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/go-kit/log" @@ -36,6 +37,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/version" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/exemplar" @@ -318,14 +321,14 @@ func (sp *scrapePool) restartLoops(reuseCache bool) { t := sp.activeTargets[fp] interval, timeout, err := t.intervalAndTimeout(interval, timeout) var ( - s = &targetScraper{ + s = newScraper(&targetScraper{ Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader(sp.config.ScrapeProtocols), acceptEncodingHeader: acceptEncodingHeader(enableCompression), - } + }) newLoop = sp.newLoop(scrapeLoopOptions{ target: t, scraper: s, @@ -462,7 +465,7 @@ func (sp *scrapePool) sync(targets []*Target) { // for every target. var err error interval, timeout, err = t.intervalAndTimeout(interval, timeout) - s := &targetScraper{ + s := newScraper(&targetScraper{ Target: t, client: sp.client, timeout: timeout, @@ -470,7 +473,7 @@ func (sp *scrapePool) sync(targets []*Target) { acceptHeader: acceptHeader(sp.config.ScrapeProtocols), acceptEncodingHeader: acceptEncodingHeader(enableCompression), metrics: sp.metrics, - } + }) l := sp.newLoop(scrapeLoopOptions{ target: t, scraper: s, @@ -709,6 +712,13 @@ type targetScraper struct { metrics *scrapeMetrics } +func newScraper(ts *targetScraper) scraper { + if handler := GetDefaultGathererHandler(); handler != nil { + return &gathererScraper{ts, handler} + } + return ts +} + var errBodySizeLimit = errors.New("body size limit exceeded") // acceptHeader transforms preference from the options into specific header values as @@ -735,7 +745,7 @@ func acceptEncodingHeader(enableCompression bool) string { var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version) -func (s *targetScraper) scrape(ctx context.Context) (*http.Response, error) { +func (s *targetScraper) scrapeRequest() (*http.Request, error) { if s.req == nil { req, err := http.NewRequest(http.MethodGet, s.URL().String(), nil) if err != nil { @@ -748,8 +758,15 @@ func (s *targetScraper) scrape(ctx context.Context) (*http.Response, error) { s.req = req } + return s.req, nil +} - return s.client.Do(s.req.WithContext(ctx)) +func (s *targetScraper) scrape(ctx context.Context) (*http.Response, error) { + req, err := s.scrapeRequest() + if err != nil { + return nil, err + } + return s.client.Do(req.WithContext(ctx)) } func (s *targetScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) { @@ -1996,3 +2013,90 @@ func pickSchema(bucketFactor float64) int32 { return int32(floor) } } + +// Scraper implementation that fetches metrics data from Gatherer http.Handler. +type gathererScraper struct { + *targetScraper + h http.Handler +} + +type scrapeResult struct { + resp *http.Response + err error +} + +func (gs *gathererScraper) scrape(ctx context.Context) (*http.Response, error) { + resCh := make(chan scrapeResult, 1) + go func() { + defer close(resCh) + req, err := gs.scrapeRequest() + if err != nil { + resCh <- scrapeResult{nil, err} + return + } + w := newResponseWriter(req) + if gs.h != nil { + gs.h.ServeHTTP(w, req) + } + resCh <- scrapeResult{w.response, nil} + }() + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r := <-resCh: + return r.resp, r.err + } +} + +type responseWriter struct { + http.ResponseWriter + response *http.Response + // Writes to response body + w io.Writer +} + +func newResponseWriter(req *http.Request) *responseWriter { + buf := new(bytes.Buffer) + + return &responseWriter{ + w: io.Writer(buf), + response: &http.Response{ + Status: http.StatusText(http.StatusOK), + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(buf), + Request: req, + }, + } +} + +func (rw *responseWriter) Header() http.Header { + return rw.response.Header +} + +func (rw *responseWriter) Write(data []byte) (int, error) { + return rw.w.Write(data) +} + +func (rw *responseWriter) WriteHeader(statusCode int) { + rw.response.StatusCode = statusCode + rw.response.Status = fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)) +} + +var defaultGathererHandler atomic.Pointer[http.Handler] + +// This enables scraper to read metrics from the handler directly without making HTTP request +func SetDefaultGathererHandler(h http.Handler) { + defaultGathererHandler.Store(&h) +} + +func SetDefaultGatherer(g prometheus.Gatherer) { + SetDefaultGathererHandler(promhttp.HandlerFor(g, promhttp.HandlerOpts{})) +} + +func GetDefaultGathererHandler() http.Handler { + if h := defaultGathererHandler.Load(); h != nil { + return *h + } + return nil +} diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index c5a0b8b831e..42a7813300d 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -2333,6 +2333,20 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { require.Equal(t, 0, seriesAdded) } +const useGathererHandler = true + +func newHTTPTestServer(handler http.Handler) *httptest.Server { + if useGathererHandler { + server := httptest.NewUnstartedServer(handler) + server.URL = "http://not-started:8080" + SetDefaultGathererHandler(handler) + return server + } + server := httptest.NewServer(handler) + SetDefaultGathererHandler(nil) + return server +} + func TestTargetScraperScrapeOK(t *testing.T) { const ( configTimeout = 1500 * time.Millisecond @@ -2341,7 +2355,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { var protobufParsing bool - server := httptest.NewServer( + server := newHTTPTestServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if protobufParsing { accept := r.Header.Get("Accept") @@ -2357,6 +2371,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { }), ) defer server.Close() + defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { @@ -2364,7 +2379,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { } runTest := func(acceptHeader string) { - ts := &targetScraper{ + ts := newScraper(&targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -2374,7 +2389,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { client: http.DefaultClient, timeout: configTimeout, acceptHeader: acceptHeader, - } + }) var buf bytes.Buffer resp, err := ts.scrape(context.Background()) @@ -2393,19 +2408,20 @@ func TestTargetScraperScrapeOK(t *testing.T) { func TestTargetScrapeScrapeCancel(t *testing.T) { block := make(chan struct{}) - server := httptest.NewServer( + server := newHTTPTestServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { <-block }), ) defer server.Close() + defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { panic(err) } - ts := &targetScraper{ + ts := newScraper(&targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -2414,7 +2430,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) { }, client: http.DefaultClient, acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols), - } + }) ctx, cancel := context.WithCancel(context.Background()) errc := make(chan error, 1) @@ -2448,19 +2464,20 @@ func TestTargetScrapeScrapeCancel(t *testing.T) { } func TestTargetScrapeScrapeNotFound(t *testing.T) { - server := httptest.NewServer( + server := newHTTPTestServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) }), ) defer server.Close() + defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { panic(err) } - ts := &targetScraper{ + ts := newScraper(&targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -2469,11 +2486,12 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) { }, client: http.DefaultClient, acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols), - } + }) resp, err := ts.scrape(context.Background()) require.NoError(t, err) _, err = ts.readResponse(context.Background(), resp, io.Discard) + require.Error(t, err) require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err) } @@ -2483,7 +2501,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { responseBody = "metric_a 1\nmetric_b 2\n" ) var gzipResponse bool - server := httptest.NewServer( + server := newHTTPTestServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", `text/plain; version=0.0.4`) if gzipResponse { @@ -2497,6 +2515,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { }), ) defer server.Close() + defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { @@ -2515,37 +2534,38 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols), metrics: newTestScrapeMetrics(t), } + s := newScraper(ts) var buf bytes.Buffer // Target response uncompressed body, scrape with body size limit. - resp, err := ts.scrape(context.Background()) + resp, err := s.scrape(context.Background()) require.NoError(t, err) - _, err = ts.readResponse(context.Background(), resp, &buf) + _, err = s.readResponse(context.Background(), resp, &buf) require.ErrorIs(t, err, errBodySizeLimit) require.Equal(t, bodySizeLimit, buf.Len()) // Target response gzip compressed body, scrape with body size limit. gzipResponse = true buf.Reset() - resp, err = ts.scrape(context.Background()) + resp, err = s.scrape(context.Background()) require.NoError(t, err) - _, err = ts.readResponse(context.Background(), resp, &buf) + _, err = s.readResponse(context.Background(), resp, &buf) require.ErrorIs(t, err, errBodySizeLimit) require.Equal(t, bodySizeLimit, buf.Len()) // Target response uncompressed body, scrape without body size limit. gzipResponse = false buf.Reset() ts.bodySizeLimit = 0 - resp, err = ts.scrape(context.Background()) + resp, err = s.scrape(context.Background()) require.NoError(t, err) - _, err = ts.readResponse(context.Background(), resp, &buf) + _, err = s.readResponse(context.Background(), resp, &buf) require.NoError(t, err) require.Len(t, responseBody, buf.Len()) // Target response gzip compressed body, scrape without body size limit. gzipResponse = true buf.Reset() - resp, err = ts.scrape(context.Background()) + resp, err = s.scrape(context.Background()) require.NoError(t, err) - _, err = ts.readResponse(context.Background(), resp, &buf) + _, err = s.readResponse(context.Background(), resp, &buf) require.NoError(t, err) require.Len(t, responseBody, buf.Len()) } @@ -3062,7 +3082,7 @@ func TestScrapeReportLimit(t *testing.T) { scrapedTwice = make(chan bool) ) - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ts := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n") scrapes++ if scrapes == 2 { @@ -3070,6 +3090,7 @@ func TestScrapeReportLimit(t *testing.T) { } })) defer ts.Close() + defer SetDefaultGathererHandler(nil) sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) @@ -3310,7 +3331,7 @@ test_summary_count 199 scrapeCount := 0 scraped := make(chan bool) - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ts := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, metricsText) scrapeCount++ if scrapeCount > 2 { @@ -3318,6 +3339,7 @@ test_summary_count 199 } })) defer ts.Close() + defer SetDefaultGathererHandler(nil) sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) @@ -3435,7 +3457,7 @@ func TestScrapeLoopCompression(t *testing.T) { t.Run(fmt.Sprintf("compression=%v,acceptEncoding=%s", tc.enableCompression, tc.acceptEncoding), func(t *testing.T) { scraped := make(chan bool) - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ts := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, tc.acceptEncoding, r.Header.Get("Accept-Encoding"), "invalid value of the Accept-Encoding header") fmt.Fprint(w, metricsText) close(scraped) @@ -3596,7 +3618,7 @@ func BenchmarkTargetScraperGzip(b *testing.B) { for _, scenario := range scenarios { b.Run(fmt.Sprintf("metrics=%d", scenario.metricsCount), func(b *testing.B) { - ts := &targetScraper{ + ts := newScraper(&targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -3606,7 +3628,7 @@ func BenchmarkTargetScraperGzip(b *testing.B) { }, client: client, timeout: time.Second, - } + }) b.ResetTimer() for i := 0; i < b.N; i++ { _, err = ts.scrape(context.Background()) @@ -3705,7 +3727,7 @@ func testNativeHistogramMaxSchemaSet(t *testing.T, minBucketFactor string, expec buffer := protoMarshalDelimited(t, histogramMetricFamily) // Create a HTTP server to serve /metrics via ProtoBuf - metricsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + metricsServer := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) w.Write(buffer) })) From a8368eb5dcf84ba7f0bbc9a63e189eccb5d05d44 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Tue, 14 Jan 2025 12:54:28 -0500 Subject: [PATCH 2/7] [promotel] ProtobufParser can read metrics data directly from Gatherer --- model/textparse/protobufparse.go | 114 ++++++++++++++++- model/textparse/protobufparse_test.go | 32 ++++- scrape/manager_test.go | 172 +++++++++++++++----------- scrape/scrape.go | 25 +++- 4 files changed, 264 insertions(+), 79 deletions(-) diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index ea3a2e1a34f..1eb4a88cb6d 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -25,6 +25,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" @@ -77,6 +78,8 @@ type ProtobufParser struct { // The following are just shenanigans to satisfy the Parser interface. metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric. + + gathererIterator *gathererIterator } // NewProtobufParser returns a parser for the payload in the byte slice. @@ -410,7 +413,7 @@ func (p *ProtobufParser) Next() (Entry, error) { case EntryInvalid: p.metricPos = 0 p.fieldPos = -2 - n, err := readDelimited(p.in[p.inPos:], p.mf) + n, err := readDelimited(p.in[p.inPos:], p.mf, p.gathererIterator) p.inPos += n if err != nil { return p.state, err @@ -586,7 +589,11 @@ var errInvalidVarint = errors.New("protobufparse: invalid varint encountered") // specific to a MetricFamily, utilizes the more efficient gogo-protobuf // unmarshaling, and acts on a byte slice directly without any additional // staging buffers. -func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) { +func readDelimited(b []byte, mf *dto.MetricFamily, iter *gathererIterator) (n int, err error) { + if iter != nil { + err := iter.readNext(mf) + return 0, err + } if len(b) == 0 { return 0, io.EOF } @@ -642,3 +649,106 @@ func isNativeHistogram(h *dto.Histogram) bool { h.GetZeroThreshold() > 0 || h.GetZeroCount() > 0 } + +func convertMetricFamilyPb(srcMf *io_prometheus_client.MetricFamily, dst *dto.MetricFamily) (n int, err error) { + protoBuf, err := proto.Marshal(srcMf) + if err != nil { + return 0, err + } + dst.Reset() + err = dst.Unmarshal(protoBuf) + if err != nil { + return 0, err + } + return len(protoBuf), nil +} + +// Converts *io_prometheus_client.MetricFamily to *dto.MetricFamily +// NOTE: This is incomplete implementation +func convertMetricFamily(src *io_prometheus_client.MetricFamily, dst *dto.MetricFamily) { + dst.Name = src.GetName() + dst.Help = src.GetHelp() + dst.Type = dto.MetricType(src.GetType()) + dst.Unit = src.GetUnit() + dst.Metric = make([]dto.Metric, len(src.Metric)) + for i, m := range src.Metric { + dst.Metric[i] = dto.Metric{ + Label: make([]dto.LabelPair, 0, len(m.GetLabel())), + } + for _, lp := range m.GetLabel() { + dst.Metric[i].Label = append(dst.Metric[i].Label, dto.LabelPair{ + Name: lp.GetName(), + Value: lp.GetValue(), + }) + } + switch src.GetType() { + case io_prometheus_client.MetricType_COUNTER: + dst.Metric[i].Counter = &dto.Counter{ + Value: m.GetCounter().GetValue(), + } + case io_prometheus_client.MetricType_GAUGE: + dst.Metric[i].Gauge = &dto.Gauge{ + Value: m.GetGauge().GetValue(), + } + case io_prometheus_client.MetricType_SUMMARY: + dst.Metric[i].Summary = &dto.Summary{ + SampleCount: m.GetSummary().GetSampleCount(), + SampleSum: m.GetSummary().GetSampleSum(), + Quantile: make([]dto.Quantile, len(m.GetSummary().GetQuantile())), + } + for j, q := range m.GetSummary().GetQuantile() { + dst.Metric[i].GetSummary().Quantile[j] = dto.Quantile{ + Quantile: q.GetQuantile(), + Value: q.GetValue(), + } + } + case io_prometheus_client.MetricType_HISTOGRAM: + dst.Metric[i].Histogram = &dto.Histogram{ + SampleCount: m.GetHistogram().GetSampleCount(), + SampleSum: m.GetHistogram().GetSampleSum(), + Bucket: make([]dto.Bucket, len(m.GetHistogram().GetBucket())), + } + } + for j, b := range m.GetHistogram().GetBucket() { + dst.Metric[i].GetHistogram().Bucket[j] = dto.Bucket{ + CumulativeCount: b.GetCumulativeCount(), + UpperBound: b.GetUpperBound(), + } + } + } +} + +// write me iterator over prometheus.Gatherer.Gather() result to get all metrics +type gathererIterator struct { + mfs []*io_prometheus_client.MetricFamily + index int +} + +func (it *gathererIterator) readNext(mf *dto.MetricFamily) error { + fmt.Println("[gathererIterator] readNext is called") + if it == nil || it.index >= len(it.mfs) { + return io.EOF + } + // Copies proto message from io_prometheus_client.MetricFamily to dto.MetricFamily + _, err := convertMetricFamilyPb(it.mfs[it.index], mf) + if err != nil { + e := fmt.Errorf("failed to convert io_prometheus_client.MetricFamily to dto.MetricFamily: %w", err) + // todo:remove this + fmt.Println(e) + return e + } + it.index++ + return nil +} + +func NewGathererParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable, mfs []*io_prometheus_client.MetricFamily) Parser { + return &ProtobufParser{ + in: b, + state: EntryInvalid, + mf: &dto.MetricFamily{}, + metricBytes: &bytes.Buffer{}, + parseClassicHistograms: parseClassicHistograms, + builder: labels.NewScratchBuilderWithSymbolTable(st, 16), + gathererIterator: &gathererIterator{mfs: mfs}, + } +} diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index e323a6cc8f3..dc95f432959 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -21,9 +21,11 @@ import ( "testing" "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -32,7 +34,7 @@ import ( dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" ) -func createTestProtoBuf(t *testing.T) *bytes.Buffer { +func createTestProtoBuf(t *testing.T) (*bytes.Buffer, *testGatherer) { testMetricFamilies := []string{ `name: "go_build_info" help: "Build information about the main Go module." @@ -715,7 +717,25 @@ metric: < buf.Write(protoBuf) } - return buf + var metrics []*io_prometheus_client.MetricFamily + for _, tmf := range testMetricFamilies { + pb := &io_prometheus_client.MetricFamily{} + // From text to proto message. + require.NoError(t, proto.UnmarshalText(tmf, pb)) + metrics = append(metrics, pb) + } + + return buf, &testGatherer{metrics} +} + +type testGatherer struct { + metrics []*io_prometheus_client.MetricFamily +} + +var _ prometheus.Gatherer = &testGatherer{} + +func (m *testGatherer) Gather() ([]*io_prometheus_client.MetricFamily, error) { + return m.metrics, nil } func TestProtobufParse(t *testing.T) { @@ -734,8 +754,10 @@ func TestProtobufParse(t *testing.T) { ct int64 } - inputBuf := createTestProtoBuf(t) + _, testPromGatherer := createTestProtoBuf(t) + metrics, err := testPromGatherer.Gather() + require.NoError(t, err) scenarios := []struct { name string parser Parser @@ -743,7 +765,7 @@ func TestProtobufParse(t *testing.T) { }{ { name: "ignore classic buckets of native histograms", - parser: NewProtobufParser(inputBuf.Bytes(), false, labels.NewSymbolTable()), + parser: NewGathererParser([]byte{}, false, labels.NewSymbolTable(), metrics), expected: []parseResult{ { m: "go_build_info", @@ -1280,7 +1302,7 @@ func TestProtobufParse(t *testing.T) { }, { name: "parse classic and native buckets", - parser: NewProtobufParser(inputBuf.Bytes(), true, labels.NewSymbolTable()), + parser: NewGathererParser([]byte{}, true, labels.NewSymbolTable(), metrics), expected: []parseResult{ { // 0 m: "go_build_info", diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 495e76ac482..76b0a523f44 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "net/http" + "net/http/httptest" "net/url" "os" "strconv" @@ -713,6 +714,18 @@ scrape_configs: require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) } +type testGatherer struct { + t *testing.T + metrics []*dto.MetricFamily +} + +var _ prometheus.Gatherer = &testGatherer{} + +func (g *testGatherer) Gather() ([]*dto.MetricFamily, error) { + g.t.Log("testGatherer.Gather is called") + return g.metrics, nil +} + // TestManagerCTZeroIngestion tests scrape manager for CT cases. func TestManagerCTZeroIngestion(t *testing.T) { const mName = "expected_counter" @@ -752,84 +765,101 @@ func TestManagerCTZeroIngestion(t *testing.T) { expectedValues: []float64{1.0}, }, } { - t.Run(tc.name, func(t *testing.T) { - app := &collectResultAppender{} - scrapeManager, err := NewManager( - &Options{ - EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion, - skipOffsetting: true, - }, - log.NewLogfmtLogger(os.Stderr), - &collectResultAppendable{app}, - prometheus.NewRegistry(), - ) - require.NoError(t, err) - - require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ - GlobalConfig: config.GlobalConfig{ - // Disable regular scrapes. - ScrapeInterval: model.Duration(9999 * time.Minute), - ScrapeTimeout: model.Duration(5 * time.Second), - // Ensure the proto is chosen. We need proto as it's the only protocol - // with the CT parsing support. - ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, - }, - ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, - })) - - once := sync.Once{} - // Start fake HTTP target to that allow one scrape only. - - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fail := true - once.Do(func() { - fail = false - w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) + for _, useHTTPTestServer := range []bool{true, false} { + t.Run(fmt.Sprintf("useHTTPTestServer:%t", useHTTPTestServer), func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { + app := &collectResultAppender{} + scrapeManager, err := NewManager( + &Options{ + EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion, + skipOffsetting: true, + }, + log.NewLogfmtLogger(os.Stderr), + &collectResultAppendable{app}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ + GlobalConfig: config.GlobalConfig{ + // Disable regular scrapes. + ScrapeInterval: model.Duration(9999 * time.Minute), + ScrapeTimeout: model.Duration(5 * time.Second), + // Ensure the proto is chosen. We need proto as it's the only protocol + // with the CT parsing support. + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + })) + once := sync.Once{} + // Start fake HTTP target to that allow one scrape only. ctrType := dto.MetricType_COUNTER - w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ + mf := &dto.MetricFamily{ Name: proto.String(mName), Type: &ctrType, Metric: []*dto.Metric{{Counter: tc.counterSample}}, - })) - }) - - if fail { - w.WriteHeader(http.StatusInternalServerError) - } - }) - // This enables scraper to read metrics from the handler directly without making HTTP request - SetDefaultGathererHandler(handler) - defer SetDefaultGathererHandler(nil) - - serverURL, err := url.Parse("http://not-started:8080") - require.NoError(t, err) + } + mfs := []*dto.MetricFamily{mf} + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fail := true + once.Do(func() { + fail = false + w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) + t.Log("Received HTTP request to the test server from scraper") + // Dont write anything to the response body + // w.Write(protoMarshalDelimited(t, mf)) + }) + if fail { + w.WriteHeader(http.StatusInternalServerError) + } + }) + var serverURL *url.URL + if useHTTPTestServer { + server := httptest.NewServer(handler) + defer server.Close() + serverURL, err = url.Parse(server.URL) + require.NoError(t, err) + } else { + // This enables scraper to read metrics from the handler directly without making HTTP request + SetDefaultGathererHandler(handler) + defer SetDefaultGathererHandler(nil) + serverURL, err = url.Parse("http://not-started:8080") + require.NoError(t, err) + } - // Add fake target directly into tsets + reload. Normally users would use - // Manager.Run and wait for minimum 5s refresh interval. - scrapeManager.updateTsets(map[string][]*targetgroup.Group{ - "test": {{ - Targets: []model.LabelSet{{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }}, - }}, + testPromGatherer := prometheus.Gatherer(&testGatherer{t, mfs}) + // This will cause scrapeLoop to a switch from ProtobufParser to GathererParser which reads directly from testPromGatherer + SetDefaultGatherer(testPromGatherer) + + // Add fake target directly into tsets + reload. Normally users would use + // Manager.Run and wait for minimum 5s refresh interval. + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }}, + }}, + }) + scrapeManager.reload() + + // Wait for one scrape. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + if countFloatSamples(app, mName) != len(tc.expectedValues) { + return fmt.Errorf("expected %v samples", tc.expectedValues) + } + return nil + }), "after 1 minute") + scrapeManager.Stop() + + require.Equal(t, tc.expectedValues, getResultFloats(app, mName)) + }) }) - scrapeManager.reload() - - // Wait for one scrape. - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { - if countFloatSamples(app, mName) != len(tc.expectedValues) { - return fmt.Errorf("expected %v samples", tc.expectedValues) - } - return nil - }), "after 1 minute") - scrapeManager.Stop() - - require.Equal(t, tc.expectedValues, getResultFloats(app, mName)) - }) + } } } diff --git a/scrape/scrape.go b/scrape/scrape.go index 6687bd66838..48bfb3cc950 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1492,6 +1492,16 @@ type appendErrors struct { func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable) + if g := GetDefaultGatherer(); g != nil { + if mfs, err := g.Gather(); err != nil { + level.Debug(sl.l).Log( + "msg", "Failed to get metrics from Gather.", + "err", err, + ) + } else { + p = textparse.NewGathererParser(b, sl.scrapeClassicHistograms, sl.symbolTable, mfs) + } + } if err != nil { level.Debug(sl.l).Log( "msg", "Invalid content type on scrape, using prometheus parser as fallback.", @@ -2038,6 +2048,7 @@ func (gs *gathererScraper) scrape(ctx context.Context) (*http.Response, error) { if gs.h != nil { gs.h.ServeHTTP(w, req) } + fmt.Println("[gathererScraper] scraping metrics") resCh <- scrapeResult{w.response, nil} }() select { @@ -2083,7 +2094,11 @@ func (rw *responseWriter) WriteHeader(statusCode int) { rw.response.Status = fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)) } -var defaultGathererHandler atomic.Pointer[http.Handler] +var ( + defaultGathererHandler atomic.Pointer[http.Handler] + + defaultGatherer atomic.Pointer[prometheus.Gatherer] +) // This enables scraper to read metrics from the handler directly without making HTTP request func SetDefaultGathererHandler(h http.Handler) { @@ -2091,6 +2106,7 @@ func SetDefaultGathererHandler(h http.Handler) { } func SetDefaultGatherer(g prometheus.Gatherer) { + defaultGatherer.Store(&g) SetDefaultGathererHandler(promhttp.HandlerFor(g, promhttp.HandlerOpts{})) } @@ -2100,3 +2116,10 @@ func GetDefaultGathererHandler() http.Handler { } return nil } + +func GetDefaultGatherer() prometheus.Gatherer { + if g := defaultGatherer.Load(); g != nil { + return *g + } + return nil +} From 383a9c7446690eb06d60664bf5e9efc43fce2f23 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Thu, 23 Jan 2025 14:38:53 -0500 Subject: [PATCH 3/7] Restore model/textparse/protobufparse_test.go from e6cfa720f --- model/textparse/protobufparse_test.go | 32 +++++---------------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index dc95f432959..e323a6cc8f3 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -21,11 +21,9 @@ import ( "testing" "github.com/gogo/protobuf/proto" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -34,7 +32,7 @@ import ( dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" ) -func createTestProtoBuf(t *testing.T) (*bytes.Buffer, *testGatherer) { +func createTestProtoBuf(t *testing.T) *bytes.Buffer { testMetricFamilies := []string{ `name: "go_build_info" help: "Build information about the main Go module." @@ -717,25 +715,7 @@ metric: < buf.Write(protoBuf) } - var metrics []*io_prometheus_client.MetricFamily - for _, tmf := range testMetricFamilies { - pb := &io_prometheus_client.MetricFamily{} - // From text to proto message. - require.NoError(t, proto.UnmarshalText(tmf, pb)) - metrics = append(metrics, pb) - } - - return buf, &testGatherer{metrics} -} - -type testGatherer struct { - metrics []*io_prometheus_client.MetricFamily -} - -var _ prometheus.Gatherer = &testGatherer{} - -func (m *testGatherer) Gather() ([]*io_prometheus_client.MetricFamily, error) { - return m.metrics, nil + return buf } func TestProtobufParse(t *testing.T) { @@ -754,10 +734,8 @@ func TestProtobufParse(t *testing.T) { ct int64 } - _, testPromGatherer := createTestProtoBuf(t) + inputBuf := createTestProtoBuf(t) - metrics, err := testPromGatherer.Gather() - require.NoError(t, err) scenarios := []struct { name string parser Parser @@ -765,7 +743,7 @@ func TestProtobufParse(t *testing.T) { }{ { name: "ignore classic buckets of native histograms", - parser: NewGathererParser([]byte{}, false, labels.NewSymbolTable(), metrics), + parser: NewProtobufParser(inputBuf.Bytes(), false, labels.NewSymbolTable()), expected: []parseResult{ { m: "go_build_info", @@ -1302,7 +1280,7 @@ func TestProtobufParse(t *testing.T) { }, { name: "parse classic and native buckets", - parser: NewGathererParser([]byte{}, true, labels.NewSymbolTable(), metrics), + parser: NewProtobufParser(inputBuf.Bytes(), true, labels.NewSymbolTable()), expected: []parseResult{ { // 0 m: "go_build_info", From 0ecdb27f8ce519b26a9df96fabc0177218c9e7b0 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 24 Jan 2025 09:56:27 -0500 Subject: [PATCH 4/7] Refactoring --- model/textparse/promotel.go | 61 + model/textparse/promotel_test.go | 1626 +++++++++++++++++++++++++ model/textparse/protobufparse.go | 123 +- model/textparse/protobufparse_test.go | 6 +- scrape/promotel.go | 140 +++ scrape/promotel_test.go | 250 ++++ scrape/scrape.go | 22 +- scrape/scrape_test.go | 67 +- scrape/target.go | 2 +- 9 files changed, 2121 insertions(+), 176 deletions(-) create mode 100644 model/textparse/promotel.go create mode 100644 model/textparse/promotel_test.go create mode 100644 scrape/promotel.go create mode 100644 scrape/promotel_test.go diff --git a/model/textparse/promotel.go b/model/textparse/promotel.go new file mode 100644 index 00000000000..9a272446748 --- /dev/null +++ b/model/textparse/promotel.go @@ -0,0 +1,61 @@ +package textparse + +import ( + "bytes" + "fmt" + "io" + + "github.com/gogo/protobuf/proto" + io_prometheus_client "github.com/prometheus/client_model/go" + + "github.com/prometheus/prometheus/model/labels" + dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" +) + +func convertMetricFamilyPb(srcMf *io_prometheus_client.MetricFamily, dst *dto.MetricFamily) (n int, err error) { + protoBuf, err := proto.Marshal(srcMf) + if err != nil { + return 0, err + } + dst.Reset() + err = dst.Unmarshal(protoBuf) + if err != nil { + return 0, err + } + return len(protoBuf), nil +} + +type ProtobufParserShim struct { + *ProtobufParser + mfs []*io_prometheus_client.MetricFamily + index int +} + +// Used to override readDelimited method of the ProtobufParser. +func (p *ProtobufParserShim) readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) { + if p == nil || p.index >= len(p.mfs) { + return 0, io.EOF + } + // Copies proto message from io_prometheus_client.MetricFamily to dto.MetricFamily + _, err = convertMetricFamilyPb(p.mfs[p.index], mf) + if err != nil { + // todo: test this + return 0, fmt.Errorf("failed to convert io_prometheus_client.MetricFamily to dto.MetricFamily: %w", err) + } + p.index++ + return 0, nil +} + +func NewProtobufParserShim(parseClassicHistograms bool, st *labels.SymbolTable, mfs []*io_prometheus_client.MetricFamily) Parser { + p := &ProtobufParserShim{&ProtobufParser{ + in: []byte{}, + state: EntryInvalid, + mf: &dto.MetricFamily{}, + metricBytes: &bytes.Buffer{}, + parseClassicHistograms: parseClassicHistograms, + builder: labels.NewScratchBuilderWithSymbolTable(st, 16), + }, mfs, 0} + // Overrides readDelimited method of the ProtobufParser + p.ProtobufParser.readDelimitedFunc = p.readDelimited + return p +} diff --git a/model/textparse/promotel_test.go b/model/textparse/promotel_test.go new file mode 100644 index 00000000000..d6cb20c9aa9 --- /dev/null +++ b/model/textparse/promotel_test.go @@ -0,0 +1,1626 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package textparse + +import ( + "errors" + "io" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/util/testutil" +) + +func createTestProtoBufGatherer(t *testing.T) *testGatherer { + testMetricFamilies, _ := createTestProtoBuf(t) + + var metrics []*io_prometheus_client.MetricFamily + for _, tmf := range testMetricFamilies { + pb := &io_prometheus_client.MetricFamily{} + // From text to proto message. + require.NoError(t, proto.UnmarshalText(tmf, pb)) + metrics = append(metrics, pb) + } + + return &testGatherer{metrics} +} + +type testGatherer struct { + metrics []*io_prometheus_client.MetricFamily +} + +var _ prometheus.Gatherer = &testGatherer{} + +func (m *testGatherer) Gather() ([]*io_prometheus_client.MetricFamily, error) { + return m.metrics, nil +} + +func TestProtobufParseShim(t *testing.T) { + type parseResult struct { + lset labels.Labels + m string + t int64 + v float64 + typ model.MetricType + help string + unit string + comment string + shs *histogram.Histogram + fhs *histogram.FloatHistogram + e []exemplar.Exemplar + ct int64 + } + + testPromGatherer := createTestProtoBufGatherer(t) + + metrics, err := testPromGatherer.Gather() + require.NoError(t, err) + scenarios := []struct { + name string + parser Parser + expected []parseResult + }{ + { + name: "ignore classic buckets of native histograms", + parser: NewProtobufParserShim(false, labels.NewSymbolTable(), metrics), + expected: []parseResult{ + { + m: "go_build_info", + help: "Build information about the main Go module.", + }, + { + m: "go_build_info", + typ: model.MetricTypeGauge, + }, + { + m: "go_build_info\xFFchecksum\xFF\xFFpath\xFFgithub.com/prometheus/client_golang\xFFversion\xFF(devel)", + v: 1, + lset: labels.FromStrings( + "__name__", "go_build_info", + "checksum", "", + "path", "github.com/prometheus/client_golang", + "version", "(devel)", + ), + }, + { + m: "go_memstats_alloc_bytes_total", + help: "Total number of bytes allocated, even if freed.", + unit: "bytes", + }, + { + m: "go_memstats_alloc_bytes_total", + typ: model.MetricTypeCounter, + }, + { + m: "go_memstats_alloc_bytes_total", + v: 1.546544e+06, + lset: labels.FromStrings( + "__name__", "go_memstats_alloc_bytes_total", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "42"), Value: 12, HasTs: true, Ts: 1625851151233}, + }, + }, + { + m: "something_untyped", + help: "Just to test the untyped type.", + }, + { + m: "something_untyped", + typ: model.MetricTypeUnknown, + }, + { + m: "something_untyped", + t: 1234567, + v: 42, + lset: labels.FromStrings( + "__name__", "something_untyped", + ), + }, + { + m: "test_histogram", + help: "Test histogram with many buckets removed to keep it manageable in size.", + }, + { + m: "test_histogram", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram", + t: 1234568, + shs: &histogram.Histogram{ + Count: 175, + ZeroCount: 2, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { + m: "test_gauge_histogram", + help: "Like test_histogram but as gauge histogram.", + }, + { + m: "test_gauge_histogram", + typ: model.MetricTypeGaugeHistogram, + }, + { + m: "test_gauge_histogram", + t: 1234568, + shs: &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: 175, + ZeroCount: 2, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { + m: "test_float_histogram", + help: "Test float histogram with many buckets removed to keep it manageable in size.", + }, + { + m: "test_float_histogram", + typ: model.MetricTypeHistogram, + }, + { + m: "test_float_histogram", + t: 1234568, + fhs: &histogram.FloatHistogram{ + Count: 175.0, + ZeroCount: 2.0, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []float64{1.0, 2.0, -1.0, -1.0}, + NegativeBuckets: []float64{1.0, 3.0, -2.0, -1.0, 1.0}, + }, + lset: labels.FromStrings( + "__name__", "test_float_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { + m: "test_gauge_float_histogram", + help: "Like test_float_histogram but as gauge histogram.", + }, + { + m: "test_gauge_float_histogram", + typ: model.MetricTypeGaugeHistogram, + }, + { + m: "test_gauge_float_histogram", + t: 1234568, + fhs: &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Count: 175.0, + ZeroCount: 2.0, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []float64{1.0, 2.0, -1.0, -1.0}, + NegativeBuckets: []float64{1.0, 3.0, -2.0, -1.0, 1.0}, + }, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { + m: "test_histogram2", + help: "Similar histogram as before but now without sparse buckets.", + }, + { + m: "test_histogram2", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram2_count", + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram2_count", + ), + }, + { + m: "test_histogram2_sum", + v: 0.000828, + lset: labels.FromStrings( + "__name__", "test_histogram2_sum", + ), + }, + { + m: "test_histogram2_bucket\xffle\xff-0.00048", + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "-0.00048", + ), + }, + { + m: "test_histogram2_bucket\xffle\xff-0.00038", + v: 4, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "-0.00038", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00038, HasTs: true, Ts: 1625851153146}, + }, + }, + { + m: "test_histogram2_bucket\xffle\xff1.0", + v: 16, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "1.0", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.000295, HasTs: false}, + }, + }, + { + m: "test_histogram2_bucket\xffle\xff+Inf", + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "+Inf", + ), + }, + { + m: "test_histogram_family", + help: "Test histogram metric family with two very simple histograms.", + }, + { + m: "test_histogram_family", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram_family\xfffoo\xffbar", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Count: 5, + Sum: 12.1, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 8, Length: 2}, + }, + NegativeSpans: []histogram.Span{}, + PositiveBuckets: []int64{2, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_family", + "foo", "bar", + ), + }, + { + m: "test_histogram_family\xfffoo\xffbaz", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Count: 6, + Sum: 13.1, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 8, Length: 2}, + }, + NegativeSpans: []histogram.Span{}, + PositiveBuckets: []int64{1, 4}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_family", + "foo", "baz", + ), + }, + { + m: "test_float_histogram_with_zerothreshold_zero", + help: "Test float histogram with a zero threshold of zero.", + }, + { + m: "test_float_histogram_with_zerothreshold_zero", + typ: model.MetricTypeHistogram, + }, + { + m: "test_float_histogram_with_zerothreshold_zero", + fhs: &histogram.FloatHistogram{ + Count: 5.0, + Sum: 12.1, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 8, Length: 2}, + }, + PositiveBuckets: []float64{2.0, 3.0}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "test_float_histogram_with_zerothreshold_zero", + ), + }, + { + m: "rpc_durations_seconds", + help: "RPC latency distributions.", + }, + { + m: "rpc_durations_seconds", + typ: model.MetricTypeSummary, + }, + { + m: "rpc_durations_seconds_count\xffservice\xffexponential", + v: 262, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds_count", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds_sum\xffservice\xffexponential", + v: 0.00025551262820703587, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds_sum", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5", + v: 6.442786329648548e-07, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.5", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9", + v: 1.9435742936658396e-06, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.9", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99", + v: 4.0471608667037015e-06, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.99", + "service", "exponential", + ), + }, + { + m: "without_quantiles", + help: "A summary without quantiles.", + }, + { + m: "without_quantiles", + typ: model.MetricTypeSummary, + }, + { + m: "without_quantiles_count", + v: 42, + lset: labels.FromStrings( + "__name__", "without_quantiles_count", + ), + }, + { + m: "without_quantiles_sum", + v: 1.234, + lset: labels.FromStrings( + "__name__", "without_quantiles_sum", + ), + }, + { + m: "empty_histogram", + help: "A histogram without observations and with a zero threshold of zero but with a no-op span to identify it as a native histogram.", + }, + { + m: "empty_histogram", + typ: model.MetricTypeHistogram, + }, + { + m: "empty_histogram", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "empty_histogram", + ), + }, + { + m: "test_counter_with_createdtimestamp", + help: "A counter with a created timestamp.", + }, + { + m: "test_counter_with_createdtimestamp", + typ: model.MetricTypeCounter, + }, + { + m: "test_counter_with_createdtimestamp", + v: 42, + ct: 1000, + lset: labels.FromStrings( + "__name__", "test_counter_with_createdtimestamp", + ), + }, + { + m: "test_summary_with_createdtimestamp", + help: "A summary with a created timestamp.", + }, + { + m: "test_summary_with_createdtimestamp", + typ: model.MetricTypeSummary, + }, + { + m: "test_summary_with_createdtimestamp_count", + v: 42, + ct: 1000, + lset: labels.FromStrings( + "__name__", "test_summary_with_createdtimestamp_count", + ), + }, + { + m: "test_summary_with_createdtimestamp_sum", + v: 1.234, + ct: 1000, + lset: labels.FromStrings( + "__name__", "test_summary_with_createdtimestamp_sum", + ), + }, + { + m: "test_histogram_with_createdtimestamp", + help: "A histogram with a created timestamp.", + }, + { + m: "test_histogram_with_createdtimestamp", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram_with_createdtimestamp", + ct: 1000, + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_with_createdtimestamp", + ), + }, + { + m: "test_gaugehistogram_with_createdtimestamp", + help: "A gauge histogram with a created timestamp.", + }, + { + m: "test_gaugehistogram_with_createdtimestamp", + typ: model.MetricTypeGaugeHistogram, + }, + { + m: "test_gaugehistogram_with_createdtimestamp", + ct: 1000, + shs: &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "test_gaugehistogram_with_createdtimestamp", + ), + }, + { + m: "test_histogram_with_native_histogram_exemplars", + help: "A histogram with native histogram exemplars.", + }, + { + m: "test_histogram_with_native_histogram_exemplars", + typ: model.MetricTypeHistogram, + }, + { + m: "test_histogram_with_native_histogram_exemplars", + t: 1234568, + shs: &histogram.Histogram{ + Count: 175, + ZeroCount: 2, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59780"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + {Labels: labels.FromStrings("dummyID", "59772"), Value: -0.00052, HasTs: true, Ts: 1625851160156}, + }, + }, + }, + }, + { + name: "parse classic and native buckets", + parser: NewProtobufParserShim(true, labels.NewSymbolTable(), metrics), + expected: []parseResult{ + { // 0 + m: "go_build_info", + help: "Build information about the main Go module.", + }, + { // 1 + m: "go_build_info", + typ: model.MetricTypeGauge, + }, + { // 2 + m: "go_build_info\xFFchecksum\xFF\xFFpath\xFFgithub.com/prometheus/client_golang\xFFversion\xFF(devel)", + v: 1, + lset: labels.FromStrings( + "__name__", "go_build_info", + "checksum", "", + "path", "github.com/prometheus/client_golang", + "version", "(devel)", + ), + }, + { // 3 + m: "go_memstats_alloc_bytes_total", + help: "Total number of bytes allocated, even if freed.", + }, + { // 4 + m: "go_memstats_alloc_bytes_total", + typ: model.MetricTypeCounter, + }, + { // 5 + m: "go_memstats_alloc_bytes_total", + v: 1.546544e+06, + lset: labels.FromStrings( + "__name__", "go_memstats_alloc_bytes_total", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "42"), Value: 12, HasTs: true, Ts: 1625851151233}, + }, + }, + { // 6 + m: "something_untyped", + help: "Just to test the untyped type.", + }, + { // 7 + m: "something_untyped", + typ: model.MetricTypeUnknown, + }, + { // 8 + m: "something_untyped", + t: 1234567, + v: 42, + lset: labels.FromStrings( + "__name__", "something_untyped", + ), + }, + { // 9 + m: "test_histogram", + help: "Test histogram with many buckets removed to keep it manageable in size.", + }, + { // 10 + m: "test_histogram", + typ: model.MetricTypeHistogram, + }, + { // 11 + m: "test_histogram", + t: 1234568, + shs: &histogram.Histogram{ + Count: 175, + ZeroCount: 2, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 12 + m: "test_histogram_count", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram_count", + ), + }, + { // 13 + m: "test_histogram_sum", + t: 1234568, + v: 0.0008280461746287094, + lset: labels.FromStrings( + "__name__", "test_histogram_sum", + ), + }, + { // 14 + m: "test_histogram_bucket\xffle\xff-0.0004899999999999998", + t: 1234568, + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram_bucket", + "le", "-0.0004899999999999998", + ), + }, + { // 15 + m: "test_histogram_bucket\xffle\xff-0.0003899999999999998", + t: 1234568, + v: 4, + lset: labels.FromStrings( + "__name__", "test_histogram_bucket", + "le", "-0.0003899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 16 + m: "test_histogram_bucket\xffle\xff-0.0002899999999999998", + t: 1234568, + v: 16, + lset: labels.FromStrings( + "__name__", "test_histogram_bucket", + "le", "-0.0002899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false}, + }, + }, + { // 17 + m: "test_histogram_bucket\xffle\xff+Inf", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram_bucket", + "le", "+Inf", + ), + }, + { // 18 + m: "test_gauge_histogram", + help: "Like test_histogram but as gauge histogram.", + }, + { // 19 + m: "test_gauge_histogram", + typ: model.MetricTypeGaugeHistogram, + }, + { // 20 + m: "test_gauge_histogram", + t: 1234568, + shs: &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Count: 175, + ZeroCount: 2, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 21 + m: "test_gauge_histogram_count", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram_count", + ), + }, + { // 22 + m: "test_gauge_histogram_sum", + t: 1234568, + v: 0.0008280461746287094, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram_sum", + ), + }, + { // 23 + m: "test_gauge_histogram_bucket\xffle\xff-0.0004899999999999998", + t: 1234568, + v: 2, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram_bucket", + "le", "-0.0004899999999999998", + ), + }, + { // 24 + m: "test_gauge_histogram_bucket\xffle\xff-0.0003899999999999998", + t: 1234568, + v: 4, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram_bucket", + "le", "-0.0003899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 25 + m: "test_gauge_histogram_bucket\xffle\xff-0.0002899999999999998", + t: 1234568, + v: 16, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram_bucket", + "le", "-0.0002899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false}, + }, + }, + { // 26 + m: "test_gauge_histogram_bucket\xffle\xff+Inf", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_gauge_histogram_bucket", + "le", "+Inf", + ), + }, + { // 27 + m: "test_float_histogram", + help: "Test float histogram with many buckets removed to keep it manageable in size.", + }, + { // 28 + m: "test_float_histogram", + typ: model.MetricTypeHistogram, + }, + { // 29 + m: "test_float_histogram", + t: 1234568, + fhs: &histogram.FloatHistogram{ + Count: 175.0, + ZeroCount: 2.0, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []float64{1.0, 2.0, -1.0, -1.0}, + NegativeBuckets: []float64{1.0, 3.0, -2.0, -1.0, 1.0}, + }, + lset: labels.FromStrings( + "__name__", "test_float_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 30 + m: "test_float_histogram_count", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_float_histogram_count", + ), + }, + { // 31 + m: "test_float_histogram_sum", + t: 1234568, + v: 0.0008280461746287094, + lset: labels.FromStrings( + "__name__", "test_float_histogram_sum", + ), + }, + { // 32 + m: "test_float_histogram_bucket\xffle\xff-0.0004899999999999998", + t: 1234568, + v: 2, + lset: labels.FromStrings( + "__name__", "test_float_histogram_bucket", + "le", "-0.0004899999999999998", + ), + }, + { // 33 + m: "test_float_histogram_bucket\xffle\xff-0.0003899999999999998", + t: 1234568, + v: 4, + lset: labels.FromStrings( + "__name__", "test_float_histogram_bucket", + "le", "-0.0003899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 34 + m: "test_float_histogram_bucket\xffle\xff-0.0002899999999999998", + t: 1234568, + v: 16, + lset: labels.FromStrings( + "__name__", "test_float_histogram_bucket", + "le", "-0.0002899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false}, + }, + }, + { // 35 + m: "test_float_histogram_bucket\xffle\xff+Inf", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_float_histogram_bucket", + "le", "+Inf", + ), + }, + { // 36 + m: "test_gauge_float_histogram", + help: "Like test_float_histogram but as gauge histogram.", + }, + { // 37 + m: "test_gauge_float_histogram", + typ: model.MetricTypeGaugeHistogram, + }, + { // 38 + m: "test_gauge_float_histogram", + t: 1234568, + fhs: &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Count: 175.0, + ZeroCount: 2.0, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []float64{1.0, 2.0, -1.0, -1.0}, + NegativeBuckets: []float64{1.0, 3.0, -2.0, -1.0, 1.0}, + }, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 39 + m: "test_gauge_float_histogram_count", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram_count", + ), + }, + { // 40 + m: "test_gauge_float_histogram_sum", + t: 1234568, + v: 0.0008280461746287094, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram_sum", + ), + }, + { // 41 + m: "test_gauge_float_histogram_bucket\xffle\xff-0.0004899999999999998", + t: 1234568, + v: 2, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram_bucket", + "le", "-0.0004899999999999998", + ), + }, + { // 42 + m: "test_gauge_float_histogram_bucket\xffle\xff-0.0003899999999999998", + t: 1234568, + v: 4, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram_bucket", + "le", "-0.0003899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 43 + m: "test_gauge_float_histogram_bucket\xffle\xff-0.0002899999999999998", + t: 1234568, + v: 16, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram_bucket", + "le", "-0.0002899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false}, + }, + }, + { // 44 + m: "test_gauge_float_histogram_bucket\xffle\xff+Inf", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_gauge_float_histogram_bucket", + "le", "+Inf", + ), + }, + { // 45 + m: "test_histogram2", + help: "Similar histogram as before but now without sparse buckets.", + }, + { // 46 + m: "test_histogram2", + typ: model.MetricTypeHistogram, + }, + { // 47 + m: "test_histogram2_count", + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram2_count", + ), + }, + { // 48 + m: "test_histogram2_sum", + v: 0.000828, + lset: labels.FromStrings( + "__name__", "test_histogram2_sum", + ), + }, + { // 49 + m: "test_histogram2_bucket\xffle\xff-0.00048", + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "-0.00048", + ), + }, + { // 50 + m: "test_histogram2_bucket\xffle\xff-0.00038", + v: 4, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "-0.00038", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00038, HasTs: true, Ts: 1625851153146}, + }, + }, + { // 51 + m: "test_histogram2_bucket\xffle\xff1.0", + v: 16, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "1.0", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.000295, HasTs: false}, + }, + }, + { // 52 + m: "test_histogram2_bucket\xffle\xff+Inf", + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "+Inf", + ), + }, + { // 53 + m: "test_histogram_family", + help: "Test histogram metric family with two very simple histograms.", + }, + { // 54 + m: "test_histogram_family", + typ: model.MetricTypeHistogram, + }, + { // 55 + m: "test_histogram_family\xfffoo\xffbar", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Count: 5, + Sum: 12.1, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 8, Length: 2}, + }, + NegativeSpans: []histogram.Span{}, + PositiveBuckets: []int64{2, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_family", + "foo", "bar", + ), + }, + { // 56 + m: "test_histogram_family_count\xfffoo\xffbar", + v: 5, + lset: labels.FromStrings( + "__name__", "test_histogram_family_count", + "foo", "bar", + ), + }, + { // 57 + m: "test_histogram_family_sum\xfffoo\xffbar", + v: 12.1, + lset: labels.FromStrings( + "__name__", "test_histogram_family_sum", + "foo", "bar", + ), + }, + { // 58 + m: "test_histogram_family_bucket\xfffoo\xffbar\xffle\xff1.1", + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram_family_bucket", + "foo", "bar", + "le", "1.1", + ), + }, + { // 59 + m: "test_histogram_family_bucket\xfffoo\xffbar\xffle\xff2.2", + v: 3, + lset: labels.FromStrings( + "__name__", "test_histogram_family_bucket", + "foo", "bar", + "le", "2.2", + ), + }, + { // 60 + m: "test_histogram_family_bucket\xfffoo\xffbar\xffle\xff+Inf", + v: 5, + lset: labels.FromStrings( + "__name__", "test_histogram_family_bucket", + "foo", "bar", + "le", "+Inf", + ), + }, + { // 61 + m: "test_histogram_family\xfffoo\xffbaz", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Count: 6, + Sum: 13.1, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 8, Length: 2}, + }, + NegativeSpans: []histogram.Span{}, + PositiveBuckets: []int64{1, 4}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_family", + "foo", "baz", + ), + }, + { // 62 + m: "test_histogram_family_count\xfffoo\xffbaz", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram_family_count", + "foo", "baz", + ), + }, + { // 63 + m: "test_histogram_family_sum\xfffoo\xffbaz", + v: 13.1, + lset: labels.FromStrings( + "__name__", "test_histogram_family_sum", + "foo", "baz", + ), + }, + { // 64 + m: "test_histogram_family_bucket\xfffoo\xffbaz\xffle\xff1.1", + v: 1, + lset: labels.FromStrings( + "__name__", "test_histogram_family_bucket", + "foo", "baz", + "le", "1.1", + ), + }, + { // 65 + m: "test_histogram_family_bucket\xfffoo\xffbaz\xffle\xff2.2", + v: 5, + lset: labels.FromStrings( + "__name__", "test_histogram_family_bucket", + "foo", "baz", + "le", "2.2", + ), + }, + { // 66 + m: "test_histogram_family_bucket\xfffoo\xffbaz\xffle\xff+Inf", + v: 6, + lset: labels.FromStrings( + "__name__", "test_histogram_family_bucket", + "foo", "baz", + "le", "+Inf", + ), + }, + { // 67 + m: "test_float_histogram_with_zerothreshold_zero", + help: "Test float histogram with a zero threshold of zero.", + }, + { // 68 + m: "test_float_histogram_with_zerothreshold_zero", + typ: model.MetricTypeHistogram, + }, + { // 69 + m: "test_float_histogram_with_zerothreshold_zero", + fhs: &histogram.FloatHistogram{ + Count: 5.0, + Sum: 12.1, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: 8, Length: 2}, + }, + PositiveBuckets: []float64{2.0, 3.0}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "test_float_histogram_with_zerothreshold_zero", + ), + }, + { // 70 + m: "rpc_durations_seconds", + help: "RPC latency distributions.", + }, + { // 71 + m: "rpc_durations_seconds", + typ: model.MetricTypeSummary, + }, + { // 72 + m: "rpc_durations_seconds_count\xffservice\xffexponential", + v: 262, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds_count", + "service", "exponential", + ), + }, + { // 73 + m: "rpc_durations_seconds_sum\xffservice\xffexponential", + v: 0.00025551262820703587, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds_sum", + "service", "exponential", + ), + }, + { // 74 + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5", + v: 6.442786329648548e-07, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.5", + "service", "exponential", + ), + }, + { // 75 + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9", + v: 1.9435742936658396e-06, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.9", + "service", "exponential", + ), + }, + { // 76 + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99", + v: 4.0471608667037015e-06, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.99", + "service", "exponential", + ), + }, + { // 77 + m: "without_quantiles", + help: "A summary without quantiles.", + }, + { // 78 + m: "without_quantiles", + typ: model.MetricTypeSummary, + }, + { // 79 + m: "without_quantiles_count", + v: 42, + lset: labels.FromStrings( + "__name__", "without_quantiles_count", + ), + }, + { // 80 + m: "without_quantiles_sum", + v: 1.234, + lset: labels.FromStrings( + "__name__", "without_quantiles_sum", + ), + }, + { // 78 + m: "empty_histogram", + help: "A histogram without observations and with a zero threshold of zero but with a no-op span to identify it as a native histogram.", + }, + { // 79 + m: "empty_histogram", + typ: model.MetricTypeHistogram, + }, + { // 80 + m: "empty_histogram", + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "empty_histogram", + ), + }, + { // 81 + m: "test_counter_with_createdtimestamp", + help: "A counter with a created timestamp.", + }, + { // 82 + m: "test_counter_with_createdtimestamp", + typ: model.MetricTypeCounter, + }, + { // 83 + m: "test_counter_with_createdtimestamp", + v: 42, + ct: 1000, + lset: labels.FromStrings( + "__name__", "test_counter_with_createdtimestamp", + ), + }, + { // 84 + m: "test_summary_with_createdtimestamp", + help: "A summary with a created timestamp.", + }, + { // 85 + m: "test_summary_with_createdtimestamp", + typ: model.MetricTypeSummary, + }, + { // 86 + m: "test_summary_with_createdtimestamp_count", + v: 42, + ct: 1000, + lset: labels.FromStrings( + "__name__", "test_summary_with_createdtimestamp_count", + ), + }, + { // 87 + m: "test_summary_with_createdtimestamp_sum", + v: 1.234, + ct: 1000, + lset: labels.FromStrings( + "__name__", "test_summary_with_createdtimestamp_sum", + ), + }, + { // 88 + m: "test_histogram_with_createdtimestamp", + help: "A histogram with a created timestamp.", + }, + { // 89 + m: "test_histogram_with_createdtimestamp", + typ: model.MetricTypeHistogram, + }, + { // 90 + m: "test_histogram_with_createdtimestamp", + ct: 1000, + shs: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_with_createdtimestamp", + ), + }, + { // 91 + m: "test_gaugehistogram_with_createdtimestamp", + help: "A gauge histogram with a created timestamp.", + }, + { // 92 + m: "test_gaugehistogram_with_createdtimestamp", + typ: model.MetricTypeGaugeHistogram, + }, + { // 93 + m: "test_gaugehistogram_with_createdtimestamp", + ct: 1000, + shs: &histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + PositiveSpans: []histogram.Span{}, + NegativeSpans: []histogram.Span{}, + }, + lset: labels.FromStrings( + "__name__", "test_gaugehistogram_with_createdtimestamp", + ), + }, + { // 94 + m: "test_histogram_with_native_histogram_exemplars", + help: "A histogram with native histogram exemplars.", + }, + { // 95 + m: "test_histogram_with_native_histogram_exemplars", + typ: model.MetricTypeHistogram, + }, + { // 96 + m: "test_histogram_with_native_histogram_exemplars", + t: 1234568, + shs: &histogram.Histogram{ + Count: 175, + ZeroCount: 2, + Sum: 0.0008280461746287094, + ZeroThreshold: 2.938735877055719e-39, + Schema: 3, + PositiveSpans: []histogram.Span{ + {Offset: -161, Length: 1}, + {Offset: 8, Length: 3}, + }, + NegativeSpans: []histogram.Span{ + {Offset: -162, Length: 1}, + {Offset: 23, Length: 4}, + }, + PositiveBuckets: []int64{1, 2, -1, -1}, + NegativeBuckets: []int64{1, 3, -2, -1, 1}, + }, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59780"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + {Labels: labels.FromStrings("dummyID", "59772"), Value: -0.00052, HasTs: true, Ts: 1625851160156}, + }, + }, + { // 97 + m: "test_histogram_with_native_histogram_exemplars_count", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars_count", + ), + }, + { // 98 + m: "test_histogram_with_native_histogram_exemplars_sum", + t: 1234568, + v: 0.0008280461746287094, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars_sum", + ), + }, + { // 99 + m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff-0.0004899999999999998", + t: 1234568, + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars_bucket", + "le", "-0.0004899999999999998", + ), + }, + { // 100 + m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff-0.0003899999999999998", + t: 1234568, + v: 4, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars_bucket", + "le", "-0.0003899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + }, + }, + { // 101 + m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff-0.0002899999999999998", + t: 1234568, + v: 16, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars_bucket", + "le", "-0.0002899999999999998", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false}, + }, + }, + { // 102 + m: "test_histogram_with_native_histogram_exemplars_bucket\xffle\xff+Inf", + t: 1234568, + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram_with_native_histogram_exemplars_bucket", + "le", "+Inf", + ), + }, + }, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + var ( + i int + res labels.Labels + p = scenario.parser + exp = scenario.expected + ) + + for { + et, err := p.Next() + if errors.Is(err, io.EOF) { + break + } + require.NoError(t, err) + + switch et { + case EntrySeries: + m, ts, v := p.Series() + + var e exemplar.Exemplar + p.Metric(&res) + eFound := p.Exemplar(&e) + ct := p.CreatedTimestamp() + require.Equal(t, exp[i].m, string(m), "i: %d", i) + if ts != nil { + require.Equal(t, exp[i].t, *ts, "i: %d", i) + } else { + require.Equal(t, int64(0), exp[i].t, "i: %d", i) + } + require.Equal(t, exp[i].v, v, "i: %d", i) + testutil.RequireEqual(t, exp[i].lset, res, "i: %d", i) + if len(exp[i].e) == 0 { + require.False(t, eFound, "i: %d", i) + } else { + require.True(t, eFound, "i: %d", i) + testutil.RequireEqual(t, exp[i].e[0], e, "i: %d", i) + require.False(t, p.Exemplar(&e), "too many exemplars returned, i: %d", i) + } + if exp[i].ct != 0 { + require.NotNilf(t, ct, "i: %d", i) + require.Equal(t, exp[i].ct, *ct, "i: %d", i) + } else { + require.Nilf(t, ct, "i: %d", i) + } + + case EntryHistogram: + m, ts, shs, fhs := p.Histogram() + p.Metric(&res) + require.Equal(t, exp[i].m, string(m), "i: %d", i) + if ts != nil { + require.Equal(t, exp[i].t, *ts, "i: %d", i) + } else { + require.Equal(t, int64(0), exp[i].t, "i: %d", i) + } + testutil.RequireEqual(t, exp[i].lset, res, "i: %d", i) + require.Equal(t, exp[i].m, string(m), "i: %d", i) + if shs != nil { + require.Equal(t, exp[i].shs, shs, "i: %d", i) + } else { + require.Equal(t, exp[i].fhs, fhs, "i: %d", i) + } + j := 0 + for e := (exemplar.Exemplar{}); p.Exemplar(&e); j++ { + testutil.RequireEqual(t, exp[i].e[j], e, "i: %d", i) + e = exemplar.Exemplar{} + } + require.Len(t, exp[i].e, j, "not enough exemplars found, i: %d", i) + + case EntryType: + m, typ := p.Type() + require.Equal(t, exp[i].m, string(m), "i: %d", i) + require.Equal(t, exp[i].typ, typ, "i: %d", i) + + case EntryHelp: + m, h := p.Help() + require.Equal(t, exp[i].m, string(m), "i: %d", i) + require.Equal(t, exp[i].help, string(h), "i: %d", i) + + case EntryUnit: + m, u := p.Unit() + require.Equal(t, exp[i].m, string(m), "i: %d", i) + require.Equal(t, exp[i].unit, string(u), "i: %d", i) + + case EntryComment: + require.Equal(t, exp[i].comment, string(p.Comment()), "i: %d", i) + } + + i++ + } + require.Len(t, exp, i) + }) + } +} diff --git a/model/textparse/protobufparse.go b/model/textparse/protobufparse.go index 1eb4a88cb6d..12b784f5f76 100644 --- a/model/textparse/protobufparse.go +++ b/model/textparse/protobufparse.go @@ -25,7 +25,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" - io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" @@ -79,7 +78,7 @@ type ProtobufParser struct { // The following are just shenanigans to satisfy the Parser interface. metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric. - gathererIterator *gathererIterator + readDelimitedFunc func([]byte, *dto.MetricFamily) (int, error) } // NewProtobufParser returns a parser for the payload in the byte slice. @@ -413,7 +412,7 @@ func (p *ProtobufParser) Next() (Entry, error) { case EntryInvalid: p.metricPos = 0 p.fieldPos = -2 - n, err := readDelimited(p.in[p.inPos:], p.mf, p.gathererIterator) + n, err := p.readDelimited(p.in[p.inPos:], p.mf) p.inPos += n if err != nil { return p.state, err @@ -582,18 +581,21 @@ func (p *ProtobufParser) getMagicLabel() (bool, string, string) { return false, "", "" } +func (p *ProtobufParser) readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) { + if p.readDelimitedFunc != nil { + return p.readDelimitedFunc(b, mf) + } + return readDelimited(b, mf) +} + var errInvalidVarint = errors.New("protobufparse: invalid varint encountered") -// readDelimited is essentially doing what the function of the same name in +// ReadDelimited is essentially doing what the function of the same name in // github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is // specific to a MetricFamily, utilizes the more efficient gogo-protobuf // unmarshaling, and acts on a byte slice directly without any additional // staging buffers. -func readDelimited(b []byte, mf *dto.MetricFamily, iter *gathererIterator) (n int, err error) { - if iter != nil { - err := iter.readNext(mf) - return 0, err - } +func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) { if len(b) == 0 { return 0, io.EOF } @@ -649,106 +651,3 @@ func isNativeHistogram(h *dto.Histogram) bool { h.GetZeroThreshold() > 0 || h.GetZeroCount() > 0 } - -func convertMetricFamilyPb(srcMf *io_prometheus_client.MetricFamily, dst *dto.MetricFamily) (n int, err error) { - protoBuf, err := proto.Marshal(srcMf) - if err != nil { - return 0, err - } - dst.Reset() - err = dst.Unmarshal(protoBuf) - if err != nil { - return 0, err - } - return len(protoBuf), nil -} - -// Converts *io_prometheus_client.MetricFamily to *dto.MetricFamily -// NOTE: This is incomplete implementation -func convertMetricFamily(src *io_prometheus_client.MetricFamily, dst *dto.MetricFamily) { - dst.Name = src.GetName() - dst.Help = src.GetHelp() - dst.Type = dto.MetricType(src.GetType()) - dst.Unit = src.GetUnit() - dst.Metric = make([]dto.Metric, len(src.Metric)) - for i, m := range src.Metric { - dst.Metric[i] = dto.Metric{ - Label: make([]dto.LabelPair, 0, len(m.GetLabel())), - } - for _, lp := range m.GetLabel() { - dst.Metric[i].Label = append(dst.Metric[i].Label, dto.LabelPair{ - Name: lp.GetName(), - Value: lp.GetValue(), - }) - } - switch src.GetType() { - case io_prometheus_client.MetricType_COUNTER: - dst.Metric[i].Counter = &dto.Counter{ - Value: m.GetCounter().GetValue(), - } - case io_prometheus_client.MetricType_GAUGE: - dst.Metric[i].Gauge = &dto.Gauge{ - Value: m.GetGauge().GetValue(), - } - case io_prometheus_client.MetricType_SUMMARY: - dst.Metric[i].Summary = &dto.Summary{ - SampleCount: m.GetSummary().GetSampleCount(), - SampleSum: m.GetSummary().GetSampleSum(), - Quantile: make([]dto.Quantile, len(m.GetSummary().GetQuantile())), - } - for j, q := range m.GetSummary().GetQuantile() { - dst.Metric[i].GetSummary().Quantile[j] = dto.Quantile{ - Quantile: q.GetQuantile(), - Value: q.GetValue(), - } - } - case io_prometheus_client.MetricType_HISTOGRAM: - dst.Metric[i].Histogram = &dto.Histogram{ - SampleCount: m.GetHistogram().GetSampleCount(), - SampleSum: m.GetHistogram().GetSampleSum(), - Bucket: make([]dto.Bucket, len(m.GetHistogram().GetBucket())), - } - } - for j, b := range m.GetHistogram().GetBucket() { - dst.Metric[i].GetHistogram().Bucket[j] = dto.Bucket{ - CumulativeCount: b.GetCumulativeCount(), - UpperBound: b.GetUpperBound(), - } - } - } -} - -// write me iterator over prometheus.Gatherer.Gather() result to get all metrics -type gathererIterator struct { - mfs []*io_prometheus_client.MetricFamily - index int -} - -func (it *gathererIterator) readNext(mf *dto.MetricFamily) error { - fmt.Println("[gathererIterator] readNext is called") - if it == nil || it.index >= len(it.mfs) { - return io.EOF - } - // Copies proto message from io_prometheus_client.MetricFamily to dto.MetricFamily - _, err := convertMetricFamilyPb(it.mfs[it.index], mf) - if err != nil { - e := fmt.Errorf("failed to convert io_prometheus_client.MetricFamily to dto.MetricFamily: %w", err) - // todo:remove this - fmt.Println(e) - return e - } - it.index++ - return nil -} - -func NewGathererParser(b []byte, parseClassicHistograms bool, st *labels.SymbolTable, mfs []*io_prometheus_client.MetricFamily) Parser { - return &ProtobufParser{ - in: b, - state: EntryInvalid, - mf: &dto.MetricFamily{}, - metricBytes: &bytes.Buffer{}, - parseClassicHistograms: parseClassicHistograms, - builder: labels.NewScratchBuilderWithSymbolTable(st, 16), - gathererIterator: &gathererIterator{mfs: mfs}, - } -} diff --git a/model/textparse/protobufparse_test.go b/model/textparse/protobufparse_test.go index e323a6cc8f3..febb9005d7f 100644 --- a/model/textparse/protobufparse_test.go +++ b/model/textparse/protobufparse_test.go @@ -32,7 +32,7 @@ import ( dto "github.com/prometheus/prometheus/prompb/io/prometheus/client" ) -func createTestProtoBuf(t *testing.T) *bytes.Buffer { +func createTestProtoBuf(t *testing.T) ([]string, *bytes.Buffer) { testMetricFamilies := []string{ `name: "go_build_info" help: "Build information about the main Go module." @@ -715,7 +715,7 @@ metric: < buf.Write(protoBuf) } - return buf + return testMetricFamilies, buf } func TestProtobufParse(t *testing.T) { @@ -734,7 +734,7 @@ func TestProtobufParse(t *testing.T) { ct int64 } - inputBuf := createTestProtoBuf(t) + _, inputBuf := createTestProtoBuf(t) scenarios := []struct { name string diff --git a/scrape/promotel.go b/scrape/promotel.go new file mode 100644 index 00000000000..47be2cba3cf --- /dev/null +++ b/scrape/promotel.go @@ -0,0 +1,140 @@ +package scrape + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/textparse" + "github.com/prometheus/prometheus/storage" +) + +type GathereLoop struct { + *scrapeLoop + g prometheus.Gatherer +} + +func (gl *GathereLoop) newParser() (textparse.Parser, error) { + mfs, err := gl.g.Gather() + if err != nil { + gl.l.Log("msg", "Error while gathering metrics", "err", err) + return nil, err + } + return textparse.NewProtobufParserShim(gl.scrapeClassicHistograms, gl.symbolTable, mfs), err + +} + +func (gl *GathereLoop) Run(errc chan<- error) { + gl.scrapeLoop.run(errc) +} + +func (gl *GathereLoop) Stop() { + gl.scrapeLoop.stop() +} + +func (gl *GathereLoop) ScrapeAndReport( + last, appendTime time.Time, errc chan<- error, +) time.Time { + return gl.scrapeAndReport(last, appendTime, errc) +} + +func noopScrapeFunc(context.Context, io.Writer) error { return nil } + +func newNoopTarget(lbls labels.Labels) *Target { + return &Target{labels: lbls} +} + +func NewGathererLoop(ctx context.Context, logger log.Logger, app storage.Appendable, reg prometheus.Registerer, g prometheus.Gatherer, interval time.Duration) (*GathereLoop, error) { + nopMutator := func(l labels.Labels) labels.Labels { return l } + metrics, err := newScrapeMetrics(reg) + if err != nil { + return nil, err + } + if logger == nil { + logger = log.NewNopLogger() + } + target := newNoopTarget([]labels.Label{ + {Name: model.JobLabel, Value: "promotel"}, // required label + {Name: model.InstanceLabel, Value: "promotel"}, // required label + {Name: model.ScrapeIntervalLabel, Value: interval.String()}, + {Name: model.MetricsPathLabel, Value: config.DefaultScrapeConfig.MetricsPath}, + {Name: model.SchemeLabel, Value: config.DefaultScrapeConfig.Scheme}, + }) + loop := &GathereLoop{ + newScrapeLoop( + ctx, + &scraperShim{scrapeFunc: noopScrapeFunc}, + logger, + nil, + nopMutator, + nopMutator, + func(ctx context.Context) storage.Appender { return app.Appender(ctx) }, + nil, + labels.NewSymbolTable(), + 0, + true, + false, + true, + 0, + 0, + histogram.ExponentialSchemaMax, + nil, + interval, + time.Hour, + false, + false, + false, + false, // todo: pass this from the opts + false, + target, + true, + metrics, + true, + ), + g, + } + // Override the newParser function to use the gatherer. + loop.scrapeLoop.newParserFunc = loop.newParser + return loop, nil +} + +// scraperShim implements the scraper interface and allows setting values +// returned by its methods. It also allows setting a custom scrape function. +type scraperShim struct { + offsetDur time.Duration + + lastStart time.Time + lastDuration time.Duration + lastError error + + scrapeErr error + scrapeFunc func(context.Context, io.Writer) error +} + +func (ts *scraperShim) offset(time.Duration, uint64) time.Duration { + return ts.offsetDur +} + +func (ts *scraperShim) Report(start time.Time, duration time.Duration, err error) { + ts.lastStart = start + ts.lastDuration = duration + ts.lastError = err +} + +func (ts *scraperShim) scrape(ctx context.Context) (*http.Response, error) { + return nil, ts.scrapeErr +} + +func (ts *scraperShim) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) { + if ts.scrapeFunc != nil { + return "", ts.scrapeFunc(ctx, w) + } + return "", ts.scrapeErr +} diff --git a/scrape/promotel_test.go b/scrape/promotel_test.go new file mode 100644 index 00000000000..3a633657899 --- /dev/null +++ b/scrape/promotel_test.go @@ -0,0 +1,250 @@ +package scrape_test + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "math/rand" + "strings" + "sync" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/storage" + + "github.com/prometheus/client_golang/prometheus" +) + +// TestScrapeLoopScrapeAndReport exercises scrapeAndReport with various scenarios +// (successful scrape, failed scrape, forced error, empty body leading to staleness, etc.). +func TestScrapeLoopScrapeAndReport(t *testing.T) { + appendable := &collectResultAppendable{&testAppender{}} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + reg := prometheus.NewRegistry() + + sl, err := scrape.NewGathererLoop(ctx, nil, appendable, reg, reg, 10*time.Millisecond) + require.NoError(t, err) + + start := time.Now() + sl.ScrapeAndReport(time.Time{}, start, nil) + // The collectResultAppender holds all appended samples. Check the last appended + // for staleness or actual data, depending on if the scrape was declared OK. + allSamples := appendable.resultFloats + // We expect at least one normal sample plus the reported samples. + require.NotEmpty(t, allSamples, "Expected to see appended samples.") + + // reset the appender + appendable.testAppender = &testAppender{} + // create counter metric + counter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "metric_a", + Help: "metric_a help", + }, []string{"label_a"}) + reg.MustRegister(counter) + counter.WithLabelValues("value_a").Add(42) + + mfs, err := reg.Gather() + require.NoError(t, err) + // verify that metric_a is present in Gatherer results + var foundMetric bool + for _, mf := range mfs { + if mf.GetName() == "metric_a" { + // verify metrics value + require.Len(t, mf.GetMetric(), 1) + require.Equal(t, "value_a", mf.GetMetric()[0].GetLabel()[0].GetValue()) + require.Equal(t, 42.0, mf.GetMetric()[0].GetCounter().GetValue()) + foundMetric = true + break + } + } + require.True(t, foundMetric, "Expected to see the 'metric_a' counter metric.") + + sl.ScrapeAndReport(time.Time{}, start, nil) + // Get all appended samples + allSamples = appendable.resultFloats + // verify that the counter metric 'metric_a' was reported + var found bool + for _, s := range allSamples { + if s.metric.Get("__name__") == "metric_a" && s.metric.Get("label_a") == "value_a" { + found = true + require.Equal(t, 42.0, s.f) + } + } + require.True(t, found, "Expected to see the 'metric_a' counter metric.") +} + +type floatSample struct { + metric labels.Labels + t int64 + f float64 +} + +type histogramSample struct { + t int64 + h *histogram.Histogram + fh *histogram.FloatHistogram +} + +type collectResultAppendable struct { + *testAppender +} + +func (a *collectResultAppendable) Appender(_ context.Context) storage.Appender { + return a +} + +// testAppender records all samples that were added through the appender. +// It can be used as its zero value or be backed by another appender it writes samples through. +type testAppender struct { + mtx sync.Mutex + + next storage.Appender + resultFloats []floatSample + pendingFloats []floatSample + rolledbackFloats []floatSample + resultHistograms []histogramSample + pendingHistograms []histogramSample + rolledbackHistograms []histogramSample + resultExemplars []exemplar.Exemplar + pendingExemplars []exemplar.Exemplar + resultMetadata []metadata.Metadata + pendingMetadata []metadata.Metadata +} + +func (a *testAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() + a.pendingFloats = append(a.pendingFloats, floatSample{ + metric: lset, + t: t, + f: v, + }) + + if ref == 0 { + ref = storage.SeriesRef(rand.Uint64()) + } + if a.next == nil { + return ref, nil + } + + ref, err := a.next.Append(ref, lset, t, v) + if err != nil { + return 0, err + } + return ref, err +} + +func (a *testAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() + a.pendingExemplars = append(a.pendingExemplars, e) + if a.next == nil { + return 0, nil + } + + return a.next.AppendExemplar(ref, l, e) +} + +func (a *testAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() + a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t}) + if a.next == nil { + return 0, nil + } + + return a.next.AppendHistogram(ref, l, t, h, fh) +} + +func (a *testAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() + a.pendingMetadata = append(a.pendingMetadata, m) + if ref == 0 { + ref = storage.SeriesRef(rand.Uint64()) + } + if a.next == nil { + return ref, nil + } + + return a.next.UpdateMetadata(ref, l, m) +} + +func (a *testAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + return a.Append(ref, l, ct, 0.0) +} + +func (a *testAppender) Commit() error { + a.mtx.Lock() + defer a.mtx.Unlock() + a.resultFloats = append(a.resultFloats, a.pendingFloats...) + a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...) + a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...) + a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...) + a.pendingFloats = nil + a.pendingExemplars = nil + a.pendingHistograms = nil + a.pendingMetadata = nil + if a.next == nil { + return nil + } + return a.next.Commit() +} + +func (a *testAppender) Rollback() error { + a.mtx.Lock() + defer a.mtx.Unlock() + a.rolledbackFloats = a.pendingFloats + a.rolledbackHistograms = a.pendingHistograms + a.pendingFloats = nil + a.pendingHistograms = nil + if a.next == nil { + return nil + } + return a.next.Rollback() +} + +func (a *testAppender) String() string { + var sb strings.Builder + for _, s := range a.resultFloats { + sb.WriteString(fmt.Sprintf("committed: %s %f %d\n", s.metric, s.f, s.t)) + } + for _, s := range a.pendingFloats { + sb.WriteString(fmt.Sprintf("pending: %s %f %d\n", s.metric, s.f, s.t)) + } + for _, s := range a.rolledbackFloats { + sb.WriteString(fmt.Sprintf("rolledback: %s %f %d\n", s.metric, s.f, s.t)) + } + return sb.String() +} + +// protoMarshalDelimited marshals a MetricFamily into a delimited +// Prometheus proto exposition format bytes (known as 'encoding=delimited`) +// +// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers +func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte { + t.Helper() + + protoBuf, err := proto.Marshal(mf) + require.NoError(t, err) + + varintBuf := make([]byte, binary.MaxVarintLen32) + varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf))) + + buf := &bytes.Buffer{} + buf.Write(varintBuf[:varintLength]) + buf.Write(protoBuf) + return buf.Bytes() +} diff --git a/scrape/scrape.go b/scrape/scrape.go index 48bfb3cc950..a0d133c61f8 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -879,6 +879,8 @@ type scrapeLoop struct { metrics *scrapeMetrics skipOffsetting bool // For testability. + + newParserFunc func() (textparse.Parser, error) } // scrapeCache tracks mappings of exposed metric strings to label sets and @@ -1490,18 +1492,16 @@ type appendErrors struct { numExemplarOutOfOrder int } -func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { - p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable) - if g := GetDefaultGatherer(); g != nil { - if mfs, err := g.Gather(); err != nil { - level.Debug(sl.l).Log( - "msg", "Failed to get metrics from Gather.", - "err", err, - ) - } else { - p = textparse.NewGathererParser(b, sl.scrapeClassicHistograms, sl.symbolTable, mfs) - } +func (sl *scrapeLoop) newParser(b []byte, contentType string) (textparse.Parser, error) { + if sl.newParserFunc != nil { + return sl.newParserFunc() } + return textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable) +} + +func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { + p, err := sl.newParser(b, contentType) + if err != nil { level.Debug(sl.l).Log( "msg", "Invalid content type on scrape, using prometheus parser as fallback.", diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 42a7813300d..2846d4c3d31 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -688,7 +688,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app } func TestScrapeLoopStopBeforeRun(t *testing.T) { - scraper := &testScraper{} + scraper := &scraperShim{} sl := newBasicScrapeLoop(t, context.Background(), scraper, nil, 1) // The scrape pool synchronizes on stopping scrape loops. However, new scrape @@ -740,7 +740,7 @@ func TestScrapeLoopStop(t *testing.T) { var ( signal = make(chan struct{}, 1) appender = &collectResultAppender{} - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { return appender } ) @@ -795,7 +795,7 @@ func TestScrapeLoopRun(t *testing.T) { signal = make(chan struct{}, 1) errc = make(chan error) - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { return &nopAppender{} } scrapeMetrics = newTestScrapeMetrics(t) ) @@ -900,7 +900,7 @@ func TestScrapeLoopForcedErr(t *testing.T) { signal = make(chan struct{}, 1) errc = make(chan error) - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { return &nopAppender{} } ) @@ -938,7 +938,7 @@ func TestScrapeLoopForcedErr(t *testing.T) { func TestScrapeLoopMetadata(t *testing.T) { var ( signal = make(chan struct{}) - scraper = &testScraper{} + scraper = &scraperShim{} scrapeMetrics = newTestScrapeMetrics(t) cache = newScrapeCache(scrapeMetrics) ) @@ -1010,7 +1010,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) { t.Cleanup(func() { s.Close() }) ctx, cancel := context.WithCancel(context.Background()) - sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0) + sl := newBasicScrapeLoop(t, ctx, &scraperShim{}, s.Appender, 0) t.Cleanup(func() { cancel() }) return ctx, sl @@ -1051,7 +1051,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) { Separator: ";", Replacement: "$1", }} - sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0) + sl := newBasicScrapeLoop(t, ctx, &scraperShim{}, s.Appender, 0) sl.sampleMutator = func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, target, true, relabelConfig) } @@ -1111,7 +1111,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { appender := &collectResultAppender{} var ( signal = make(chan struct{}, 1) - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { return appender } ) @@ -1156,7 +1156,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { appender := &collectResultAppender{} var ( signal = make(chan struct{}, 1) - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { return appender } numScrapes = 0 ) @@ -1206,7 +1206,7 @@ func TestScrapeLoopCache(t *testing.T) { appender := &collectResultAppender{} var ( signal = make(chan struct{}, 1) - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { appender.next = s.Appender(ctx); return appender } ) @@ -1270,7 +1270,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { appender := &collectResultAppender{next: sapp} var ( signal = make(chan struct{}, 1) - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { return appender } ) @@ -2234,7 +2234,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) { func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { var ( - scraper = &testScraper{} + scraper = &scraperShim{} appender = &collectResultAppender{} app = func(ctx context.Context) storage.Appender { return appender } ) @@ -2253,7 +2253,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { var ( - scraper = &testScraper{} + scraper = &scraperShim{} appender = &collectResultAppender{} app = func(ctx context.Context) storage.Appender { return appender } ) @@ -2572,37 +2572,6 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { // testScraper implements the scraper interface and allows setting values // returned by its methods. It also allows setting a custom scrape function. -type testScraper struct { - offsetDur time.Duration - - lastStart time.Time - lastDuration time.Duration - lastError error - - scrapeErr error - scrapeFunc func(context.Context, io.Writer) error -} - -func (ts *testScraper) offset(time.Duration, uint64) time.Duration { - return ts.offsetDur -} - -func (ts *testScraper) Report(start time.Time, duration time.Duration, err error) { - ts.lastStart = start - ts.lastDuration = duration - ts.lastError = err -} - -func (ts *testScraper) scrape(ctx context.Context) (*http.Response, error) { - return nil, ts.scrapeErr -} - -func (ts *testScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) { - if ts.scrapeFunc != nil { - return "", ts.scrapeFunc(ctx, w) - } - return "", ts.scrapeErr -} func TestScrapeLoop_RespectTimestamps(t *testing.T) { s := teststorage.New(t) @@ -2660,7 +2629,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { defer s.Close() ctx, cancel := context.WithCancel(context.Background()) - sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0) + sl := newBasicScrapeLoop(t, ctx, &scraperShim{}, s.Appender, 0) defer cancel() // We add a good and a bad metric to check that both are discarded. @@ -2699,7 +2668,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { app := s.Appender(context.Background()) ctx, cancel := context.WithCancel(context.Background()) - sl := newBasicScrapeLoop(t, context.Background(), &testScraper{}, func(ctx context.Context) storage.Appender { return app }, 0) + sl := newBasicScrapeLoop(t, context.Background(), &scraperShim{}, func(ctx context.Context) storage.Appender { return app }, 0) sl.sampleMutator = func(l labels.Labels) labels.Labels { if l.Has("drop") { return labels.FromStrings("no", "name") // This label set will trigger an error. @@ -2951,7 +2920,7 @@ func TestScrapeAddFast(t *testing.T) { defer s.Close() ctx, cancel := context.WithCancel(context.Background()) - sl := newBasicScrapeLoop(t, ctx, &testScraper{}, s.Appender, 0) + sl := newBasicScrapeLoop(t, ctx, &scraperShim{}, s.Appender, 0) defer cancel() slApp := sl.appender(ctx) @@ -3017,7 +2986,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { var ( signal = make(chan struct{}, 1) - scraper = &testScraper{} + scraper = &scraperShim{} ) ctx, cancel := context.WithCancel(context.Background()) @@ -3393,7 +3362,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t * appender := &collectResultAppender{} var ( signal = make(chan struct{}, 1) - scraper = &testScraper{} + scraper = &scraperShim{} app = func(ctx context.Context) storage.Appender { return appender } ) diff --git a/scrape/target.go b/scrape/target.go index 9ef4471fbd1..a2622a1a675 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -582,4 +582,4 @@ func TargetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig, noDefault } } return targets, failures -} +} \ No newline at end of file From e7b3ca9287cc5bc74ae7f43f718358ea15ed190c Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:26:10 -0500 Subject: [PATCH 5/7] Make UnregisterMetrics, NewScrapeMetrics exportable --- scrape/promotel.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/scrape/promotel.go b/scrape/promotel.go index 47be2cba3cf..8eed233ca90 100644 --- a/scrape/promotel.go +++ b/scrape/promotel.go @@ -45,28 +45,41 @@ func (gl *GathereLoop) ScrapeAndReport( return gl.scrapeAndReport(last, appendTime, errc) } +func (gl *GathereLoop) UnregisterMetrics() { + gl.scrapeLoop.metrics.Unregister() +} + func noopScrapeFunc(context.Context, io.Writer) error { return nil } func newNoopTarget(lbls labels.Labels) *Target { return &Target{labels: lbls} } +func NewScrapeMetrics(reg prometheus.Registerer) (*scrapeMetrics, error) { + return newScrapeMetrics(reg) +} + +const ( + jobLabelDefault = "promotel_job" + instanceLabelDefault = "promotel_instance" +) + func NewGathererLoop(ctx context.Context, logger log.Logger, app storage.Appendable, reg prometheus.Registerer, g prometheus.Gatherer, interval time.Duration) (*GathereLoop, error) { nopMutator := func(l labels.Labels) labels.Labels { return l } - metrics, err := newScrapeMetrics(reg) - if err != nil { - return nil, err - } if logger == nil { logger = log.NewNopLogger() } target := newNoopTarget([]labels.Label{ - {Name: model.JobLabel, Value: "promotel"}, // required label - {Name: model.InstanceLabel, Value: "promotel"}, // required label + {Name: model.JobLabel, Value: jobLabelDefault}, // required label + {Name: model.InstanceLabel, Value: instanceLabelDefault}, // required label {Name: model.ScrapeIntervalLabel, Value: interval.String()}, {Name: model.MetricsPathLabel, Value: config.DefaultScrapeConfig.MetricsPath}, {Name: model.SchemeLabel, Value: config.DefaultScrapeConfig.Scheme}, }) + metrics, err := newScrapeMetrics(reg) + if err != nil { + return nil, err + } loop := &GathereLoop{ newScrapeLoop( ctx, From d954900cd414da1004e074d9de53c5b39dc9be59 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Tue, 28 Jan 2025 10:19:42 -0500 Subject: [PATCH 6/7] Rename module to github.com/pkcll/prometheus --- go.mod | 8 +++----- go.sum | 6 ++---- scrape/promotel.go | 3 ++- scrape/promotel_test.go | 4 ++-- scrape/scrape.go | 3 ++- 5 files changed, 11 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 44c0aca7a70..9070a1a7482 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/prometheus/prometheus +module github.com/pkcll/prometheus go 1.21.0 @@ -58,6 +58,7 @@ require ( github.com/prometheus/common/assets v0.2.0 github.com/prometheus/common/sigv4 v0.1.0 github.com/prometheus/exporter-toolkit v0.11.0 + github.com/prometheus/prometheus v0.54.1 github.com/scaleway/scaleway-sdk-go v1.0.0-beta.29 github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c github.com/stretchr/testify v1.9.0 @@ -100,6 +101,7 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/compute/metadata v0.4.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect @@ -109,7 +111,6 @@ require ( github.com/cilium/ebpf v0.11.0 // indirect github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect - github.com/containerd/log v0.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.5.0 // indirect @@ -167,10 +168,8 @@ require ( github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect - github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect @@ -194,7 +193,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240708141625-4ad9e859172b // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect - gotest.tools/v3 v3.0.3 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/go.sum b/go.sum index bb515753de1..64edebf490e 100644 --- a/go.sum +++ b/go.sum @@ -135,7 +135,6 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= -github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -641,6 +640,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/prometheus/prometheus v0.54.1 h1:vKuwQNjnYN2/mDoWfHXDhAsz/68q/dQDb+YbcEqU7MQ= +github.com/prometheus/prometheus v0.54.1/go.mod h1:xlLByHhk2g3ycakQGrMaU8K7OySZx98BzeCR99991NY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -674,7 +675,6 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -990,7 +990,6 @@ golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBn golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= -golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -1152,7 +1151,6 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/scrape/promotel.go b/scrape/promotel.go index 8eed233ca90..7265e6e3d84 100644 --- a/scrape/promotel.go +++ b/scrape/promotel.go @@ -12,8 +12,9 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/storage" + + "github.com/pkcll/prometheus/model/textparse" ) type GathereLoop struct { diff --git a/scrape/promotel_test.go b/scrape/promotel_test.go index 3a633657899..2eccc6562ef 100644 --- a/scrape/promotel_test.go +++ b/scrape/promotel_test.go @@ -14,15 +14,15 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/client_golang/prometheus" + "github.com/pkcll/prometheus/scrape" ) // TestScrapeLoopScrapeAndReport exercises scrapeAndReport with various scenarios diff --git a/scrape/scrape.go b/scrape/scrape.go index a0d133c61f8..712890b9077 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -46,11 +46,12 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/relabel" - "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/pool" + + "github.com/pkcll/prometheus/model/textparse" ) // ScrapeTimestampTolerance is the tolerance for scrape appends timestamps From 6e977873d1b51bf997f0f535848c8d56ef3fc41d Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Wed, 5 Feb 2025 10:28:29 -0500 Subject: [PATCH 7/7] Remove unused gathererScraper code --- scrape/manager_test.go | 15 +---- scrape/promotel_test.go | 23 -------- scrape/scrape.go | 118 ++-------------------------------------- scrape/scrape_test.go | 57 ++++++------------- 4 files changed, 23 insertions(+), 190 deletions(-) diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 76b0a523f44..3646da1548e 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -793,15 +793,8 @@ func TestManagerCTZeroIngestion(t *testing.T) { })) once := sync.Once{} - // Start fake HTTP target to that allow one scrape only. - ctrType := dto.MetricType_COUNTER - mf := &dto.MetricFamily{ - Name: proto.String(mName), - Type: &ctrType, - Metric: []*dto.Metric{{Counter: tc.counterSample}}, - } - mfs := []*dto.MetricFamily{mf} + // Start fake HTTP target to that allow one scrape only. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fail := true once.Do(func() { @@ -823,16 +816,10 @@ func TestManagerCTZeroIngestion(t *testing.T) { require.NoError(t, err) } else { // This enables scraper to read metrics from the handler directly without making HTTP request - SetDefaultGathererHandler(handler) - defer SetDefaultGathererHandler(nil) serverURL, err = url.Parse("http://not-started:8080") require.NoError(t, err) } - testPromGatherer := prometheus.Gatherer(&testGatherer{t, mfs}) - // This will cause scrapeLoop to a switch from ProtobufParser to GathererParser which reads directly from testPromGatherer - SetDefaultGatherer(testPromGatherer) - // Add fake target directly into tsets + reload. Normally users would use // Manager.Run and wait for minimum 5s refresh interval. scrapeManager.updateTsets(map[string][]*targetgroup.Group{ diff --git a/scrape/promotel_test.go b/scrape/promotel_test.go index 2eccc6562ef..ae73dea4fdd 100644 --- a/scrape/promotel_test.go +++ b/scrape/promotel_test.go @@ -1,9 +1,7 @@ package scrape_test import ( - "bytes" "context" - "encoding/binary" "fmt" "math/rand" "strings" @@ -11,11 +9,9 @@ import ( "testing" "time" - "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -229,22 +225,3 @@ func (a *testAppender) String() string { } return sb.String() } - -// protoMarshalDelimited marshals a MetricFamily into a delimited -// Prometheus proto exposition format bytes (known as 'encoding=delimited`) -// -// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers -func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte { - t.Helper() - - protoBuf, err := proto.Marshal(mf) - require.NoError(t, err) - - varintBuf := make([]byte, binary.MaxVarintLen32) - varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf))) - - buf := &bytes.Buffer{} - buf.Write(varintBuf[:varintLength]) - buf.Write(protoBuf) - return buf.Bytes() -} diff --git a/scrape/scrape.go b/scrape/scrape.go index 712890b9077..38d6ae34055 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -27,7 +27,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/go-kit/log" @@ -37,8 +36,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/version" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/exemplar" @@ -322,14 +319,14 @@ func (sp *scrapePool) restartLoops(reuseCache bool) { t := sp.activeTargets[fp] interval, timeout, err := t.intervalAndTimeout(interval, timeout) var ( - s = newScraper(&targetScraper{ + s = &targetScraper{ Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader(sp.config.ScrapeProtocols), acceptEncodingHeader: acceptEncodingHeader(enableCompression), - }) + } newLoop = sp.newLoop(scrapeLoopOptions{ target: t, scraper: s, @@ -466,7 +463,7 @@ func (sp *scrapePool) sync(targets []*Target) { // for every target. var err error interval, timeout, err = t.intervalAndTimeout(interval, timeout) - s := newScraper(&targetScraper{ + s := &targetScraper{ Target: t, client: sp.client, timeout: timeout, @@ -474,7 +471,7 @@ func (sp *scrapePool) sync(targets []*Target) { acceptHeader: acceptHeader(sp.config.ScrapeProtocols), acceptEncodingHeader: acceptEncodingHeader(enableCompression), metrics: sp.metrics, - }) + } l := sp.newLoop(scrapeLoopOptions{ target: t, scraper: s, @@ -713,13 +710,6 @@ type targetScraper struct { metrics *scrapeMetrics } -func newScraper(ts *targetScraper) scraper { - if handler := GetDefaultGathererHandler(); handler != nil { - return &gathererScraper{ts, handler} - } - return ts -} - var errBodySizeLimit = errors.New("body size limit exceeded") // acceptHeader transforms preference from the options into specific header values as @@ -2024,103 +2014,3 @@ func pickSchema(bucketFactor float64) int32 { return int32(floor) } } - -// Scraper implementation that fetches metrics data from Gatherer http.Handler. -type gathererScraper struct { - *targetScraper - h http.Handler -} - -type scrapeResult struct { - resp *http.Response - err error -} - -func (gs *gathererScraper) scrape(ctx context.Context) (*http.Response, error) { - resCh := make(chan scrapeResult, 1) - go func() { - defer close(resCh) - req, err := gs.scrapeRequest() - if err != nil { - resCh <- scrapeResult{nil, err} - return - } - w := newResponseWriter(req) - if gs.h != nil { - gs.h.ServeHTTP(w, req) - } - fmt.Println("[gathererScraper] scraping metrics") - resCh <- scrapeResult{w.response, nil} - }() - select { - case <-ctx.Done(): - return nil, ctx.Err() - case r := <-resCh: - return r.resp, r.err - } -} - -type responseWriter struct { - http.ResponseWriter - response *http.Response - // Writes to response body - w io.Writer -} - -func newResponseWriter(req *http.Request) *responseWriter { - buf := new(bytes.Buffer) - - return &responseWriter{ - w: io.Writer(buf), - response: &http.Response{ - Status: http.StatusText(http.StatusOK), - StatusCode: http.StatusOK, - Header: make(http.Header), - Body: io.NopCloser(buf), - Request: req, - }, - } -} - -func (rw *responseWriter) Header() http.Header { - return rw.response.Header -} - -func (rw *responseWriter) Write(data []byte) (int, error) { - return rw.w.Write(data) -} - -func (rw *responseWriter) WriteHeader(statusCode int) { - rw.response.StatusCode = statusCode - rw.response.Status = fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)) -} - -var ( - defaultGathererHandler atomic.Pointer[http.Handler] - - defaultGatherer atomic.Pointer[prometheus.Gatherer] -) - -// This enables scraper to read metrics from the handler directly without making HTTP request -func SetDefaultGathererHandler(h http.Handler) { - defaultGathererHandler.Store(&h) -} - -func SetDefaultGatherer(g prometheus.Gatherer) { - defaultGatherer.Store(&g) - SetDefaultGathererHandler(promhttp.HandlerFor(g, promhttp.HandlerOpts{})) -} - -func GetDefaultGathererHandler() http.Handler { - if h := defaultGathererHandler.Load(); h != nil { - return *h - } - return nil -} - -func GetDefaultGatherer() prometheus.Gatherer { - if g := defaultGatherer.Load(); g != nil { - return *g - } - return nil -} diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 2846d4c3d31..3e4fce015a9 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -2333,20 +2333,6 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { require.Equal(t, 0, seriesAdded) } -const useGathererHandler = true - -func newHTTPTestServer(handler http.Handler) *httptest.Server { - if useGathererHandler { - server := httptest.NewUnstartedServer(handler) - server.URL = "http://not-started:8080" - SetDefaultGathererHandler(handler) - return server - } - server := httptest.NewServer(handler) - SetDefaultGathererHandler(nil) - return server -} - func TestTargetScraperScrapeOK(t *testing.T) { const ( configTimeout = 1500 * time.Millisecond @@ -2355,7 +2341,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { var protobufParsing bool - server := newHTTPTestServer( + server := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if protobufParsing { accept := r.Header.Get("Accept") @@ -2371,7 +2357,6 @@ func TestTargetScraperScrapeOK(t *testing.T) { }), ) defer server.Close() - defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { @@ -2379,7 +2364,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { } runTest := func(acceptHeader string) { - ts := newScraper(&targetScraper{ + ts := &targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -2389,7 +2374,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { client: http.DefaultClient, timeout: configTimeout, acceptHeader: acceptHeader, - }) + } var buf bytes.Buffer resp, err := ts.scrape(context.Background()) @@ -2408,20 +2393,19 @@ func TestTargetScraperScrapeOK(t *testing.T) { func TestTargetScrapeScrapeCancel(t *testing.T) { block := make(chan struct{}) - server := newHTTPTestServer( + server := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { <-block }), ) defer server.Close() - defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { panic(err) } - ts := newScraper(&targetScraper{ + ts := &targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -2430,7 +2414,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) { }, client: http.DefaultClient, acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols), - }) + } ctx, cancel := context.WithCancel(context.Background()) errc := make(chan error, 1) @@ -2464,20 +2448,19 @@ func TestTargetScrapeScrapeCancel(t *testing.T) { } func TestTargetScrapeScrapeNotFound(t *testing.T) { - server := newHTTPTestServer( + server := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) }), ) defer server.Close() - defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { panic(err) } - ts := newScraper(&targetScraper{ + ts := &targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -2486,7 +2469,7 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) { }, client: http.DefaultClient, acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols), - }) + } resp, err := ts.scrape(context.Background()) require.NoError(t, err) @@ -2501,7 +2484,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { responseBody = "metric_a 1\nmetric_b 2\n" ) var gzipResponse bool - server := newHTTPTestServer( + server := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", `text/plain; version=0.0.4`) if gzipResponse { @@ -2515,14 +2498,13 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { }), ) defer server.Close() - defer SetDefaultGathererHandler(nil) serverURL, err := url.Parse(server.URL) if err != nil { panic(err) } - ts := &targetScraper{ + s := &targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -2534,7 +2516,6 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols), metrics: newTestScrapeMetrics(t), } - s := newScraper(ts) var buf bytes.Buffer // Target response uncompressed body, scrape with body size limit. @@ -2554,7 +2535,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) { // Target response uncompressed body, scrape without body size limit. gzipResponse = false buf.Reset() - ts.bodySizeLimit = 0 + s.bodySizeLimit = 0 resp, err = s.scrape(context.Background()) require.NoError(t, err) _, err = s.readResponse(context.Background(), resp, &buf) @@ -3051,7 +3032,7 @@ func TestScrapeReportLimit(t *testing.T) { scrapedTwice = make(chan bool) ) - ts := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n") scrapes++ if scrapes == 2 { @@ -3059,7 +3040,6 @@ func TestScrapeReportLimit(t *testing.T) { } })) defer ts.Close() - defer SetDefaultGathererHandler(nil) sp, err := newScrapePool(cfg, s, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) @@ -3300,7 +3280,7 @@ test_summary_count 199 scrapeCount := 0 scraped := make(chan bool) - ts := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, metricsText) scrapeCount++ if scrapeCount > 2 { @@ -3308,7 +3288,6 @@ test_summary_count 199 } })) defer ts.Close() - defer SetDefaultGathererHandler(nil) sp, err := newScrapePool(config, simpleStorage, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) require.NoError(t, err) @@ -3426,7 +3405,7 @@ func TestScrapeLoopCompression(t *testing.T) { t.Run(fmt.Sprintf("compression=%v,acceptEncoding=%s", tc.enableCompression, tc.acceptEncoding), func(t *testing.T) { scraped := make(chan bool) - ts := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, tc.acceptEncoding, r.Header.Get("Accept-Encoding"), "invalid value of the Accept-Encoding header") fmt.Fprint(w, metricsText) close(scraped) @@ -3587,7 +3566,7 @@ func BenchmarkTargetScraperGzip(b *testing.B) { for _, scenario := range scenarios { b.Run(fmt.Sprintf("metrics=%d", scenario.metricsCount), func(b *testing.B) { - ts := newScraper(&targetScraper{ + ts := &targetScraper{ Target: &Target{ labels: labels.FromStrings( model.SchemeLabel, serverURL.Scheme, @@ -3597,7 +3576,7 @@ func BenchmarkTargetScraperGzip(b *testing.B) { }, client: client, timeout: time.Second, - }) + } b.ResetTimer() for i := 0; i < b.N; i++ { _, err = ts.scrape(context.Background()) @@ -3696,7 +3675,7 @@ func testNativeHistogramMaxSchemaSet(t *testing.T, minBucketFactor string, expec buffer := protoMarshalDelimited(t, histogramMetricFamily) // Create a HTTP server to serve /metrics via ProtoBuf - metricsServer := newHTTPTestServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + metricsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) w.Write(buffer) }))