Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ const (
batchDelayMax = 50 * time.Millisecond

// Prefix for GCM metric.
MetricTypePrefix = "prometheus.googleapis.com"
MetricTypePrefix = "custom.googleapis.com" // "prometheus.googleapis.com"
)

// Supported gRPC compression formats.
Expand Down Expand Up @@ -986,6 +986,8 @@ func (b *batch) send(

samplesPerRPCBatch.Observe(float64(len(l)))

fmt.Printf("DEBUG: Sending CreateTimeSeriesRequest.TimeSeries %v\n", l)

// We do not retry any requests due to the risk of producing a backlog
// that cannot be worked down, especially if large amounts of clients try to do so.
err := sendOne(sendCtx, &monitoring_pb.CreateTimeSeriesRequest{
Expand Down
9 changes: 5 additions & 4 deletions google/export/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (e *seriesCacheEntry) setNextRefresh() {
}

func newSeriesCache(
logger log.Logger,
reg prometheus.Registerer,
metricTypePrefix string,
matchers Matchers,
logger log.Logger,
reg prometheus.Registerer,
metricTypePrefix string,
matchers Matchers,
) *seriesCache {
if logger == nil {
logger = log.NewNopLogger()
Expand Down Expand Up @@ -255,6 +255,7 @@ func (c *seriesCache) get(s record.RefSample, externalLabels labels.Labels, meta
// getResetAdjusted takes a sample for a referenced series and returns
// its reset timestamp and adjusted value.
// If the last return argument is false, the sample should be dropped.
// TODO(bwplotka): Support staleness
func (c *seriesCache) getResetAdjusted(ref storage.SeriesRef, t int64, v float64) (int64, float64, bool) {
c.mtx.Lock()
e, ok := c.entries[ref]
Expand Down
22 changes: 12 additions & 10 deletions google/export/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,17 @@ func discardExemplarIncIfExists(series storage.SeriesRef, exemplars map[storage.
}

// sampleBuilder holds the state used while turning Prometheus samples into
// exported samples: a series cache plus a map of distributions.
type sampleBuilder struct {
// allowStalenessMarkers controls how next() treats staleness-marker samples
// (those for which value.IsStaleNaN is true). When false, such samples are
// discarded and counted under the "staleness-marker" reason; when true they
// are not discarded by that check.
allowStalenessMarkers bool

// series is the cache this builder was constructed with.
series *seriesCache
// dists stores *distribution values keyed by a uint64.
// NOTE(review): presumably the key is a hash identifying the histogram
// currently being assembled — confirm against buildDistribution.
dists map[uint64]*distribution
}

func newSampleBuilder(c *seriesCache) *sampleBuilder {
return &sampleBuilder{
series: c,
dists: make(map[uint64]*distribution, 128),
allowStalenessMarkers: true, // TODO(bwplotka): Hack, make it a proper ff.
series: c,
dists: make(map[uint64]*distribution, 128),
}
}

Expand All @@ -94,8 +97,7 @@ func (b *sampleBuilder) next(metadata MetadataFunc, externalLabels labels.Labels
sample := samples[0]
tailSamples := samples[1:]

// Staleness markers are currently not supported by Cloud Monitoring.
if value.IsStaleNaN(sample.V) {
if value.IsStaleNaN(sample.V) && !b.allowStalenessMarkers {
prometheusSamplesDiscarded.WithLabelValues("staleness-marker").Inc()
discardExemplarIncIfExists(storage.SeriesRef(sample.Ref), exemplars, "staleness-marker")
return nil, tailSamples, nil
Expand Down Expand Up @@ -384,12 +386,12 @@ func isHistogramSeries(metric, name string) bool {
// Once all series for a single distribution have been observed, it returns it.
// It returns the reset timestamp along with the distribution and the remaining samples.
func (b *sampleBuilder) buildDistribution(
metric string,
_ labels.Labels,
samples []record.RefSample,
exemplars map[storage.SeriesRef]record.RefExemplar,
externalLabels labels.Labels,
metadata MetadataFunc,
metric string,
_ labels.Labels,
samples []record.RefSample,
exemplars map[storage.SeriesRef]record.RefExemplar,
externalLabels labels.Labels,
metadata MetadataFunc,
) (*distribution_pb.Distribution, int64, []record.RefSample, error) {
// The Prometheus/OpenMetrics exposition format does not require all histogram series for a single distribution
// to be grouped together. But it does require that all series for a histogram metric in general are grouped
Expand Down
24 changes: 22 additions & 2 deletions google/internal/promqle2etest/backend_export_gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"testing"
"time"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
Expand All @@ -60,6 +62,8 @@ import (
type LocalExportGCMBackend struct {
// Name is the backend's reference name; Ref() returns it verbatim.
Name string
// GCMSA is a raw byte blob.
// NOTE(review): presumably the GCM service-account credentials JSON that
// the project ID is derived from — confirm against StartAndWaitReady.
GCMSA []byte

// YOLOStalenessInjection is a debugging hack that is copied onto the running
// backend. When set, IngestSamples does not ingest the last non-empty
// recorded scrape normally; instead it exports a single staleness-marker
// sample (value.StaleNaN) for series ref 0, timestamped with the first
// metric of that scrape.
YOLOStalenessInjection bool
}

// Ref returns the reference name of this backend, which is its Name field.
func (l LocalExportGCMBackend) Ref() string {
	name := l.Name
	return name
}
Expand Down Expand Up @@ -128,6 +132,7 @@ func (l LocalExportGCMBackend) StartAndWaitReady(t testing.TB, _ e2e.Environment
"project_id": creds.ProjectID,
"collector": "local-export-gcm",
},
YOLOStalenessInjection: l.YOLOStalenessInjection,
}
}

Expand All @@ -138,7 +143,8 @@ type runningLocalExportWithGCM struct {
e *export.Exporter

// NOTE(bwplotka): Not guarded by mutex, so it has to be synced with Exporter.Export.
labelsByRef map[storage.SeriesRef]labels.Labels
labelsByRef map[storage.SeriesRef]labels.Labels
YOLOStalenessInjection bool
}

func (l *runningLocalExportWithGCM) API() v1.API {
Expand All @@ -152,15 +158,29 @@ func (l *runningLocalExportWithGCM) CollectionLabels() map[string]string {
func (l *runningLocalExportWithGCM) IngestSamples(ctx context.Context, t testing.TB, recorded [][]*dto.MetricFamily) {
t.Helper()

metadata := map[string]export.MetricMetadata{}

st := labels.NewSymbolTable()
for _, mfs := range recorded {
for i, mfs := range recorded {
if ctx.Err() != nil {
return // cancel
}
if len(mfs) == 0 {
continue
}

if l.YOLOStalenessInjection && i == len(recorded)-1 {
// YOLO, hack on last "scrape".
l.e.Export(func(metric string) (export.MetricMetadata, bool) {
m, ok := metadata[metric]
return m, ok
}, []record.RefSample{{
// Yolo.
Ref: chunks.HeadSeriesRef(0), V: math.Float64frombits(value.StaleNaN), T: mfs[0].Metric[0].GetTimestampMs(),
}}, nil)
continue
}

// Encode gathered metric family as proto Prometheus exposition format, decode as internal
// Prometheus textparse format to have metrics how Prometheus would have
// before append. We don't use dto straight away due to quite complex code
Expand Down
77 changes: 77 additions & 0 deletions google/internal/promqle2etest/gcm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,80 @@ func TestExportGCM_PrometheusGauge(t *testing.T) {
t.Cleanup(cancel)
pt.Run(ctx)
}

// TestExportGCM_Staleness exercises staleness behaviour for a gauge exported
// through the local-export GCM backend. It records one real gauge sample and
// then relies on YOLOStalenessInjection to turn the final recorded scrape into
// a staleness marker.
//
// NOTE(review): no expectation is attached to the final scrape, so the test
// currently only checks the sample recorded before the staleness marker.
func TestExportGCM_Staleness(t *testing.T) {
const interval = 15 * time.Second

// Only the local-export GCM backend is used; the other two backends
// returned by setupBackends are ignored.
_, _, localExportGCM := setupBackends(t)
localExportGCM.YOLOStalenessInjection = true // Replaces the last recorded scrape with a staleness marker (stale NaN).

pt := promqle2e.NewScrapeStyleTest(t)
pt.SetCurrentTime(time.Now().Add(-10 * time.Minute)) // We only do a few scrapes, so -10m buffer is enough.

//nolint:promlinter // Test metric.
gauge := promauto.With(pt.Registerer()).NewGaugeVec(prometheus.GaugeOpts{
Name: "promqle2e_test_gauge_staleness_total",
Help: "Test gauge used by promqle2e test framework for acceptance tests.",
ConstLabels: map[string]string{"repo": "github.com/GoogleCloudPlatform/prometheus"},
}, []string{"foo"})
var g prometheus.Gauge

// First scrape: no metric expected, because the gauge vector has no
// label values (children) yet.
pt.RecordScrape(interval)

// Second scrape: the foo="bar" child now exists with value 200.
g = gauge.WithLabelValues("bar")
g.Set(200)
pt.RecordScrape(interval).
Expect(g, 200, localExportGCM)

// Third and last scrape: staleness will be injected here instead of the
// scraped value.
pt.RecordScrape(interval)
// No Expect for the last scrape: PromQL, even a raw instant query for a
// range vector, does not return NaNs.

// Bound the whole run to five minutes and release the context on cleanup.
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
t.Cleanup(cancel)
pt.Run(ctx)
}

// TestExportGCM_StalenessCounter exercises staleness behaviour for a counter
// exported through the local-export GCM backend. It records several increasing
// counter samples and then relies on YOLOStalenessInjection to turn the final
// recorded scrape into a staleness marker.
//
// NOTE(review): no expectation is attached to the final scrape, so the test
// currently only checks the samples recorded before the staleness marker.
func TestExportGCM_StalenessCounter(t *testing.T) {
const interval = 15 * time.Second

// Only the local-export GCM backend is used; the other two backends
// returned by setupBackends are ignored.
_, _, localExportGCM := setupBackends(t)
localExportGCM.YOLOStalenessInjection = true // Replaces the last recorded scrape with a staleness marker (stale NaN).

pt := promqle2e.NewScrapeStyleTest(t)
pt.SetCurrentTime(time.Now().Add(-10 * time.Minute)) // We only do a few scrapes, so -10m buffer is enough.

//nolint:promlinter // Test metric.
counter := promauto.With(pt.Registerer()).NewCounterVec(prometheus.CounterOpts{
Name: "promqle2e_test_counter_staleness_total",
Help: "Test counter used by promqle2e test framework for acceptance tests.",
ConstLabels: map[string]string{"repo": "github.com/GoogleCloudPlatform/prometheus"},
}, []string{"foo"})
var c prometheus.Counter

// First scrape: no metric expected, because the counter vector has no
// label values (children) yet.
pt.RecordScrape(interval)

// Second scrape: the foo="bar" child is created at 0; no expectation is set.
// NOTE(review): presumably this establishes the counter's starting point
// for the exporter — confirm.
c = counter.WithLabelValues("bar")
c.Add(0)
pt.RecordScrape(interval)
// Following scrapes: the counter grows by 200 each time and the cumulative
// value is expected from the backend.
c.Add(200)
pt.RecordScrape(interval).
Expect(c, 200, localExportGCM)
c.Add(200)
pt.RecordScrape(interval).
Expect(c, 400, localExportGCM)
c.Add(200)
pt.RecordScrape(interval).
Expect(c, 600, localExportGCM)
// Last scrape: staleness will be injected instead of the scraped value.
pt.RecordScrape(interval)
// No Expect for the last scrape: PromQL, even a raw instant query for a
// range vector, does not return NaNs.

// Bound the whole run to five minutes and release the context on cleanup.
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
t.Cleanup(cancel)
pt.Run(ctx)
}
2 changes: 1 addition & 1 deletion promql/promqltest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func RunBuiltinTests(t *testing.T, engine promql.QueryEngine) {
t.Cleanup(func() { parser.EnableExperimentalFunctions = false })
parser.EnableExperimentalFunctions = true

files, err := fs.Glob(testsFs, "*/*.test")
files, err := fs.Glob(testsFs, "*/staleness.test")
require.NoError(t, err)

for _, fn := range files {
Expand Down
56 changes: 11 additions & 45 deletions promql/promqltest/testdata/staleness.test
Original file line number Diff line number Diff line change
@@ -1,51 +1,17 @@
load 10s
metric 0 1 stale 2
metric 0 1 stale

# Instant vector doesn't return series when stale.
eval instant at 10s metric
{__name__="metric"} 1
# DEBUGGING
eval instant at 40s rate(metric[5m])
{} 0.005

eval instant at 20s metric
#eval instant at 40s metric[31s]
# 1 @[10000]

eval instant at 30s metric
{__name__="metric"} 2
eval instant at 40s rate(metric[1s])

eval instant at 40s metric
{__name__="metric"} 2
load 10
metric2 200 400 600 stale

# It goes stale 5 minutes after the last sample.
eval instant at 330s metric
{__name__="metric"} 2

eval instant at 331s metric


# Range vector ignores stale sample.
eval instant at 30s count_over_time(metric[1m])
{} 3

eval instant at 10s count_over_time(metric[1s])
{} 1

eval instant at 20s count_over_time(metric[1s])

eval instant at 20s count_over_time(metric[10s])
{} 1


clear

load 10s
metric 0

# Series with single point goes stale after 5 minutes.
eval instant at 0s metric
{__name__="metric"} 0

eval instant at 150s metric
{__name__="metric"} 0

eval instant at 300s metric
{__name__="metric"} 0

eval instant at 301s metric
eval instant at 40s rate(metric2[5m])
{} 2
Loading