From 6a7e9c4d8e0cbad2b0beefb4ab86245fae37e606 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Tue, 5 Aug 2025 13:47:55 +0100 Subject: [PATCH] feat: Conditional GCM staleness support --- google/export/export.go | 4 +- google/export/series_cache.go | 9 ++- google/export/transform.go | 22 +++--- .../promqle2etest/backend_export_gcm.go | 24 +++++- google/internal/promqle2etest/gcm_test.go | 77 +++++++++++++++++++ promql/promqltest/test.go | 2 +- promql/promqltest/testdata/staleness.test | 56 +++----------- 7 files changed, 131 insertions(+), 63 deletions(-) diff --git a/google/export/export.go b/google/export/export.go index d43b181fb9..a028b39c8e 100644 --- a/google/export/export.go +++ b/google/export/export.go @@ -165,7 +165,7 @@ const ( batchDelayMax = 50 * time.Millisecond // Prefix for GCM metric. - MetricTypePrefix = "prometheus.googleapis.com" + MetricTypePrefix = "custom.googleapis.com" // "prometheus.googleapis.com" ) // Supported gRPC compression formats. @@ -986,6 +986,8 @@ func (b *batch) send( samplesPerRPCBatch.Observe(float64(len(l))) + fmt.Printf("DEBUG: Sending CreateTimeSeriesRequest.TimeSeries %v\n", l) + // We do not retry any requests due to the risk of producing a backlog // that cannot be worked down, especially if large amounts of clients try to do so. err := sendOne(sendCtx, &monitoring_pb.CreateTimeSeriesRequest{ diff --git a/google/export/series_cache.go b/google/export/series_cache.go index 11fa2b3d72..bb73e213ca 100644 --- a/google/export/series_cache.go +++ b/google/export/series_cache.go @@ -133,10 +133,10 @@ func (e *seriesCacheEntry) setNextRefresh() { } func newSeriesCache( - logger log.Logger, - reg prometheus.Registerer, - metricTypePrefix string, - matchers Matchers, + logger log.Logger, + reg prometheus.Registerer, + metricTypePrefix string, + matchers Matchers, ) *seriesCache { if logger == nil { logger = log.NewNopLogger() @@ -255,6 +255,7 @@ func (c *seriesCache) get(s record.RefSample, externalLabels labels.Labels, meta // getResetAdjusted takes a sample for a referenced series and returns // its reset timestamp and adjusted value. // If the last return argument is false, the sample should be dropped. +// TODO(bwplotka): Support staleness func (c *seriesCache) getResetAdjusted(ref storage.SeriesRef, t int64, v float64) (int64, float64, bool) { c.mtx.Lock() e, ok := c.entries[ref] diff --git a/google/export/transform.go b/google/export/transform.go index 64dcfe9bef..2e48f4e715 100644 --- a/google/export/transform.go +++ b/google/export/transform.go @@ -70,14 +70,17 @@ func discardExemplarIncIfExists(series storage.SeriesRef, exemplars map[storage. } type sampleBuilder struct { + allowStalenessMarkers bool + series *seriesCache dists map[uint64]*distribution } func newSampleBuilder(c *seriesCache) *sampleBuilder { return &sampleBuilder{ - series: c, - dists: make(map[uint64]*distribution, 128), + allowStalenessMarkers: true, // TODO(bwplotka): Hack, make it a proper ff. + series: c, + dists: make(map[uint64]*distribution, 128), } } @@ -94,8 +97,7 @@ func (b *sampleBuilder) next(metadata MetadataFunc, externalLabels labels.Labels sample := samples[0] tailSamples := samples[1:] - // Staleness markers are currently not supported by Cloud Monitoring. - if value.IsStaleNaN(sample.V) { + if value.IsStaleNaN(sample.V) && !b.allowStalenessMarkers { prometheusSamplesDiscarded.WithLabelValues("staleness-marker").Inc() discardExemplarIncIfExists(storage.SeriesRef(sample.Ref), exemplars, "staleness-marker") return nil, tailSamples, nil @@ -384,12 +386,12 @@ func isHistogramSeries(metric, name string) bool { // Once all series for a single distribution have been observed, it returns it. // It returns the reset timestamp along with the distribution and the remaining samples. func (b *sampleBuilder) buildDistribution( - metric string, - _ labels.Labels, - samples []record.RefSample, - exemplars map[storage.SeriesRef]record.RefExemplar, - externalLabels labels.Labels, - metadata MetadataFunc, + metric string, + _ labels.Labels, + samples []record.RefSample, + exemplars map[storage.SeriesRef]record.RefExemplar, + externalLabels labels.Labels, + metadata MetadataFunc, ) (*distribution_pb.Distribution, int64, []record.RefSample, error) { // The Prometheus/OpenMetrics exposition format does not require all histogram series for a single distribution // to be grouped together. But it does require that all series for a histogram metric in general are grouped diff --git a/google/internal/promqle2etest/backend_export_gcm.go b/google/internal/promqle2etest/backend_export_gcm.go index f39feea582..4d8d165a2f 100644 --- a/google/internal/promqle2etest/backend_export_gcm.go +++ b/google/internal/promqle2etest/backend_export_gcm.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "math" "os" "testing" "time" @@ -38,6 +39,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" @@ -60,6 +62,8 @@ import ( type LocalExportGCMBackend struct { Name string GCMSA []byte + + YOLOStalenessInjection bool } func (l LocalExportGCMBackend) Ref() string { return l.Name } @@ -128,6 +132,7 @@ func (l LocalExportGCMBackend) StartAndWaitReady(t testing.TB, _ e2e.Environment "project_id": creds.ProjectID, "collector": "local-export-gcm", }, + YOLOStalenessInjection: l.YOLOStalenessInjection, } } @@ -138,7 +143,8 @@ type runningLocalExportWithGCM struct { e *export.Exporter // NOTE(bwplotka): Not guarded by mutex, so it has to be synced with Exporter.Export. - labelsByRef map[storage.SeriesRef]labels.Labels + labelsByRef map[storage.SeriesRef]labels.Labels + YOLOStalenessInjection bool } func (l *runningLocalExportWithGCM) API() v1.API { @@ -152,8 +158,10 @@ func (l *runningLocalExportWithGCM) CollectionLabels() map[string]string { func (l *runningLocalExportWithGCM) IngestSamples(ctx context.Context, t testing.TB, recorded [][]*dto.MetricFamily) { t.Helper() + metadata := map[string]export.MetricMetadata{} + st := labels.NewSymbolTable() - for _, mfs := range recorded { + for i, mfs := range recorded { if ctx.Err() != nil { return // cancel } @@ -161,6 +169,18 @@ func (l *runningLocalExportWithGCM) IngestSamples(ctx context.Context, t testing continue } + if l.YOLOStalenessInjection && i == len(recorded)-1 { + // YOLO, hack on last "scrape". + l.e.Export(func(metric string) (export.MetricMetadata, bool) { + m, ok := metadata[metric] + return m, ok + }, []record.RefSample{{ + // Yolo. + Ref: chunks.HeadSeriesRef(0), V: math.Float64frombits(value.StaleNaN), T: mfs[0].Metric[0].GetTimestampMs(), + }}, nil) + continue + } + // Encode gathered metric family as proto Prometheus exposition format, decode as internal // Prometheus textparse format to have metrics how Prometheus would have // before append. We don't use dto straight away due to quite complex code diff --git a/google/internal/promqle2etest/gcm_test.go b/google/internal/promqle2etest/gcm_test.go index 87610b96cf..4adfa80d0b 100644 --- a/google/internal/promqle2etest/gcm_test.go +++ b/google/internal/promqle2etest/gcm_test.go @@ -341,3 +341,80 @@ func TestExportGCM_PrometheusGauge(t *testing.T) { t.Cleanup(cancel) pt.Run(ctx) } + +// TestExportGCM_Staleness tests a gauge staleness behaviour. +func TestExportGCM_Staleness(t *testing.T) { + const interval = 15 * time.Second + + _, _, localExportGCM := setupBackends(t) + localExportGCM.YOLOStalenessInjection = true // This causes last recorded sample to be NaN. + + pt := promqle2e.NewScrapeStyleTest(t) + pt.SetCurrentTime(time.Now().Add(-10 * time.Minute)) // We only do a few scrapes, so -10m buffer is enough. + + //nolint:promlinter // Test metric. + gauge := promauto.With(pt.Registerer()).NewGaugeVec(prometheus.GaugeOpts{ + Name: "promqle2e_test_gauge_staleness_total", + Help: "Test gauge used by promqle2e test framework for acceptance tests.", + ConstLabels: map[string]string{"repo": "github.com/GoogleCloudPlatform/prometheus"}, + }, []string{"foo"}) + var g prometheus.Gauge + + // No metric expected, gaugeVec empty. + pt.RecordScrape(interval) + + g = gauge.WithLabelValues("bar") + g.Set(200) + pt.RecordScrape(interval). + Expect(g, 200, localExportGCM) + + // Staleness will be injected here (last scrape). + pt.RecordScrape(interval) + // PromQL, even raw instant query for range vector, does not return NaNs. + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) + t.Cleanup(cancel) + pt.Run(ctx) +} + +// TestExportGCM_Staleness tests a gauge staleness behaviour. +func TestExportGCM_StalenessCounter(t *testing.T) { + const interval = 15 * time.Second + + _, _, localExportGCM := setupBackends(t) + localExportGCM.YOLOStalenessInjection = true // This causes last recorded sample to be NaN. + + pt := promqle2e.NewScrapeStyleTest(t) + pt.SetCurrentTime(time.Now().Add(-10 * time.Minute)) // We only do a few scrapes, so -10m buffer is enough. + + //nolint:promlinter // Test metric. + counter := promauto.With(pt.Registerer()).NewCounterVec(prometheus.CounterOpts{ + Name: "promqle2e_test_counter_staleness_total", + Help: "Test counter used by promqle2e test framework for acceptance tests.", + ConstLabels: map[string]string{"repo": "github.com/GoogleCloudPlatform/prometheus"}, + }, []string{"foo"}) + var c prometheus.Counter + + // No metric expected, gaugeVec empty. + pt.RecordScrape(interval) + + c = counter.WithLabelValues("bar") + c.Add(0) + pt.RecordScrape(interval) + c.Add(200) + pt.RecordScrape(interval). + Expect(c, 200, localExportGCM) + c.Add(200) + pt.RecordScrape(interval). + Expect(c, 400, localExportGCM) + c.Add(200) + pt.RecordScrape(interval). + Expect(c, 600, localExportGCM) + // Staleness will be injected! + pt.RecordScrape(interval) + // PromQL, even raw instant query for range vector, does not return NaNs. + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) + t.Cleanup(cancel) + pt.Run(ctx) +} diff --git a/promql/promqltest/test.go b/promql/promqltest/test.go index 1affd91f63..3f29c8791c 100644 --- a/promql/promqltest/test.go +++ b/promql/promqltest/test.go @@ -91,7 +91,7 @@ func RunBuiltinTests(t *testing.T, engine promql.QueryEngine) { t.Cleanup(func() { parser.EnableExperimentalFunctions = false }) parser.EnableExperimentalFunctions = true - files, err := fs.Glob(testsFs, "*/*.test") + files, err := fs.Glob(testsFs, "*/staleness.test") require.NoError(t, err) for _, fn := range files { diff --git a/promql/promqltest/testdata/staleness.test b/promql/promqltest/testdata/staleness.test index 76ee2f2878..3bc8c3295d 100644 --- a/promql/promqltest/testdata/staleness.test +++ b/promql/promqltest/testdata/staleness.test @@ -1,51 +1,17 @@ load 10s - metric 0 1 stale 2 + metric 0 1 stale -# Instant vector doesn't return series when stale. -eval instant at 10s metric - {__name__="metric"} 1 +# DEBUGGING +eval instant at 40s rate(metric[5m]) + {} 0.005 -eval instant at 20s metric +#eval instant at 40s metric[31s] +# 1 @[10000] -eval instant at 30s metric - {__name__="metric"} 2 +eval instant at 40s rate(metric[1s]) -eval instant at 40s metric - {__name__="metric"} 2 +load 10 + metric2 200 400 600 stale -# It goes stale 5 minutes after the last sample. -eval instant at 330s metric - {__name__="metric"} 2 - -eval instant at 331s metric - - -# Range vector ignores stale sample. -eval instant at 30s count_over_time(metric[1m]) - {} 3 - -eval instant at 10s count_over_time(metric[1s]) - {} 1 - -eval instant at 20s count_over_time(metric[1s]) - -eval instant at 20s count_over_time(metric[10s]) - {} 1 - - -clear - -load 10s - metric 0 - -# Series with single point goes stale after 5 minutes. -eval instant at 0s metric - {__name__="metric"} 0 - -eval instant at 150s metric - {__name__="metric"} 0 - -eval instant at 300s metric - {__name__="metric"} 0 - -eval instant at 301s metric +eval instant at 40s rate(metric2[5m]) + {} 2