From d1370d6275883d1ad4c0e3c43f8bf5c0f3cb3620 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Fri, 9 Sep 2022 18:06:11 +0530 Subject: [PATCH 01/12] PoC: Add recommendation logic for best possible resolution based on the range. Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/query_sample.go | 2 + pkg/pgmodel/querier/rollup.go | 115 ++++++++++++++++++++++++++++ pkg/pgmodel/querier/rollup_test.go | 111 +++++++++++++++++++++++++++ 3 files changed, 228 insertions(+) create mode 100644 pkg/pgmodel/querier/rollup.go create mode 100644 pkg/pgmodel/querier/rollup_test.go diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 310cc8dc4a..1633e9d4c1 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -78,6 +78,8 @@ func fetchSingleMetricSamples(tools *queryTools, metadata *evalMetadata) ([]samp return nil, nil, err } + fmt.Println(sqlQuery) + rows, err := tools.conn.Query(context.Background(), sqlQuery, values...) 
if err != nil { if e, ok := err.(*pgconn.PgError); ok { diff --git a/pkg/pgmodel/querier/rollup.go b/pkg/pgmodel/querier/rollup.go new file mode 100644 index 0000000000..5fd53c9464 --- /dev/null +++ b/pkg/pgmodel/querier/rollup.go @@ -0,0 +1,115 @@ +package querier + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/timescale/promscale/pkg/pgxconn" +) + +const ( + defaultDurationBetweenSamples = 15 * time.Second + low = 200 + high = 2000 + noRollupSchema = "" +) + +type rollupDecider struct { + conn pgxconn.PgxConn + cache map[string]time.Duration // schema_name: resolution + resolutionInASCOrder []time.Duration +} + +func (r *rollupDecider) refresh() error { + refreshInterval := time.NewTicker(time.Minute) + defer refreshInterval.Stop() + for { + var ( + schemaName []string + resolution []time.Duration + ) + err := r.conn.QueryRow(context.Background(), "SELECT array_agg(schema_name), array_agg(resolution) FROM _prom_catalog.rollup").Scan(&schemaName, &resolution) + if err != nil { + return fmt.Errorf("error fetching rollup details: %w", err) + } + cache := make(map[string]time.Duration) + for i := 0; i < len(schemaName); i++ { + cache[schemaName[i]] = resolution[i] + } + sort.Sort(sortDuration(resolution)) // From highest resolution (say 5m) to lowest resolution (say 1h). 
+ r.resolutionInASCOrder = resolution + <-refreshInterval.C + } +} + +func (r *rollupDecider) decide(minSeconds, maxSeconds int64) (rollupSchemaName string) { + estimateSamples := func(resolution time.Duration) int64 { + return int64(float64(maxSeconds-minSeconds) / resolution.Seconds()) + } + estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) + if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low { + return noRollupSchema + } + acceptableResolution := []time.Duration{} + for _, resolution := range r.cache { + estimate := estimateSamples(resolution) + if r.withinRange(estimate) { + acceptableResolution = append(acceptableResolution, resolution) + } + } + switch len(acceptableResolution) { + case 0: + // Find the highest resolution that is below upper limit and respond. + for _, res := range r.resolutionInASCOrder { + estimate := estimateSamples(res) + if estimate < high { + return r.getSchemaFor(res) + } + } + + lowestResolution := r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1] + // This is the best case in terms of size. + // Example: If 1 hour is the lowest resolution, then all other resolutions will be way above 1 hour. + // Hence, the best answer is 1 hour. + return r.getSchemaFor(lowestResolution) + case 1: + // Debug stuff: easy to understand. + return r.getSchemaFor(acceptableResolution[0]) + default: + // Multiple resolutions fit here. Hence, choose the highest resolution for maximum granularity. + return r.getSchemaFor(acceptableResolution[0]) + } +} + +func (r *rollupDecider) withinRange(totalSamples int64) bool { + return low <= totalSamples && totalSamples <= high +} + +func (r *rollupDecider) getSchemaFor(resolution time.Duration) string { + for schema, res := range r.cache { + if res == resolution { + return schema + } + } + panic(fmt.Sprint( + "No schema found for resolution", + resolution, + "Please open an issue at https://github.com/timescale/promscale/issues", + )) // This will never be the case. 
+} + +type sortDuration []time.Duration + +func (s sortDuration) Len() int { + return len(s) +} + +func (s sortDuration) Less(i, j int) bool { + return s[i].Seconds() < s[j].Seconds() +} + +func (s sortDuration) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/pkg/pgmodel/querier/rollup_test.go b/pkg/pgmodel/querier/rollup_test.go new file mode 100644 index 0000000000..404101ca4c --- /dev/null +++ b/pkg/pgmodel/querier/rollup_test.go @@ -0,0 +1,111 @@ +package querier + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/timescale/promscale/pkg/pgxconn" +) + +func TestDecideRollup(t *testing.T) { + r := &rollupDecider{ + conn: mockPgxConn{}, + cache: map[string]time.Duration{ + "hour": time.Hour, + "5_minute": 5 * time.Minute, + "15_minute": 15 * time.Minute, + "week": 7 * 24 * time.Hour, + }, + resolutionInASCOrder: []time.Duration{5 * time.Minute, 15 * time.Minute, time.Hour, 7 * 24 * time.Hour}, + } + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: noRollupSchema, // raw resolution. 
+ }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: noRollupSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: noRollupSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: noRollupSchema, + }, { + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: "5_minute", + }, { + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "15_minute", + }, { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "hour", + }, { + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "week", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "week", + }, + } + for _, tc := range tcs { + recommendedSchema := r.decide(int64(tc.min.Seconds()), int64(tc.max.Seconds())) + require.Equal(t, tc.expectedSchemaName, recommendedSchema, tc.name) + } +} + +type mockRow struct{} + +func (mockRow) Scan(dest ...interface{}) error { return nil } + +type mockPgxConn struct{} + +func (mockPgxConn) Close() {} +func (mockPgxConn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) { + return pgconn.CommandTag{}, nil +} +func (mockPgxConn) Query(ctx context.Context, sql string, args ...interface{}) (pgxconn.PgxRows, error) { + return nil, nil +} +func (mockPgxConn) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row { + return mockRow{} +} +func (mockPgxConn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) { + return 0, nil +} +func (mockPgxConn) CopyFromRows(rows [][]interface{}) pgx.CopyFromSource { return nil } +func (mockPgxConn) NewBatch() pgxconn.PgxBatch { return nil } +func (mockPgxConn) SendBatch(ctx context.Context, b pgxconn.PgxBatch) (pgx.BatchResults, error) { + return nil, nil +} +func (mockPgxConn) Acquire(ctx context.Context) 
(*pgxpool.Conn, error) { return nil, nil } +func (mockPgxConn) BeginTx(ctx context.Context) (pgx.Tx, error) { return nil, nil } From ef0e16d81b0ffea1c99e7560442bfd8f34a6486f Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Wed, 14 Sep 2022 12:39:01 +0530 Subject: [PATCH 02/12] PoC: [WIP] Update querier to support rollup querying. Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/querier.go | 6 +- pkg/pgmodel/querier/query_builder_samples.go | 6 +- pkg/pgmodel/querier/query_sample.go | 46 ++++++- pkg/pgmodel/querier/rollup.go | 126 +++++++++++++++---- 4 files changed, 156 insertions(+), 28 deletions(-) diff --git a/pkg/pgmodel/querier/querier.go b/pkg/pgmodel/querier/querier.go index c49419d60a..da3bd1890b 100644 --- a/pkg/pgmodel/querier/querier.go +++ b/pkg/pgmodel/querier/querier.go @@ -16,7 +16,8 @@ import ( ) type pgxQuerier struct { - tools *queryTools + tools *queryTools + samplesQuerier *querySamples } var _ Querier = (*pgxQuerier)(nil) @@ -39,6 +40,7 @@ func NewQuerier( rAuth: rAuth, }, } + querier.samplesQuerier = newQuerySamples(querier) return querier } @@ -47,7 +49,7 @@ func (q *pgxQuerier) RemoteReadQuerier() RemoteReadQuerier { } func (q *pgxQuerier) SamplesQuerier() SamplesQuerier { - return newQuerySamples(q) + return q.samplesQuerier } func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier { diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index 8250cd78d6..412e8aef5b 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+ package querier import ( @@ -222,7 +226,7 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ strings.Join(selectorClauses, ", "), strings.Join(selectors, ", "), orderByClause, - pgx.Identifier{filter.column}.Sanitize(), + filter.column, ) return finalSQL, values, node, qf.tsSeries, nil diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 1633e9d4c1..7171d20069 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + package querier import ( @@ -9,16 +13,26 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/schema" + "github.com/timescale/promscale/pkg/pgmodel/model" ) type querySamples struct { *pgxQuerier + rollup *rollupDecider } func newQuerySamples(qr *pgxQuerier) *querySamples { - return &querySamples{qr} + rollup := &rollupDecider{ + conn: qr.tools.conn, + } + go rollup.refresh() + return &querySamples{ + pgxQuerier: qr, + rollup: rollup, + } } // Select implements the SamplesQuerier interface. It is the entry point for our @@ -38,10 +52,33 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH return nil, nil, fmt.Errorf("get evaluation metadata: %w", err) } - filter := metadata.timeFilter + filter := &metadata.timeFilter + + rollupSchemaName := q.rollup.decide(mint/1000, maxt/1000) + fmt.Println("schema name", rollupSchemaName) + if rollupSchemaName != "" { + // Use metric rollups. 
+ column, err := q.rollup.getValueColumnString(filter.metric) + if err != nil { + log.Error("msg", "cannot use metric rollups for querying. Reason: error getting column value", "error", err.Error()) + } + if filter.schema == model.SchemaNameLabelName { + // The query belongs to custom Caggs. We need to warn the user that this query will be treated as + // general automatic downsampled query. That's the most we can do. + // If the user wants Caggs query, then he should not enable automatic rollups for querying in CLI flags. + log.Warn("msg", "conflicting schema found. Note: __schema__ will be overwritten") + } + filter.column = column + filter.schema = rollupSchemaName + } + if metadata.isSingleMetric { // Single vector selector case. - mInfo, err := q.tools.getMetricTableName(filter.schema, filter.metric, false) + s := filter.schema + if rollupSchemaName != "" { + s = "" + } + mInfo, err := q.tools.getMetricTableName(s, filter.metric, false) if err != nil { if err == errors.ErrMissingTableName { return nil, nil, nil @@ -51,6 +88,9 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH metadata.timeFilter.metric = mInfo.TableName metadata.timeFilter.schema = mInfo.TableSchema metadata.timeFilter.seriesTable = mInfo.SeriesTable + if rollupSchemaName != "" { + metadata.timeFilter.schema = rollupSchemaName + } sampleRows, topNode, err := fetchSingleMetricSamples(q.tools, metadata) if err != nil { diff --git a/pkg/pgmodel/querier/rollup.go b/pkg/pgmodel/querier/rollup.go index 5fd53c9464..6981553dcc 100644 --- a/pkg/pgmodel/querier/rollup.go +++ b/pkg/pgmodel/querier/rollup.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+ package querier import ( @@ -6,6 +10,7 @@ import ( "sort" "time" + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgxconn" ) @@ -14,46 +19,95 @@ const ( low = 200 high = 2000 noRollupSchema = "" + noColumn = "" +) + +var ( + errNoMetricMetadata = fmt.Errorf("metric metadata not found") + errNoMetricColumnRelationship = fmt.Errorf("metric column relation does not exist. Possible invalid metric type") ) type rollupDecider struct { - conn pgxconn.PgxConn - cache map[string]time.Duration // schema_name: resolution - resolutionInASCOrder []time.Duration + conn pgxconn.PgxConn + schemaResolutionCache map[string]time.Duration // schema_name: resolution + metricMetadataCache map[string]string // metric_name: metric_type + resolutionInASCOrder []time.Duration } -func (r *rollupDecider) refresh() error { +func (r *rollupDecider) refresh() { refreshInterval := time.NewTicker(time.Minute) defer refreshInterval.Stop() for { + if r.refreshRollupResolution() { + r.refreshMetricMetadata() + } + <-refreshInterval.C + } +} + +func (r *rollupDecider) refreshRollupResolution() (proceedToNextStep bool) { + var ( + schemaName []string + resolution []time.Duration + ) + rows, err := r.conn.Query(context.Background(), "SELECT schema_name, resolution FROM _prom_catalog.rollup") + if err != nil { + log.Error("msg", "fetching rollup details", "error", err.Error()) + return false + } + defer rows.Close() + for rows.Next() { var ( - schemaName []string - resolution []time.Duration + sname string + dur time.Duration ) - err := r.conn.QueryRow(context.Background(), "SELECT array_agg(schema_name), array_agg(resolution) FROM _prom_catalog.rollup").Scan(&schemaName, &resolution) + err = rows.Scan(&sname, &dur) if err != nil { - return fmt.Errorf("error fetching rollup details: %w", err) - } - cache := make(map[string]time.Duration) - for i := 0; i < len(schemaName); i++ { - cache[schemaName[i]] = resolution[i] + log.Error("msg", "error scanning rows", "error", 
err.Error()) + return false } - sort.Sort(sortDuration(resolution)) // From highest resolution (say 5m) to lowest resolution (say 1h). - r.resolutionInASCOrder = resolution - <-refreshInterval.C + schemaName = append(schemaName, sname) + resolution = append(resolution, dur) + } + fmt.Println(resolution) + if len(resolution) == 0 { + // Optimisation: No need to proceed further. + return false + } + resolutionCache := make(map[string]time.Duration) + for i := 0; i < len(schemaName); i++ { + resolutionCache[schemaName[i]] = resolution[i] } + sort.Sort(sortDuration(resolution)) // From highest resolution (say 5m) to lowest resolution (say 1h). + r.resolutionInASCOrder = resolution + r.schemaResolutionCache = resolutionCache + return true +} + +func (r *rollupDecider) refreshMetricMetadata() { + var metricName, metricType []string + err := r.conn.QueryRow(context.Background(), "select array_agg(metric_family), array_agg(type) from _prom_catalog.metadata").Scan(&metricName, &metricType) + if err != nil { + log.Error("msg", "fetching metric metadata", "error", err.Error()) + return + } + metadataCache := make(map[string]string) + for i := range metricName { + metadataCache[metricName[i]] = metricType[i] + } + r.metricMetadataCache = metadataCache } func (r *rollupDecider) decide(minSeconds, maxSeconds int64) (rollupSchemaName string) { estimateSamples := func(resolution time.Duration) int64 { return int64(float64(maxSeconds-minSeconds) / resolution.Seconds()) } - estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) - if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low { - return noRollupSchema - } - acceptableResolution := []time.Duration{} - for _, resolution := range r.cache { + //estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) + //if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { + // return noRollupSchema + //} + var acceptableResolution []time.Duration + for 
_, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) if r.withinRange(estimate) { acceptableResolution = append(acceptableResolution, resolution) @@ -88,7 +142,7 @@ func (r *rollupDecider) withinRange(totalSamples int64) bool { } func (r *rollupDecider) getSchemaFor(resolution time.Duration) string { - for schema, res := range r.cache { + for schema, res := range r.schemaResolutionCache { if res == resolution { return schema } @@ -100,6 +154,34 @@ func (r *rollupDecider) getSchemaFor(resolution time.Duration) string { )) // This will never be the case. } +// metricTypeColumnRelationship is a relationship between the metric type received for query and a computation of +// set of columns that together compute to `value` of sample. This is because in metric rollups, we do not have a `value` +// column, rather they are stored as separate utility columns like {COUNTER -> [first, last], GAUGE -> [sum, count, min, max]}. +// +// For knowing the reasoning behind this, please read the design doc on metric rollups. +var metricTypeColumnRelationship = map[string]string{ + "GAUGE": "(sum / count)", +} + +func (r *rollupDecider) getValueColumnString(metricName string) (string, error) { + metricType, ok := r.metricMetadataCache[metricName] + if !ok { + log.Debug("msg", fmt.Sprintf("metric metadata not found for %s. 
Refreshing and trying again", metricName)) + r.refreshMetricMetadata() + metricType, ok = r.metricMetadataCache[metricName] + if ok { + goto checkMetricColumnRelationship + } + return noColumn, errNoMetricMetadata + } +checkMetricColumnRelationship: + columnString, ok := metricTypeColumnRelationship[metricType] + if !ok { + return noColumn, errNoMetricColumnRelationship + } + return columnString, nil +} + type sortDuration []time.Duration func (s sortDuration) Len() int { From 22faef7af8c82ef6699e821feefe5ab46ce18a0d Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Thu, 15 Sep 2022 18:45:25 +0530 Subject: [PATCH 03/12] PoC: refactor rollup implementation and fix sending `__column__` in response series. Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/metadata.go | 1 + pkg/pgmodel/querier/query_builder.go | 13 ++++++------- pkg/pgmodel/querier/query_builder_samples.go | 11 +++++++++-- pkg/pgmodel/querier/query_sample.go | 20 +++++++------------- pkg/pgmodel/querier/rollup.go | 13 +++++++++---- pkg/pgmodel/querier/series_set.go | 1 + 6 files changed, 33 insertions(+), 26 deletions(-) diff --git a/pkg/pgmodel/querier/metadata.go b/pkg/pgmodel/querier/metadata.go index e98b1daa3a..c8c2fc0429 100644 --- a/pkg/pgmodel/querier/metadata.go +++ b/pkg/pgmodel/querier/metadata.go @@ -39,6 +39,7 @@ type evalMetadata struct { timeFilter timeFilter clauses []string values []interface{} + *rollupConfig *promqlMetadata } diff --git a/pkg/pgmodel/querier/query_builder.go b/pkg/pgmodel/querier/query_builder.go index cedf1f9376..961935c876 100644 --- a/pkg/pgmodel/querier/query_builder.go +++ b/pkg/pgmodel/querier/query_builder.go @@ -17,7 +17,6 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" - "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/extension" 
"github.com/timescale/promscale/pkg/pgmodel/lreader" @@ -249,12 +248,12 @@ type aggregators struct { // a single metric. It may apply pushdowns to functions. func getAggregators(metadata *promqlMetadata) (*aggregators, parser.Node) { - agg, node, err := tryPushDown(metadata) - if err != nil { - log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) - } else if agg != nil { - return agg, node - } + //agg, node, err := tryPushDown(metadata) + //if err != nil { + // log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) + //} else if agg != nil { + // return agg, node + //} defaultAggregators := &aggregators{ timeClause: "array_agg(time)", diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index 412e8aef5b..880e5537ac 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -217,8 +217,15 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ start, end = metadata.timeFilter.start, metadata.timeFilter.end } + samplesSchema, column := filter.schema, filter.column + if metadata.rollupConfig != nil { + // Use rollups. 
+ samplesSchema = metadata.rollupConfig.schemaName + column = metadata.rollupConfig.columnClause + } + finalSQL := fmt.Sprintf(template, - pgx.Identifier{filter.schema, filter.metric}.Sanitize(), + pgx.Identifier{samplesSchema, filter.metric}.Sanitize(), pgx.Identifier{schema.PromDataSeries, filter.seriesTable}.Sanitize(), strings.Join(cases, " AND "), start, @@ -226,7 +233,7 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ strings.Join(selectorClauses, ", "), strings.Join(selectors, ", "), orderByClause, - filter.column, + column, ) return finalSQL, values, node, qf.tsSeries, nil diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 7171d20069..09b2cc839b 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -56,9 +56,9 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH rollupSchemaName := q.rollup.decide(mint/1000, maxt/1000) fmt.Println("schema name", rollupSchemaName) - if rollupSchemaName != "" { + if rollupSchemaName != noRollupSchema { // Use metric rollups. - column, err := q.rollup.getValueColumnString(filter.metric) + rollupConfig, err := q.rollup.getConfig(filter.metric, rollupSchemaName) if err != nil { log.Error("msg", "cannot use metric rollups for querying. Reason: error getting column value", "error", err.Error()) } @@ -66,19 +66,16 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH // The query belongs to custom Caggs. We need to warn the user that this query will be treated as // general automatic downsampled query. That's the most we can do. // If the user wants Caggs query, then he should not enable automatic rollups for querying in CLI flags. - log.Warn("msg", "conflicting schema found. Note: __schema__ will be overwritten") + log.Warn("msg", "conflicting schema found. 
Note: __schema__ & __column__ will be overwritten") + filter.schema = "" + filter.column = "" } - filter.column = column - filter.schema = rollupSchemaName + metadata.rollupConfig = rollupConfig } if metadata.isSingleMetric { // Single vector selector case. - s := filter.schema - if rollupSchemaName != "" { - s = "" - } - mInfo, err := q.tools.getMetricTableName(s, filter.metric, false) + mInfo, err := q.tools.getMetricTableName(filter.schema, filter.metric, false) if err != nil { if err == errors.ErrMissingTableName { return nil, nil, nil @@ -88,9 +85,6 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH metadata.timeFilter.metric = mInfo.TableName metadata.timeFilter.schema = mInfo.TableSchema metadata.timeFilter.seriesTable = mInfo.SeriesTable - if rollupSchemaName != "" { - metadata.timeFilter.schema = rollupSchemaName - } sampleRows, topNode, err := fetchSingleMetricSamples(q.tools, metadata) if err != nil { diff --git a/pkg/pgmodel/querier/rollup.go b/pkg/pgmodel/querier/rollup.go index 6981553dcc..1cd45f5978 100644 --- a/pkg/pgmodel/querier/rollup.go +++ b/pkg/pgmodel/querier/rollup.go @@ -163,7 +163,12 @@ var metricTypeColumnRelationship = map[string]string{ "GAUGE": "(sum / count)", } -func (r *rollupDecider) getValueColumnString(metricName string) (string, error) { +type rollupConfig struct { + columnClause string + schemaName string +} + +func (r *rollupDecider) getConfig(metricName, schemaName string) (*rollupConfig, error) { metricType, ok := r.metricMetadataCache[metricName] if !ok { log.Debug("msg", fmt.Sprintf("metric metadata not found for %s. 
Refreshing and trying again", metricName)) @@ -172,14 +177,14 @@ func (r *rollupDecider) getValueColumnString(metricName string) (string, error) if ok { goto checkMetricColumnRelationship } - return noColumn, errNoMetricMetadata + return nil, errNoMetricMetadata } checkMetricColumnRelationship: columnString, ok := metricTypeColumnRelationship[metricType] if !ok { - return noColumn, errNoMetricColumnRelationship + return nil, errNoMetricColumnRelationship } - return columnString, nil + return &rollupConfig{columnString, schemaName}, nil } type sortDuration []time.Duration diff --git a/pkg/pgmodel/querier/series_set.go b/pkg/pgmodel/querier/series_set.go index f75d76b04a..f087203043 100644 --- a/pkg/pgmodel/querier/series_set.go +++ b/pkg/pgmodel/querier/series_set.go @@ -175,6 +175,7 @@ type pgxSeriesIterator struct { // newIterator returns an iterator over the samples. It expects times and values to be the same length. func newIterator(times TimestampSeries, values *pgtype.Float8Array) *pgxSeriesIterator { + fmt.Println("totalSamples", times.Len()) return &pgxSeriesIterator{ cur: -1, totalSamples: times.Len(), From 6445e6501e6dfb8f7834f525e6f898650e76673e Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Thu, 15 Sep 2022 20:29:09 +0530 Subject: [PATCH 04/12] PoC: Update PromQL engine to avoid sending unecessary samples due to in-between steps (between rollup duration). 
Signed-off-by: Harkishen-Singh --- pkg/api/series.go | 2 +- pkg/pgmodel/querier/interface.go | 2 +- pkg/pgmodel/querier/query_remote_read.go | 4 +- pkg/pgmodel/querier/query_sample.go | 24 ++++----- pkg/pgmodel/querier/rollup.go | 2 + pkg/promql/engine.go | 63 +++++++++++++++--------- pkg/promql/test.go | 4 +- pkg/query/queryable.go | 6 +-- pkg/rules/adapters/query.go | 4 +- pkg/thanos/store.go | 3 +- 10 files changed, 69 insertions(+), 45 deletions(-) diff --git a/pkg/api/series.go b/pkg/api/series.go index 1f1195e233..8ad91f9fba 100644 --- a/pkg/api/series.go +++ b/pkg/api/series.go @@ -78,7 +78,7 @@ func series(queryable promql.Queryable) http.HandlerFunc { var sets []storage.SeriesSet var warnings storage.Warnings for _, mset := range matcherSets { - s, _ := q.Select(false, nil, nil, nil, mset...) + s, _, _ := q.Select(false, nil, nil, nil, mset...) warnings = append(warnings, s.Warnings()...) if s.Err() != nil { respondError(w, http.StatusUnprocessableEntity, s.Err(), "execution") diff --git a/pkg/pgmodel/querier/interface.go b/pkg/pgmodel/querier/interface.go index 8f67578e8f..3cc70a3dd1 100644 --- a/pkg/pgmodel/querier/interface.go +++ b/pkg/pgmodel/querier/interface.go @@ -44,7 +44,7 @@ type RemoteReadQuerier interface { // matching samples. type SamplesQuerier interface { // Select returns a series set containing the exemplar that matches the supplied query parameters. 
- Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node) + Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node, bool) } // ExemplarQuerier queries data using the provided query data and returns the diff --git a/pkg/pgmodel/querier/query_remote_read.go b/pkg/pgmodel/querier/query_remote_read.go index 67611fe4f3..244f71cf11 100644 --- a/pkg/pgmodel/querier/query_remote_read.go +++ b/pkg/pgmodel/querier/query_remote_read.go @@ -27,7 +27,9 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro } qrySamples := newQuerySamples(q.pgxQuerier) - sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) + // TODO after PoC (harkishen): Do not use rollup in case of remote-read. This is because remote-read results are merged with local + // Prometheus data and using rollups here will cause evaluation problems. + sampleRows, _, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) if err != nil { return nil, err } diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 09b2cc839b..416640794c 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -37,19 +37,19 @@ func newQuerySamples(qr *pgxQuerier) *querySamples { // Select implements the SamplesQuerier interface. It is the entry point for our // own version of the Prometheus engine. 
-func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node) { - sampleRows, topNode, err := q.fetchSamplesRows(mint, maxt, hints, qh, path, ms) +func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node, usedRollup bool) { + sampleRows, topNode, usedRollup, err := q.fetchSamplesRows(mint, maxt, hints, qh, path, ms) if err != nil { - return errorSeriesSet{err: err}, nil + return errorSeriesSet{err: err}, nil, false } responseSeriesSet := buildSeriesSet(sampleRows, q.tools.labelsReader) - return responseSeriesSet, topNode + return responseSeriesSet, topNode, usedRollup } -func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) ([]sampleRow, parser.Node, error) { +func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) (rows []sampleRow, topNode parser.Node, usedRollup bool, err error) { metadata, err := getEvaluationMetadata(q.tools, mint, maxt, GetPromQLMetadata(ms, hints, qh, path)) if err != nil { - return nil, nil, fmt.Errorf("get evaluation metadata: %w", err) + return nil, nil, false, fmt.Errorf("get evaluation metadata: %w", err) } filter := &metadata.timeFilter @@ -78,9 +78,9 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH mInfo, err := q.tools.getMetricTableName(filter.schema, filter.metric, false) if err != nil { if err == errors.ErrMissingTableName { - return nil, nil, nil + return nil, nil, false, nil } - return nil, nil, fmt.Errorf("get metric table name: %w", err) + return nil, nil, false, fmt.Errorf("get metric table name: %w", err) } metadata.timeFilter.metric = mInfo.TableName 
metadata.timeFilter.schema = mInfo.TableSchema @@ -88,17 +88,17 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH sampleRows, topNode, err := fetchSingleMetricSamples(q.tools, metadata) if err != nil { - return nil, nil, err + return nil, nil, false, err } - return sampleRows, topNode, nil + return sampleRows, topNode, metadata.rollupConfig != nil, nil } // Multiple vector selector case. sampleRows, err := fetchMultipleMetricsSamples(q.tools, metadata) if err != nil { - return nil, nil, err + return nil, nil, false, err } - return sampleRows, nil, nil + return sampleRows, nil, metadata.rollupConfig != nil, nil } // fetchSingleMetricSamples returns all the result rows for a single metric diff --git a/pkg/pgmodel/querier/rollup.go b/pkg/pgmodel/querier/rollup.go index 1cd45f5978..129f3a65e9 100644 --- a/pkg/pgmodel/querier/rollup.go +++ b/pkg/pgmodel/querier/rollup.go @@ -106,6 +106,8 @@ func (r *rollupDecider) decide(minSeconds, maxSeconds int64) (rollupSchemaName s //if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { // return noRollupSchema //} + // -- to return always the lowest resolution. + //return r.getSchemaFor(r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]) var acceptableResolution []time.Duration for _, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 85370ab36c..bb1e97e9e9 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -73,7 +73,7 @@ type SamplesQuerier interface { // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. 
- Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (storage.SeriesSet, parser.Node) + Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, rollupUsed bool) } const ( @@ -648,7 +648,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } defer querier.Close() - topNodes := ng.populateSeries(querier, s) + topNodes, usedRollup := ng.populateSeries(querier, s) prepareSpanTimer.Finish() // Modify the offset of vector and matrix selectors for the @ modifier @@ -666,9 +666,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: ng.lookbackDelta, - topNodes: topNodes, samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, + topNodes: topNodes, + usedRollup: usedRollup, } query.sampleStats.InitStepTracking(start, start, 1) @@ -722,6 +723,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, topNodes: topNodes, + usedRollup: usedRollup, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) val, warnings, err := evaluator.Eval(s.Expr) @@ -858,14 +860,12 @@ func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorS // evaluation. These terminal nodes are called "top nodes". // // populateSeries returns a map keyed by all top nodes. -func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt) map[parser.Node]struct{} { - var ( - // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. 
- // The evaluation of the VectorSelector inside then evaluates the given range and unsets - // the variable. - evalRange time.Duration - topNodes map[parser.Node]struct{} = make(map[parser.Node]struct{}) - ) +func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt) (topNodes map[parser.Node]struct{}, rollupUsed bool) { + // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. + // The evaluation of the VectorSelector inside then evaluates the given range and unsets + // the variable. + var evalRange time.Duration + topNodes = make(map[parser.Node]struct{}) parser.Inspect(evalStmt.Expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { @@ -887,8 +887,11 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalSt evalRange = 0 hints.By, hints.Grouping = extractGroupsFromPath(path) - set, topNode := querier.Select(false, hints, qh, path, n.LabelMatchers...) + set, topNode, fromRollup := querier.Select(false, hints, qh, path, n.LabelMatchers...) 
topNodes[topNode] = struct{}{} + if fromRollup { + rollupUsed = true + } n.UnexpandedSeriesSet = set case *parser.MatrixSelector: @@ -896,7 +899,7 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalSt } return nil }) - return topNodes + return topNodes, rollupUsed } // extractFuncFromPath walks up the path and searches for the first instance of @@ -979,6 +982,7 @@ type evaluator struct { logger log.Logger lookbackDelta time.Duration topNodes map[parser.Node]struct{} + usedRollup bool samplesStats *stats.QuerySamples noStepSubqueryIntervalFn func(rangeMillis int64) int64 } @@ -1674,16 +1678,29 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { Points: getPointSlice(numSteps), } - for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { - step++ - _, v, ok := ev.vectorSelectorSingle(it, e, ts) - if ok { - if ev.currentSamples < ev.maxSamples { - ss.Points = append(ss.Points, Point{V: v, T: ts}) - ev.samplesStats.IncrementSamplesAtStep(step, 1) - ev.currentSamples++ - } else { - ev.error(ErrTooManySamples(env)) + fmt.Println("usedRollup", ev.usedRollup) + if ev.usedRollup { + // Rollup evaluated the step interval. Hence, just use the step in rollup. + // This is important, as it **avoids unnecessary samples** (with sample `v`) due to `ts += ev.interval` (in the for loop in else block) + // thereby **speeding up evaluation** involving rollups. 
+ itr := e.Series[i].Iterator() + for itr.Next() { + ts, v := itr.At() + ss.Points = append(ss.Points, Point{V: v, T: ts}) + ev.currentSamples++ + } + } else { + for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { + step++ + _, v, ok := ev.vectorSelectorSingle(it, e, ts) + if ok { + if ev.currentSamples < ev.maxSamples { + ss.Points = append(ss.Points, Point{V: v, T: ts}) + ev.samplesStats.IncrementSamplesAtStep(step, 1) + ev.currentSamples++ + } else { + ev.error(ErrTooManySamples(env)) + } } } } diff --git a/pkg/promql/test.go b/pkg/promql/test.go index c758db3999..ec73addf0c 100644 --- a/pkg/promql/test.go +++ b/pkg/promql/test.go @@ -110,9 +110,9 @@ type QuerierWrapper struct { storage.Querier } -func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node) { +func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node, bool) { ss := t.Querier.Select(b, sh, m...) - return ss, nil + return ss, nil, false } func (t *QuerierWrapper) LabelValues(n string) ([]string, storage.Warnings, error) { diff --git a/pkg/query/queryable.go b/pkg/query/queryable.go index dbacdb9dd3..d28980efaa 100644 --- a/pkg/query/queryable.go +++ b/pkg/query/queryable.go @@ -65,9 +65,9 @@ func (q *samplesQuerier) Close() { } } -func (q *samplesQuerier) Select(sortSeries bool, hints *storage.SelectHints, qh *pgQuerier.QueryHints, path []parser.Node, matchers ...*labels.Matcher) (storage.SeriesSet, parser.Node) { +func (q *samplesQuerier) Select(sortSeries bool, hints *storage.SelectHints, qh *pgQuerier.QueryHints, path []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, rollupUsed bool) { qry := q.metricsReader.SamplesQuerier() - ss, n := qry.Select(q.mint, q.maxt, sortSeries, hints, qh, path, matchers...) 
+ ss, n, usingRollup := qry.Select(q.mint, q.maxt, sortSeries, hints, qh, path, matchers...) q.seriesSets = append(q.seriesSets, ss) - return ss, n + return ss, n, usingRollup } diff --git a/pkg/rules/adapters/query.go b/pkg/rules/adapters/query.go index 59455dea9e..7db0b4bf95 100644 --- a/pkg/rules/adapters/query.go +++ b/pkg/rules/adapters/query.go @@ -33,7 +33,9 @@ type querierAdapter struct { func (q querierAdapter) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { // Pushdowns are not supported here. This is fine as Prometheus rule-manager only uses queryable to know // the previous state of the alert. This function is not used in recording/alerting rules evaluation. - seriesSet, _ := q.qr.Select(sortSeries, hints, nil, nil, matchers...) + // TODO (harkishen) after PoC: Note: Maybe passdown as param that do we want rollups to be used or + // not for this evaluation. + seriesSet, _, _ := q.qr.Select(sortSeries, hints, nil, nil, matchers...) return seriesSet } diff --git a/pkg/thanos/store.go b/pkg/thanos/store.go index 89ce14fc21..0532ed3708 100644 --- a/pkg/thanos/store.go +++ b/pkg/thanos/store.go @@ -41,7 +41,8 @@ func (fc *Storage) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesSe } defer q.Close() - ss, _ := q.Select(false, nil, nil, nil, matchers...) + // TODO (harkishen) after PoC + ss, _, _ := q.Select(false, nil, nil, nil, matchers...) for ss.Next() { series := ss.At() From de4278f7464072ee38708258d86e5708d64a6701 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Fri, 16 Sep 2022 13:36:37 +0530 Subject: [PATCH 05/12] PoC: Use rollup interval in PromQL engine range to avoid unnecessary samples. 
Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/interface.go | 2 +- pkg/pgmodel/querier/metadata.go | 2 +- pkg/pgmodel/querier/querier_sql_test.go | 6 +- pkg/pgmodel/querier/query_builder.go | 20 +++-- pkg/pgmodel/querier/query_builder_samples.go | 8 +- pkg/pgmodel/querier/query_sample.go | 36 ++++---- pkg/pgmodel/querier/rollup.go | 77 ++++++++++-------- pkg/pgmodel/querier/rollup_test.go | 15 +++- pkg/promql/engine.go | 86 ++++++++++++-------- pkg/promql/test.go | 4 +- pkg/query/queryable.go | 6 +- 11 files changed, 151 insertions(+), 111 deletions(-) diff --git a/pkg/pgmodel/querier/interface.go b/pkg/pgmodel/querier/interface.go index 3cc70a3dd1..7da2224c3d 100644 --- a/pkg/pgmodel/querier/interface.go +++ b/pkg/pgmodel/querier/interface.go @@ -44,7 +44,7 @@ type RemoteReadQuerier interface { // matching samples. type SamplesQuerier interface { // Select returns a series set containing the exemplar that matches the supplied query parameters. - Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node, bool) + Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node, *RollupConfig) } // ExemplarQuerier queries data using the provided query data and returns the diff --git a/pkg/pgmodel/querier/metadata.go b/pkg/pgmodel/querier/metadata.go index c8c2fc0429..5ae4c2cb8b 100644 --- a/pkg/pgmodel/querier/metadata.go +++ b/pkg/pgmodel/querier/metadata.go @@ -39,8 +39,8 @@ type evalMetadata struct { timeFilter timeFilter clauses []string values []interface{} - *rollupConfig *promqlMetadata + *RollupConfig } func GetMetadata(clauses []string, values []interface{}) *evalMetadata { diff --git a/pkg/pgmodel/querier/querier_sql_test.go b/pkg/pgmodel/querier/querier_sql_test.go index fd1685c4eb..31fcd49bb1 100644 --- a/pkg/pgmodel/querier/querier_sql_test.go +++ 
b/pkg/pgmodel/querier/querier_sql_test.go @@ -721,7 +721,11 @@ func TestPGXQuerierQuery(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}} + querier := pgxQuerier{ + tools: &queryTools{ + conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()), + }, + } result, err := querier.RemoteReadQuerier().Query(c.query) diff --git a/pkg/pgmodel/querier/query_builder.go b/pkg/pgmodel/querier/query_builder.go index 961935c876..ebe6ae8141 100644 --- a/pkg/pgmodel/querier/query_builder.go +++ b/pkg/pgmodel/querier/query_builder.go @@ -17,6 +17,8 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/extension" "github.com/timescale/promscale/pkg/pgmodel/lreader" @@ -246,14 +248,16 @@ type aggregators struct { // getAggregators returns the aggregator which should be used to fetch data for // a single metric. It may apply pushdowns to functions. 
-func getAggregators(metadata *promqlMetadata) (*aggregators, parser.Node) { - - //agg, node, err := tryPushDown(metadata) - //if err != nil { - // log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) - //} else if agg != nil { - // return agg, node - //} +func getAggregators(metadata *promqlMetadata, usingRollup bool) (*aggregators, parser.Node) { + + if !usingRollup { + agg, node, err := tryPushDown(metadata) + if err != nil { + log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) + } else if agg != nil { + return agg, node + } + } defaultAggregators := &aggregators{ timeClause: "array_agg(time)", diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index 880e5537ac..3841905684 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -131,7 +131,7 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ // When pushdowns are available, the is a pushdown // function which the promscale extension provides. - qf, node := getAggregators(metadata.promqlMetadata) + qf, node := getAggregators(metadata.promqlMetadata, metadata.RollupConfig != nil) var selectors, selectorClauses []string values := metadata.values @@ -218,10 +218,10 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ } samplesSchema, column := filter.schema, filter.column - if metadata.rollupConfig != nil { + if metadata.RollupConfig != nil { // Use rollups. 
- samplesSchema = metadata.rollupConfig.schemaName - column = metadata.rollupConfig.columnClause + samplesSchema = metadata.RollupConfig.schemaName + column = metadata.RollupConfig.columnClause } finalSQL := fmt.Sprintf(template, diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 416640794c..0d49d97f3f 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -37,31 +37,27 @@ func newQuerySamples(qr *pgxQuerier) *querySamples { // Select implements the SamplesQuerier interface. It is the entry point for our // own version of the Prometheus engine. -func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node, usedRollup bool) { - sampleRows, topNode, usedRollup, err := q.fetchSamplesRows(mint, maxt, hints, qh, path, ms) +func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node, rollupCfg *RollupConfig) { + sampleRows, topNode, rollupCfg, err := q.fetchSamplesRows(mint, maxt, hints, qh, path, ms) if err != nil { - return errorSeriesSet{err: err}, nil, false + return errorSeriesSet{err: err}, nil, nil } responseSeriesSet := buildSeriesSet(sampleRows, q.tools.labelsReader) - return responseSeriesSet, topNode, usedRollup + return responseSeriesSet, topNode, rollupCfg } -func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) (rows []sampleRow, topNode parser.Node, usedRollup bool, err error) { +func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) (rows []sampleRow, topNode parser.Node, cfg *RollupConfig, err error) { metadata, err := getEvaluationMetadata(q.tools, mint, maxt, 
GetPromQLMetadata(ms, hints, qh, path)) if err != nil { - return nil, nil, false, fmt.Errorf("get evaluation metadata: %w", err) + return nil, nil, nil, fmt.Errorf("get evaluation metadata: %w", err) } filter := &metadata.timeFilter - rollupSchemaName := q.rollup.decide(mint/1000, maxt/1000) - fmt.Println("schema name", rollupSchemaName) - if rollupSchemaName != noRollupSchema { + rollupConfig := q.rollup.decide(mint/1000, maxt/1000, filter.metric) + if rollupConfig != nil { // Use metric rollups. - rollupConfig, err := q.rollup.getConfig(filter.metric, rollupSchemaName) - if err != nil { - log.Error("msg", "cannot use metric rollups for querying. Reason: error getting column value", "error", err.Error()) - } + fmt.Println("schema name", rollupConfig.schemaName) if filter.schema == model.SchemaNameLabelName { // The query belongs to custom Caggs. We need to warn the user that this query will be treated as // general automatic downsampled query. That's the most we can do. @@ -70,7 +66,7 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH filter.schema = "" filter.column = "" } - metadata.rollupConfig = rollupConfig + metadata.RollupConfig = rollupConfig } if metadata.isSingleMetric { @@ -78,9 +74,9 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH mInfo, err := q.tools.getMetricTableName(filter.schema, filter.metric, false) if err != nil { if err == errors.ErrMissingTableName { - return nil, nil, false, nil + return nil, nil, nil, nil } - return nil, nil, false, fmt.Errorf("get metric table name: %w", err) + return nil, nil, nil, fmt.Errorf("get metric table name: %w", err) } metadata.timeFilter.metric = mInfo.TableName metadata.timeFilter.schema = mInfo.TableSchema @@ -88,17 +84,17 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH sampleRows, topNode, err := fetchSingleMetricSamples(q.tools, metadata) if err != nil { - return nil, nil, false, err + return nil, nil, 
nil, err } - return sampleRows, topNode, metadata.rollupConfig != nil, nil + return sampleRows, topNode, metadata.RollupConfig, nil } // Multiple vector selector case. sampleRows, err := fetchMultipleMetricsSamples(q.tools, metadata) if err != nil { - return nil, nil, false, err + return nil, nil, nil, err } - return sampleRows, nil, metadata.rollupConfig != nil, nil + return sampleRows, nil, metadata.RollupConfig, nil } // fetchSingleMetricSamples returns all the result rows for a single metric diff --git a/pkg/pgmodel/querier/rollup.go b/pkg/pgmodel/querier/rollup.go index 129f3a65e9..067861e331 100644 --- a/pkg/pgmodel/querier/rollup.go +++ b/pkg/pgmodel/querier/rollup.go @@ -98,16 +98,16 @@ func (r *rollupDecider) refreshMetricMetadata() { r.metricMetadataCache = metadataCache } -func (r *rollupDecider) decide(minSeconds, maxSeconds int64) (rollupSchemaName string) { +func (r *rollupDecider) decide(minSeconds, maxSeconds int64, metricName string) *RollupConfig { estimateSamples := func(resolution time.Duration) int64 { return int64(float64(maxSeconds-minSeconds) / resolution.Seconds()) } //estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) //if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { - // return noRollupSchema + // return nil //} - // -- to return always the lowest resolution. - //return r.getSchemaFor(r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]) + // -- DEBUG: to return always the lowest resolution. 
+ //return r.getConfig(metricName, r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]), true var acceptableResolution []time.Duration for _, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) @@ -115,47 +115,33 @@ func (r *rollupDecider) decide(minSeconds, maxSeconds int64) (rollupSchemaName s acceptableResolution = append(acceptableResolution, resolution) } } - switch len(acceptableResolution) { - case 0: - // Find the highest resolution that is below upper limit and respond. + if len(acceptableResolution) == 0 { + // No resolution was found to fit within the permitted range. + // Hence, find the highest resolution that is below upper (max) limit and respond. for _, res := range r.resolutionInASCOrder { estimate := estimateSamples(res) if estimate < high { - return r.getSchemaFor(res) + return r.getConfig(metricName, res) } } - + // None of the resolutions were below the upper limit. Hence, respond with the lowest available resolution. lowestResolution := r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1] // This is the best case in terms of size. // Example: If 1 hour is the lowest resolution, then all other resolutions will be way above 1 hour. // Hence, the best answer is 1 hour. - return r.getSchemaFor(lowestResolution) - case 1: - // Debug stuff: easy to understand. - return r.getSchemaFor(acceptableResolution[0]) - default: - // Multiple resolutions fit here. Hence, choose the highest resolution for maximum granularity. - return r.getSchemaFor(acceptableResolution[0]) + // + // Note: For understanding resolutions, in a case of resolutions [1m, 5m, 15m, 1h, 1w], + // 1m is the highest resolution (since maximum granularity) and 1w is the lowest resolution (due to lowest granularity). + return r.getConfig(metricName, lowestResolution) } + // Choose the highest resolution for maximum granularity. 
+ return r.getConfig(metricName, acceptableResolution[0]) } func (r *rollupDecider) withinRange(totalSamples int64) bool { return low <= totalSamples && totalSamples <= high } -func (r *rollupDecider) getSchemaFor(resolution time.Duration) string { - for schema, res := range r.schemaResolutionCache { - if res == resolution { - return schema - } - } - panic(fmt.Sprint( - "No schema found for resolution", - resolution, - "Please open an issue at https://github.com/timescale/promscale/issues", - )) // This will never be the case. -} - // metricTypeColumnRelationship is a relationship between the metric type received for query and a computation of // set of columns that together compute to `value` of sample. This is because in metric rollups, we do not have a `value` // column, rather they are stored as separate utility columns like {COUNTER -> [first, last], GAUGE -> [sum, count, min, max]}. @@ -165,12 +151,18 @@ var metricTypeColumnRelationship = map[string]string{ "GAUGE": "(sum / count)", } -type rollupConfig struct { +type RollupConfig struct { columnClause string schemaName string + interval time.Duration +} + +func (r *RollupConfig) Interval() time.Duration { + return r.interval } -func (r *rollupDecider) getConfig(metricName, schemaName string) (*rollupConfig, error) { +func (r *rollupDecider) getConfig(metricName string, resolution time.Duration) *RollupConfig { + schemaName := r.getSchemaFor(resolution) metricType, ok := r.metricMetadataCache[metricName] if !ok { log.Debug("msg", fmt.Sprintf("metric metadata not found for %s. 
Refreshing and trying again", metricName)) @@ -179,14 +171,31 @@ func (r *rollupDecider) getConfig(metricName, schemaName string) (*rollupConfig, if ok { goto checkMetricColumnRelationship } - return nil, errNoMetricMetadata + return nil } checkMetricColumnRelationship: columnString, ok := metricTypeColumnRelationship[metricType] if !ok { - return nil, errNoMetricColumnRelationship + return nil + } + return &RollupConfig{ + columnClause: columnString, + schemaName: schemaName, + interval: r.schemaResolutionCache[schemaName], } - return &rollupConfig{columnString, schemaName}, nil +} + +func (r *rollupDecider) getSchemaFor(resolution time.Duration) string { + for schema, res := range r.schemaResolutionCache { + if res == resolution { + return schema + } + } + panic(fmt.Sprint( + "No schema found for resolution", + resolution, + "Please open an issue at https://github.com/timescale/promscale/issues", + )) // This will never be the case. } type sortDuration []time.Duration diff --git a/pkg/pgmodel/querier/rollup_test.go b/pkg/pgmodel/querier/rollup_test.go index 404101ca4c..3520e204c0 100644 --- a/pkg/pgmodel/querier/rollup_test.go +++ b/pkg/pgmodel/querier/rollup_test.go @@ -2,6 +2,7 @@ package querier import ( "context" + "fmt" "testing" "time" @@ -16,7 +17,7 @@ import ( func TestDecideRollup(t *testing.T) { r := &rollupDecider{ conn: mockPgxConn{}, - cache: map[string]time.Duration{ + schemaResolutionCache: map[string]time.Duration{ "hour": time.Hour, "5_minute": 5 * time.Minute, "15_minute": 15 * time.Minute, @@ -78,8 +79,16 @@ func TestDecideRollup(t *testing.T) { }, } for _, tc := range tcs { - recommendedSchema := r.decide(int64(tc.min.Seconds()), int64(tc.max.Seconds())) - require.Equal(t, tc.expectedSchemaName, recommendedSchema, tc.name) + cfg, shouldUseRollup := r.decide(int64(tc.min.Seconds()), int64(tc.max.Seconds()), "") + if tc.expectedSchemaName == noRollupSchema { + require.False(t, shouldUseRollup, tc.name) + require.Nil(t, cfg) + continue + } else { 
+ require.True(t, shouldUseRollup, tc.name) + } + fmt.Println("test name", tc.name) + require.Equal(t, tc.expectedSchemaName, cfg.schemaName, tc.name) } } diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index bb1e97e9e9..4d23429b2a 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -73,7 +73,7 @@ type SamplesQuerier interface { // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. - Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, rollupUsed bool) + Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, cfg *pgquerier.RollupConfig) } const ( @@ -648,7 +648,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } defer querier.Close() - topNodes, usedRollup := ng.populateSeries(querier, s) + topNodes, rollupInterval := ng.populateSeries(querier, s) prepareSpanTimer.Finish() // Modify the offset of vector and matrix selectors for the @ modifier @@ -669,7 +669,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, topNodes: topNodes, - usedRollup: usedRollup, } query.sampleStats.InitStepTracking(start, start, 1) @@ -712,10 +711,31 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } // Range evaluation. + var intervalMs int64 + if rollupInterval > s.Interval { + // Rollups evaluated the step interval. Hence, just use this step in rollup. 
+ // This is important, as it **avoids unnecessary samples** due to .Seek() as the values remain the same + // between the rollup intervals, thereby **speeding up evaluation** involving rollups. + // + // Example: If s.Interval (from received range query) is 5s, and rollup evaluated is of 5m, then if we were + // to pass s.Interval here, then in VectorSelector evaluation, the `ts += ev.interval` will cause .Seek() (in vectorSelectorSingle()) + // to repeat the values of the previous rollup sample for entire duration until next rollup sample is reached. + // + // IMPORTANT: This hinders the very aim of rollups, which is to speed up calculations by parent functions, since now, + // the parent functions are essentially working on the same NUMBER OF SAMPLES (caused by step from received query) + // which without the rollups would be the case. + // + // We do `rollupInterval > s.Interval` so that if the range query's interval is already greater than rollupInterval + // the we do not over respond with higher granularity than needed. + intervalMs = durationMilliseconds(rollupInterval) + } else { + intervalMs = durationMilliseconds(s.Interval) + } + evaluator := &evaluator{ startTimestamp: timeMilliseconds(s.Start), endTimestamp: timeMilliseconds(s.End), - interval: durationMilliseconds(s.Interval), + interval: intervalMs, ctx: ctxInnerEval, maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, @@ -723,7 +743,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, topNodes: topNodes, - usedRollup: usedRollup, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) val, warnings, err := evaluator.Eval(s.Expr) @@ -860,7 +879,7 @@ func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorS // evaluation. These terminal nodes are called "top nodes". 
// // populateSeries returns a map keyed by all top nodes. -func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt) (topNodes map[parser.Node]struct{}, rollupUsed bool) { +func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt) (topNodes map[parser.Node]struct{}, rollupInterval time.Duration) { // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets // the variable. @@ -887,10 +906,10 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalSt evalRange = 0 hints.By, hints.Grouping = extractGroupsFromPath(path) - set, topNode, fromRollup := querier.Select(false, hints, qh, path, n.LabelMatchers...) + set, topNode, rollupCfg := querier.Select(false, hints, qh, path, n.LabelMatchers...) topNodes[topNode] = struct{}{} - if fromRollup { - rollupUsed = true + if rollupCfg != nil { + rollupInterval = rollupCfg.Interval() } n.UnexpandedSeriesSet = set @@ -899,7 +918,7 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalSt } return nil }) - return topNodes, rollupUsed + return topNodes, rollupInterval } // extractFuncFromPath walks up the path and searches for the first instance of @@ -982,7 +1001,6 @@ type evaluator struct { logger log.Logger lookbackDelta time.Duration topNodes map[parser.Node]struct{} - usedRollup bool samplesStats *stats.QuerySamples noStepSubqueryIntervalFn func(rangeMillis int64) int64 } @@ -1678,32 +1696,32 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { Points: getPointSlice(numSteps), } - fmt.Println("usedRollup", ev.usedRollup) - if ev.usedRollup { - // Rollup evaluated the step interval. Hence, just use the step in rollup. 
- // This is important, as it **avoids unnecessary samples** (with sample `v`) due to `ts += ev.interval` (in the for loop in else block) - // thereby **speeding up evaluation** involving rollups. - itr := e.Series[i].Iterator() - for itr.Next() { - ts, v := itr.At() - ss.Points = append(ss.Points, Point{V: v, T: ts}) - ev.currentSamples++ - } - } else { - for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { - step++ - _, v, ok := ev.vectorSelectorSingle(it, e, ts) - if ok { - if ev.currentSamples < ev.maxSamples { - ss.Points = append(ss.Points, Point{V: v, T: ts}) - ev.samplesStats.IncrementSamplesAtStep(step, 1) - ev.currentSamples++ - } else { - ev.error(ErrTooManySamples(env)) - } + //fmt.Println("rollupInterval", ev.rollupInterval) + //if ev.rollupInterval.Milliseconds() > ev.interval { + // // Rollup evaluated the step interval. Hence, just use the step in rollup. + // // This is important, as it **avoids unnecessary samples** (with sample `v`) due to `ts += ev.interval` (in the for loop in else block) + // // thereby **speeding up evaluation** involving rollups. 
+ // itr := e.Series[i].Iterator() + // for itr.Next() { + // ts, v := itr.At() + // ss.Points = append(ss.Points, Point{V: v, T: ts}) + // ev.currentSamples++ + // } + //} else { + for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { + step++ + _, v, ok := ev.vectorSelectorSingle(it, e, ts) + if ok { + if ev.currentSamples < ev.maxSamples { + ss.Points = append(ss.Points, Point{V: v, T: ts}) + ev.samplesStats.IncrementSamplesAtStep(step, 1) + ev.currentSamples++ + } else { + ev.error(ErrTooManySamples(env)) } } } + //} if len(ss.Points) > 0 { mat = append(mat, ss) diff --git a/pkg/promql/test.go b/pkg/promql/test.go index ec73addf0c..192d7167ff 100644 --- a/pkg/promql/test.go +++ b/pkg/promql/test.go @@ -110,9 +110,9 @@ type QuerierWrapper struct { storage.Querier } -func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node, bool) { +func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node, *querier.RollupConfig) { ss := t.Querier.Select(b, sh, m...) 
- return ss, nil, false + return ss, nil, nil } func (t *QuerierWrapper) LabelValues(n string) ([]string, storage.Warnings, error) { diff --git a/pkg/query/queryable.go b/pkg/query/queryable.go index d28980efaa..492f3771f9 100644 --- a/pkg/query/queryable.go +++ b/pkg/query/queryable.go @@ -65,9 +65,9 @@ func (q *samplesQuerier) Close() { } } -func (q *samplesQuerier) Select(sortSeries bool, hints *storage.SelectHints, qh *pgQuerier.QueryHints, path []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, rollupUsed bool) { +func (q *samplesQuerier) Select(sortSeries bool, hints *storage.SelectHints, qh *pgQuerier.QueryHints, path []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, config *pgQuerier.RollupConfig) { qry := q.metricsReader.SamplesQuerier() - ss, n, usingRollup := qry.Select(q.mint, q.maxt, sortSeries, hints, qh, path, matchers...) + ss, n, cfg := qry.Select(q.mint, q.maxt, sortSeries, hints, qh, path, matchers...) 
q.seriesSets = append(q.seriesSets, ss) - return ss, n, usingRollup + return ss, n, cfg } From e0276a01c1351161ed070d5399474d98131b39c8 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Mon, 19 Sep 2022 19:02:36 +0530 Subject: [PATCH 06/12] PoC: Add support to evaluate instant queries having PromQL func using rollups Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/common.go | 5 +- pkg/pgmodel/querier/interface.go | 3 +- pkg/pgmodel/querier/metadata.go | 3 +- pkg/pgmodel/querier/query_builder.go | 6 +- pkg/pgmodel/querier/query_builder_samples.go | 54 ++++++++++++---- pkg/pgmodel/querier/query_sample.go | 29 ++++----- pkg/pgmodel/querier/rollup/config.go | 22 +++++++ .../querier/{rollup.go => rollup/decider.go} | 61 +++++++++---------- .../decider_test.go} | 2 +- pkg/pgmodel/querier/rollup/promql_optimize.go | 53 ++++++++++++++++ pkg/pgmodel/querier/row.go | 14 ++++- pkg/pgmodel/querier/series_set.go | 1 - pkg/promql/engine.go | 16 +++-- pkg/promql/test.go | 3 +- pkg/query/queryable.go | 9 ++- 15 files changed, 205 insertions(+), 76 deletions(-) create mode 100644 pkg/pgmodel/querier/rollup/config.go rename pkg/pgmodel/querier/{rollup.go => rollup/decider.go} (80%) rename pkg/pgmodel/querier/{rollup_test.go => rollup/decider_test.go} (99%) create mode 100644 pkg/pgmodel/querier/rollup/promql_optimize.go diff --git a/pkg/pgmodel/querier/common.go b/pkg/pgmodel/querier/common.go index 04b44b3ef5..c9a9153a38 100644 --- a/pkg/pgmodel/querier/common.go +++ b/pkg/pgmodel/querier/common.go @@ -45,8 +45,9 @@ func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro // QueryHints contain additional metadata which promscale requires type QueryHints struct { - CurrentNode parser.Node - Lookback time.Duration + CurrentNode parser.Node + Lookback time.Duration + IsInstantQuery bool } func GetMetricNameSeriesIds(conn pgxconn.PgxConn, metadata *evalMetadata) (metrics, schemas []string, correspondingSeriesIDs [][]model.SeriesID, err error) { diff 
--git a/pkg/pgmodel/querier/interface.go b/pkg/pgmodel/querier/interface.go index 7da2224c3d..0f4fc8d580 100644 --- a/pkg/pgmodel/querier/interface.go +++ b/pkg/pgmodel/querier/interface.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/model" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" "github.com/timescale/promscale/pkg/prompb" ) @@ -44,7 +45,7 @@ type RemoteReadQuerier interface { // matching samples. type SamplesQuerier interface { // Select returns a series set containing the exemplar that matches the supplied query parameters. - Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node, *RollupConfig) + Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node, *rollup.Config) } // ExemplarQuerier queries data using the provided query data and returns the diff --git a/pkg/pgmodel/querier/metadata.go b/pkg/pgmodel/querier/metadata.go index 5ae4c2cb8b..c08fb0d880 100644 --- a/pkg/pgmodel/querier/metadata.go +++ b/pkg/pgmodel/querier/metadata.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) // promqlMetadata is metadata received directly from our native PromQL engine. 
@@ -39,8 +40,8 @@ type evalMetadata struct { timeFilter timeFilter clauses []string values []interface{} + rollupConfig *rollup.Config *promqlMetadata - *RollupConfig } func GetMetadata(clauses []string, values []interface{}) *evalMetadata { diff --git a/pkg/pgmodel/querier/query_builder.go b/pkg/pgmodel/querier/query_builder.go index ebe6ae8141..d076c194bc 100644 --- a/pkg/pgmodel/querier/query_builder.go +++ b/pkg/pgmodel/querier/query_builder.go @@ -249,7 +249,6 @@ type aggregators struct { // getAggregators returns the aggregator which should be used to fetch data for // a single metric. It may apply pushdowns to functions. func getAggregators(metadata *promqlMetadata, usingRollup bool) (*aggregators, parser.Node) { - if !usingRollup { agg, node, err := tryPushDown(metadata) if err != nil { @@ -259,6 +258,11 @@ func getAggregators(metadata *promqlMetadata, usingRollup bool) (*aggregators, p } } + //fmt.Println("start", metadata.selectHints.Start) + //fmt.Println("end", metadata.selectHints.End) + //fmt.Println("range", metadata.selectHints.Range) + //fmt.Println("end - start in mins", (metadata.selectHints.End-metadata.selectHints.Start)/(1000*60)) + defaultAggregators := &aggregators{ timeClause: "array_agg(time)", valueClause: "array_agg(value)", diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index 3841905684..fa255c1132 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/timescale/promscale/pkg/pgmodel/common/schema" pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) const ( @@ -107,7 +108,7 @@ const ( // buildSingleMetricSamplesQuery builds a SQL query which fetches the data for // one metric. 
-func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{}, parser.Node, TimestampSeries, error) { +func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{}, parser.Node, TimestampSeries, bool, error) { // The basic structure of the SQL query which this function produces is: // SELECT // series.labels @@ -131,7 +132,23 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ // When pushdowns are available, the is a pushdown // function which the promscale extension provides. - qf, node := getAggregators(metadata.promqlMetadata, metadata.RollupConfig != nil) + // TODO NEXT: Instant query with max_over_time(), min_over_time() + + qf, node := getAggregators(metadata.promqlMetadata, metadata.rollupConfig != nil) + + var ( + rollupOptimizer *rollup.SqlOptimizer + useInstantQueryOptimizer bool + grandparent parser.Node + ) + if metadata.rollupConfig != nil { + path := metadata.promqlMetadata.path + if len(path) >= 2 { + grandparent = path[len(path)-2] + rollupOptimizer = metadata.rollupConfig.SupportsFunctionalOptimization(grandparent, metadata.metric) + useInstantQueryOptimizer = rollupOptimizer != nil && metadata.queryHints.IsInstantQuery + } + } var selectors, selectorClauses []string values := metadata.values @@ -141,17 +158,30 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ var err error timeClauseBound, values, err = setParameterNumbers(qf.timeClause, values, qf.timeParams...) if err != nil { - return "", nil, nil, nil, err + return "", nil, nil, nil, false, err } selectors = append(selectors, "result.time_array") - selectorClauses = append(selectorClauses, timeClauseBound+" as time_array") + + if useInstantQueryOptimizer { + selectorClauses = append(selectorClauses, " current_timestamp::timestamptz as time_array") + node = grandparent // Send the optimization as top node so that evaluator in PromQL engine does not reevaluate. 
+ } else { + selectorClauses = append(selectorClauses, timeClauseBound+" as time_array") + } } valueClauseBound, values, err := setParameterNumbers(qf.valueClause, values, qf.valueParams...) if err != nil { - return "", nil, nil, nil, err + return "", nil, nil, nil, false, err } selectors = append(selectors, "result.value_array") - selectorClauses = append(selectorClauses, valueClauseBound+" as value_array") + valueWithoutAggregation := false + + if useInstantQueryOptimizer { + selectorClauses = append(selectorClauses, rollupOptimizer.InstantQueryAgg()+" as value_array") + valueWithoutAggregation = true + } else { + selectorClauses = append(selectorClauses, valueClauseBound+" as value_array") + } orderByClause := "ORDER BY time" if qf.unOrdered { @@ -218,10 +248,13 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ } samplesSchema, column := filter.schema, filter.column - if metadata.RollupConfig != nil { + if metadata.rollupConfig != nil { // Use rollups. 
- samplesSchema = metadata.RollupConfig.schemaName - column = metadata.RollupConfig.columnClause + samplesSchema = metadata.rollupConfig.SchemaName() + column = metadata.rollupConfig.ColumnClause() + if rollupOptimizer != nil { + column = rollupOptimizer.ColumnName() + } } finalSQL := fmt.Sprintf(template, @@ -235,8 +268,7 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ orderByClause, column, ) - - return finalSQL, values, node, qf.tsSeries, nil + return finalSQL, values, node, qf.tsSeries, valueWithoutAggregation, nil } func buildMultipleMetricSamplesQuery(filter timeFilter, series []pgmodel.SeriesID) (string, error) { diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 0d49d97f3f..6e1787096d 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -17,27 +17,24 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/schema" "github.com/timescale/promscale/pkg/pgmodel/model" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) type querySamples struct { *pgxQuerier - rollup *rollupDecider + r *rollup.Manager } func newQuerySamples(qr *pgxQuerier) *querySamples { - rollup := &rollupDecider{ - conn: qr.tools.conn, - } - go rollup.refresh() return &querySamples{ pgxQuerier: qr, - rollup: rollup, + r: rollup.NewManager(qr.tools.conn), } } // Select implements the SamplesQuerier interface. It is the entry point for our // own version of the Prometheus engine. 
-func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node, rollupCfg *RollupConfig) { +func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node, rollupCfg *rollup.Config) { sampleRows, topNode, rollupCfg, err := q.fetchSamplesRows(mint, maxt, hints, qh, path, ms) if err != nil { return errorSeriesSet{err: err}, nil, nil @@ -46,7 +43,7 @@ func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHin return responseSeriesSet, topNode, rollupCfg } -func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) (rows []sampleRow, topNode parser.Node, cfg *RollupConfig, err error) { +func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) (rows []sampleRow, topNode parser.Node, cfg *rollup.Config, err error) { metadata, err := getEvaluationMetadata(q.tools, mint, maxt, GetPromQLMetadata(ms, hints, qh, path)) if err != nil { return nil, nil, nil, fmt.Errorf("get evaluation metadata: %w", err) @@ -54,10 +51,10 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH filter := &metadata.timeFilter - rollupConfig := q.rollup.decide(mint/1000, maxt/1000, filter.metric) + rollupConfig := q.r.Decide(mint/1000, maxt/1000, filter.metric) if rollupConfig != nil { // Use metric rollups. - fmt.Println("schema name", rollupConfig.schemaName) + fmt.Println("schema name", rollupConfig.SchemaName()) if filter.schema == model.SchemaNameLabelName { // The query belongs to custom Caggs. We need to warn the user that this query will be treated as // general automatic downsampled query. That's the most we can do. 
@@ -66,7 +63,7 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH filter.schema = "" filter.column = "" } - metadata.RollupConfig = rollupConfig + metadata.rollupConfig = rollupConfig } if metadata.isSingleMetric { @@ -87,14 +84,14 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH return nil, nil, nil, err } - return sampleRows, topNode, metadata.RollupConfig, nil + return sampleRows, topNode, metadata.rollupConfig, nil } // Multiple vector selector case. sampleRows, err := fetchMultipleMetricsSamples(q.tools, metadata) if err != nil { return nil, nil, nil, err } - return sampleRows, nil, metadata.RollupConfig, nil + return sampleRows, nil, metadata.rollupConfig, nil } // fetchSingleMetricSamples returns all the result rows for a single metric @@ -103,7 +100,7 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH // successfully applied, the new top node is returned together with the metric // rows. For more information about top nodes, see `engine.populateSeries`. 
func fetchSingleMetricSamples(tools *queryTools, metadata *evalMetadata) ([]sampleRow, parser.Node, error) { - sqlQuery, values, topNode, tsSeries, err := buildSingleMetricSamplesQuery(metadata) + sqlQuery, values, topNode, tsSeries, valueWithoutAgg, err := buildSingleMetricSamplesQuery(metadata) if err != nil { return nil, nil, err } @@ -137,7 +134,7 @@ func fetchSingleMetricSamples(tools *queryTools, metadata *evalMetadata) ([]samp } filter := metadata.timeFilter - samplesRows, err := appendSampleRows(make([]sampleRow, 0, 1), rows, tsSeries, updatedMetricName, filter.schema, filter.column) + samplesRows, err := appendSampleRows(make([]sampleRow, 0, 1), rows, tsSeries, updatedMetricName, filter.schema, filter.column, valueWithoutAgg) if err != nil { return nil, topNode, fmt.Errorf("appending sample rows: %w", err) } @@ -205,7 +202,7 @@ func fetchMultipleMetricsSamples(tools *queryTools, metadata *evalMetadata) ([]s return nil, err } // Append all rows into results. - results, err = appendSampleRows(results, rows, nil, "", "", "") + results, err = appendSampleRows(results, rows, nil, "", "", "", false) rows.Close() if err != nil { rows.Close() diff --git a/pkg/pgmodel/querier/rollup/config.go b/pkg/pgmodel/querier/rollup/config.go new file mode 100644 index 0000000000..7fcbae2e83 --- /dev/null +++ b/pkg/pgmodel/querier/rollup/config.go @@ -0,0 +1,22 @@ +package rollup + +import "time" + +type Config struct { + columnClause string + schemaName string + interval time.Duration + managerRef *Manager +} + +func (c *Config) Interval() time.Duration { + return c.interval +} + +func (c *Config) SchemaName() string { + return c.schemaName +} + +func (c *Config) ColumnClause() string { + return c.columnClause +} diff --git a/pkg/pgmodel/querier/rollup.go b/pkg/pgmodel/querier/rollup/decider.go similarity index 80% rename from pkg/pgmodel/querier/rollup.go rename to pkg/pgmodel/querier/rollup/decider.go index 067861e331..5f8b47e1f5 100644 --- a/pkg/pgmodel/querier/rollup.go 
+++ b/pkg/pgmodel/querier/rollup/decider.go @@ -2,7 +2,7 @@ // Please see the included NOTICE for copyright information and // LICENSE for a copy of the license. -package querier +package rollup import ( "context" @@ -18,23 +18,24 @@ const ( defaultDurationBetweenSamples = 15 * time.Second low = 200 high = 2000 - noRollupSchema = "" - noColumn = "" ) -var ( - errNoMetricMetadata = fmt.Errorf("metric metadata not found") - errNoMetricColumnRelationship = fmt.Errorf("metric column relation does not exist. Possible invalid metric type") -) - -type rollupDecider struct { +type Manager struct { conn pgxconn.PgxConn schemaResolutionCache map[string]time.Duration // schema_name: resolution - metricMetadataCache map[string]string // metric_name: metric_type + metricTypeCache map[string]string // metric_name: metric_type resolutionInASCOrder []time.Duration } -func (r *rollupDecider) refresh() { +func NewManager(conn pgxconn.PgxConn) *Manager { + rollup := &Manager{ + conn: conn, + } + go rollup.refresh() + return rollup +} + +func (r *Manager) refresh() { refreshInterval := time.NewTicker(time.Minute) defer refreshInterval.Stop() for { @@ -45,7 +46,7 @@ func (r *rollupDecider) refresh() { } } -func (r *rollupDecider) refreshRollupResolution() (proceedToNextStep bool) { +func (r *Manager) refreshRollupResolution() (proceedToNextStep bool) { var ( schemaName []string resolution []time.Duration @@ -84,7 +85,7 @@ func (r *rollupDecider) refreshRollupResolution() (proceedToNextStep bool) { return true } -func (r *rollupDecider) refreshMetricMetadata() { +func (r *Manager) refreshMetricMetadata() { var metricName, metricType []string err := r.conn.QueryRow(context.Background(), "select array_agg(metric_family), array_agg(type) from _prom_catalog.metadata").Scan(&metricName, &metricType) if err != nil { @@ -95,10 +96,13 @@ func (r *rollupDecider) refreshMetricMetadata() { for i := range metricName { metadataCache[metricName[i]] = metricType[i] } - r.metricMetadataCache = 
metadataCache + r.metricTypeCache = metadataCache } -func (r *rollupDecider) decide(minSeconds, maxSeconds int64, metricName string) *RollupConfig { +func (r *Manager) Decide(minSeconds, maxSeconds int64, metricName string) *Config { + if len(r.resolutionInASCOrder) == 0 { + return nil + } estimateSamples := func(resolution time.Duration) int64 { return int64(float64(maxSeconds-minSeconds) / resolution.Seconds()) } @@ -106,8 +110,8 @@ func (r *rollupDecider) decide(minSeconds, maxSeconds int64, metricName string) //if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { // return nil //} - // -- DEBUG: to return always the lowest resolution. - //return r.getConfig(metricName, r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]), true + //-- DEBUG: to return always the lowest resolution. + return r.getConfig(metricName, r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]) var acceptableResolution []time.Duration for _, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) @@ -138,7 +142,7 @@ func (r *rollupDecider) decide(minSeconds, maxSeconds int64, metricName string) return r.getConfig(metricName, acceptableResolution[0]) } -func (r *rollupDecider) withinRange(totalSamples int64) bool { +func (r *Manager) withinRange(totalSamples int64) bool { return low <= totalSamples && totalSamples <= high } @@ -151,23 +155,13 @@ var metricTypeColumnRelationship = map[string]string{ "GAUGE": "(sum / count)", } -type RollupConfig struct { - columnClause string - schemaName string - interval time.Duration -} - -func (r *RollupConfig) Interval() time.Duration { - return r.interval -} - -func (r *rollupDecider) getConfig(metricName string, resolution time.Duration) *RollupConfig { +func (r *Manager) getConfig(metricName string, resolution time.Duration) *Config { schemaName := r.getSchemaFor(resolution) - metricType, ok := r.metricMetadataCache[metricName] + metricType, ok := 
r.metricTypeCache[metricName] if !ok { log.Debug("msg", fmt.Sprintf("metric metadata not found for %s. Refreshing and trying again", metricName)) r.refreshMetricMetadata() - metricType, ok = r.metricMetadataCache[metricName] + metricType, ok = r.metricTypeCache[metricName] if ok { goto checkMetricColumnRelationship } @@ -178,14 +172,15 @@ checkMetricColumnRelationship: if !ok { return nil } - return &RollupConfig{ + return &Config{ columnClause: columnString, schemaName: schemaName, interval: r.schemaResolutionCache[schemaName], + managerRef: r, } } -func (r *rollupDecider) getSchemaFor(resolution time.Duration) string { +func (r *Manager) getSchemaFor(resolution time.Duration) string { for schema, res := range r.schemaResolutionCache { if res == resolution { return schema diff --git a/pkg/pgmodel/querier/rollup_test.go b/pkg/pgmodel/querier/rollup/decider_test.go similarity index 99% rename from pkg/pgmodel/querier/rollup_test.go rename to pkg/pgmodel/querier/rollup/decider_test.go index 3520e204c0..05352d714d 100644 --- a/pkg/pgmodel/querier/rollup_test.go +++ b/pkg/pgmodel/querier/rollup/decider_test.go @@ -1,4 +1,4 @@ -package querier +package rollup import ( "context" diff --git a/pkg/pgmodel/querier/rollup/promql_optimize.go b/pkg/pgmodel/querier/rollup/promql_optimize.go new file mode 100644 index 0000000000..93a9b37528 --- /dev/null +++ b/pkg/pgmodel/querier/rollup/promql_optimize.go @@ -0,0 +1,53 @@ +package rollup + +import "github.com/prometheus/prometheus/promql/parser" + +type SqlOptimizer struct { + columnName, instantQueryAgg string +} + +func (s SqlOptimizer) ColumnName() string { + return s.columnName +} + +func (s SqlOptimizer) InstantQueryAgg() string { + return s.instantQueryAgg +} + +type ( + metricType string + promqlFuncName string +) + +var supportedPromQLFunc = map[metricType]map[promqlFuncName]SqlOptimizer{ + "GAUGE": { + "min_over_time": {columnName: "min", instantQueryAgg: "min(value)"}, + "max_over_time": {columnName: "max", 
instantQueryAgg: "max(value)"}, + "avg_over_time": {columnName: "sum / count"}, + "sum_over_time": {}, + "count_over_time": {}, + }, +} + +func (c *Config) SupportsFunctionalOptimization(functionalCall parser.Node, metricName string) *SqlOptimizer { + typ, ok := c.managerRef.metricTypeCache[metricName] + if !ok { + // No metadata found, hence no optimization. + return nil + } + node, isFuncCall := functionalCall.(*parser.Call) + if !isFuncCall { + return nil + } + fnName := node.Func.Name + + supportedOptimizations, found := supportedPromQLFunc[metricType(typ)] + if !found { + return nil + } + optimizer, ok := supportedOptimizations[promqlFuncName(fnName)] + if !ok { + return nil + } + return &optimizer +} diff --git a/pkg/pgmodel/querier/row.go b/pkg/pgmodel/querier/row.go index 5d535686d2..63ff8315ec 100644 --- a/pkg/pgmodel/querier/row.go +++ b/pkg/pgmodel/querier/row.go @@ -164,7 +164,7 @@ func (r *sampleRow) GetAdditionalLabels() (ll labels.Labels) { // appendSampleRows adds new results rows to already existing result rows and // returns the as a result. 
-func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSeries, metric, schema, column string) ([]sampleRow, error) { +func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSeries, metric, schema, column string, valueWithoutAgg bool) ([]sampleRow, error) { if in.Err() != nil { return out, in.Err() } @@ -179,7 +179,17 @@ func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSer times := tPool.Get().(*pgtype.TimestamptzArray) times.Elements = times.Elements[:0] timesWrapper := timestamptzArrayWrapper{times} - row.err = in.Scan(&row.labelIds, ×Wrapper, &valuesWrapper) + if valueWithoutAgg { + var ( + flt pgtype.Float8 + ts pgtype.Timestamptz + ) + row.err = in.Scan(&row.labelIds, &ts, &flt) + timesWrapper.Elements = []pgtype.Timestamptz{ts} + valuesWrapper.Elements = []pgtype.Float8{flt} + } else { + row.err = in.Scan(&row.labelIds, ×Wrapper, &valuesWrapper) + } row.timeArrayOwnership = times row.times = newRowTimestampSeries(times) } else { diff --git a/pkg/pgmodel/querier/series_set.go b/pkg/pgmodel/querier/series_set.go index f087203043..f75d76b04a 100644 --- a/pkg/pgmodel/querier/series_set.go +++ b/pkg/pgmodel/querier/series_set.go @@ -175,7 +175,6 @@ type pgxSeriesIterator struct { // newIterator returns an iterator over the samples. It expects times and values to be the same length. 
func newIterator(times TimestampSeries, values *pgtype.Float8Array) *pgxSeriesIterator { - fmt.Println("totalSamples", times.Len()) return &pgxSeriesIterator{ cur: -1, totalSamples: times.Len(), diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 4d23429b2a..359e13945b 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -45,6 +45,7 @@ import ( "github.com/prometheus/prometheus/util/stats" pgquerier "github.com/timescale/promscale/pkg/pgmodel/querier" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" "github.com/timescale/promscale/pkg/util" ) @@ -73,7 +74,7 @@ type SamplesQuerier interface { // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. - Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, cfg *pgquerier.RollupConfig) + Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, cfg *rollup.Config) } const ( @@ -648,7 +649,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } defer querier.Close() - topNodes, rollupInterval := ng.populateSeries(querier, s) + topNodes, rollupInterval := ng.populateSeries(querier, s, s.Start == s.End) prepareSpanTimer.Finish() // Modify the offset of vector and matrix selectors for the @ modifier @@ -879,7 +880,7 @@ func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorS // evaluation. These terminal nodes are called "top nodes". // // populateSeries returns a map keyed by all top nodes. 
-func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt) (topNodes map[parser.Node]struct{}, rollupInterval time.Duration) { +func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt, isInstantQuery bool) (topNodes map[parser.Node]struct{}, rollupInterval time.Duration) { // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets // the variable. @@ -900,8 +901,9 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalSt } qh = &pgquerier.QueryHints{ - CurrentNode: n, - Lookback: ng.lookbackDelta, + CurrentNode: n, + Lookback: ng.lookbackDelta, + IsInstantQuery: isInstantQuery, } evalRange = 0 hints.By, hints.Grouping = extractGroupsFromPath(path) @@ -1337,6 +1339,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { defer span.End() if _, isTopNode := ev.topNodes[expr]; isTopNode { + fmt.Println("in eval() topNode", ev, ev.topNodes[expr]) // the storage layer has already processed this node. Just return // the result. var ( @@ -1370,6 +1373,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { if mat == nil { ev.error(fmt.Errorf("Matrix not filled in")) } + fmt.Println("total samples in mat", mat.TotalSamples()) return mat, warnings } @@ -1402,6 +1406,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { }, e.Param, e.Expr) case *parser.Call: + fmt.Println("into call") call := FunctionCalls[e.Func.Name] if e.Func.Name == "timestamp" { // Matrix evaluation always returns the evaluation time, @@ -1542,6 +1547,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { enh.Ts = ts // Make the function call. 
outVec := call(inArgs, e.Args, enh) + fmt.Println("outVec", outVec) ev.samplesStats.IncrementSamplesAtStep(step, int64(len(points))) enh.Out = outVec[:0] if len(outVec) > 0 { diff --git a/pkg/promql/test.go b/pkg/promql/test.go index 192d7167ff..67e9b84f5e 100644 --- a/pkg/promql/test.go +++ b/pkg/promql/test.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/assert" "github.com/timescale/promscale/pkg/pgmodel/querier" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) var ( @@ -110,7 +111,7 @@ type QuerierWrapper struct { storage.Querier } -func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node, *querier.RollupConfig) { +func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node, *rollup.Config) { ss := t.Querier.Select(b, sh, m...) 
return ss, nil, nil } diff --git a/pkg/query/queryable.go b/pkg/query/queryable.go index 492f3771f9..1845b1aa03 100644 --- a/pkg/query/queryable.go +++ b/pkg/query/queryable.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/lreader" pgQuerier "github.com/timescale/promscale/pkg/pgmodel/querier" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" "github.com/timescale/promscale/pkg/promql" ) @@ -65,7 +66,13 @@ func (q *samplesQuerier) Close() { } } -func (q *samplesQuerier) Select(sortSeries bool, hints *storage.SelectHints, qh *pgQuerier.QueryHints, path []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, config *pgQuerier.RollupConfig) { +func (q *samplesQuerier) Select( + sortSeries bool, + hints *storage.SelectHints, + qh *pgQuerier.QueryHints, + path []parser.Node, + matchers ...*labels.Matcher, +) (seriesSet storage.SeriesSet, topNode parser.Node, config *rollup.Config) { qry := q.metricsReader.SamplesQuerier() ss, n, cfg := qry.Select(q.mint, q.maxt, sortSeries, hints, qh, path, matchers...) q.seriesSets = append(q.seriesSets, ss) From 528f5780fb908e38a4cb4bef520b04bcd8987265 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Tue, 20 Sep 2022 17:48:45 +0530 Subject: [PATCH 07/12] Cleanup: Implementation of optimizing PromQL execution using rollup. 
Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/query_builder_samples.go | 55 +++++++------ pkg/pgmodel/querier/rollup/config.go | 11 +-- pkg/pgmodel/querier/rollup/decider.go | 32 ++------ pkg/pgmodel/querier/rollup/promql_optimize.go | 78 ++++++++++--------- pkg/promql/engine.go | 1 - 5 files changed, 83 insertions(+), 94 deletions(-) diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index fa255c1132..f99b4489a6 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -12,7 +12,6 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/timescale/promscale/pkg/pgmodel/common/schema" pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" - "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) const ( @@ -136,17 +135,35 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ qf, node := getAggregators(metadata.promqlMetadata, metadata.rollupConfig != nil) - var ( - rollupOptimizer *rollup.SqlOptimizer - useInstantQueryOptimizer bool - grandparent parser.Node - ) + filter := metadata.timeFilter + column := filter.column + samplesSchema := filter.schema + + var instantQueryAgg string if metadata.rollupConfig != nil { + samplesSchema = metadata.rollupConfig.SchemaName() + + rollupOptimizer := metadata.rollupConfig.GetOptimizer(metadata.metric) + column = rollupOptimizer.RegularColumnName() + + // See if we can optimize the query aggregation, like min_over_time(metric[1h]) path := metadata.promqlMetadata.path if len(path) >= 2 { - grandparent = path[len(path)-2] - rollupOptimizer = metadata.rollupConfig.SupportsFunctionalOptimization(grandparent, metadata.metric) - useInstantQueryOptimizer = rollupOptimizer != nil && metadata.queryHints.IsInstantQuery + // May contain a functional call. Let's check it out. 
+ grandparent := path[len(path)-2] + if callNode, isPromQLFunc := grandparent.(*parser.Call); isPromQLFunc { + fnName := callNode.Func.Name + + columnClause := rollupOptimizer.GetColumnClause(fnName) + if columnClause != "" { + column = columnClause + + if metadata.queryHints.IsInstantQuery { + instantQueryAgg = rollupOptimizer.GetAggForInstantQuery(fnName) + node = grandparent // We have already evaluated the aggregation, hence no need to compute in PromQL engine. Hence, sent as a pushdown response. + } + } + } } } @@ -162,9 +179,8 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ } selectors = append(selectors, "result.time_array") - if useInstantQueryOptimizer { + if instantQueryAgg != "" { selectorClauses = append(selectorClauses, " current_timestamp::timestamptz as time_array") - node = grandparent // Send the optimization as top node so that evaluator in PromQL engine does not reevaluate. } else { selectorClauses = append(selectorClauses, timeClauseBound+" as time_array") } @@ -174,10 +190,10 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ return "", nil, nil, nil, false, err } selectors = append(selectors, "result.value_array") - valueWithoutAggregation := false - if useInstantQueryOptimizer { - selectorClauses = append(selectorClauses, rollupOptimizer.InstantQueryAgg()+" as value_array") + valueWithoutAggregation := false + if instantQueryAgg != "" { + selectorClauses = append(selectorClauses, instantQueryAgg+" as value_array") valueWithoutAggregation = true } else { selectorClauses = append(selectorClauses, valueClauseBound+" as value_array") @@ -235,7 +251,6 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ // The selectHints' time range is calculated by `getTimeRangesForSelector`, // which determines the correct `scan_start` for the current expression. 
- filter := metadata.timeFilter sh := metadata.selectHints var start, end string // selectHints are non-nil when the query was initiated through the `query` @@ -247,16 +262,6 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ start, end = metadata.timeFilter.start, metadata.timeFilter.end } - samplesSchema, column := filter.schema, filter.column - if metadata.rollupConfig != nil { - // Use rollups. - samplesSchema = metadata.rollupConfig.SchemaName() - column = metadata.rollupConfig.ColumnClause() - if rollupOptimizer != nil { - column = rollupOptimizer.ColumnName() - } - } - finalSQL := fmt.Sprintf(template, pgx.Identifier{samplesSchema, filter.metric}.Sanitize(), pgx.Identifier{schema.PromDataSeries, filter.seriesTable}.Sanitize(), diff --git a/pkg/pgmodel/querier/rollup/config.go b/pkg/pgmodel/querier/rollup/config.go index 7fcbae2e83..0e8d228e83 100644 --- a/pkg/pgmodel/querier/rollup/config.go +++ b/pkg/pgmodel/querier/rollup/config.go @@ -3,10 +3,9 @@ package rollup import "time" type Config struct { - columnClause string - schemaName string - interval time.Duration - managerRef *Manager + schemaName string + interval time.Duration + managerRef *Manager } func (c *Config) Interval() time.Duration { @@ -16,7 +15,3 @@ func (c *Config) Interval() time.Duration { func (c *Config) SchemaName() string { return c.schemaName } - -func (c *Config) ColumnClause() string { - return c.columnClause -} diff --git a/pkg/pgmodel/querier/rollup/decider.go b/pkg/pgmodel/querier/rollup/decider.go index 5f8b47e1f5..5fd474c23f 100644 --- a/pkg/pgmodel/querier/rollup/decider.go +++ b/pkg/pgmodel/querier/rollup/decider.go @@ -111,7 +111,7 @@ func (r *Manager) Decide(minSeconds, maxSeconds int64, metricName string) *Confi // return nil //} //-- DEBUG: to return always the lowest resolution. 
- return r.getConfig(metricName, r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]) + //return r.getConfig(r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]) var acceptableResolution []time.Duration for _, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) @@ -125,7 +125,7 @@ func (r *Manager) Decide(minSeconds, maxSeconds int64, metricName string) *Confi for _, res := range r.resolutionInASCOrder { estimate := estimateSamples(res) if estimate < high { - return r.getConfig(metricName, res) + return r.getConfig(res) } } // None of the resolutions were below the upper limit. Hence, respond with the lowest available resolution. @@ -136,10 +136,10 @@ func (r *Manager) Decide(minSeconds, maxSeconds int64, metricName string) *Confi // // Note: For understanding resolutions, in a case of resolutions [1m, 5m, 15m, 1h, 1w], // 1m is the highest resolution (since maximum granularity) and 1w is the lowest resolution (due to lowest granularity). - return r.getConfig(metricName, lowestResolution) + return r.getConfig(lowestResolution) } // Choose the highest resolution for maximum granularity. - return r.getConfig(metricName, acceptableResolution[0]) + return r.getConfig(acceptableResolution[0]) } func (r *Manager) withinRange(totalSamples int64) bool { @@ -155,28 +155,12 @@ var metricTypeColumnRelationship = map[string]string{ "GAUGE": "(sum / count)", } -func (r *Manager) getConfig(metricName string, resolution time.Duration) *Config { +func (r *Manager) getConfig(resolution time.Duration) *Config { schemaName := r.getSchemaFor(resolution) - metricType, ok := r.metricTypeCache[metricName] - if !ok { - log.Debug("msg", fmt.Sprintf("metric metadata not found for %s. 
Refreshing and trying again", metricName)) - r.refreshMetricMetadata() - metricType, ok = r.metricTypeCache[metricName] - if ok { - goto checkMetricColumnRelationship - } - return nil - } -checkMetricColumnRelationship: - columnString, ok := metricTypeColumnRelationship[metricType] - if !ok { - return nil - } return &Config{ - columnClause: columnString, - schemaName: schemaName, - interval: r.schemaResolutionCache[schemaName], - managerRef: r, + schemaName: schemaName, + interval: r.schemaResolutionCache[schemaName], + managerRef: r, } } diff --git a/pkg/pgmodel/querier/rollup/promql_optimize.go b/pkg/pgmodel/querier/rollup/promql_optimize.go index 93a9b37528..7813b380bf 100644 --- a/pkg/pgmodel/querier/rollup/promql_optimize.go +++ b/pkg/pgmodel/querier/rollup/promql_optimize.go @@ -1,53 +1,59 @@ package rollup -import "github.com/prometheus/prometheus/promql/parser" - -type SqlOptimizer struct { - columnName, instantQueryAgg string -} - -func (s SqlOptimizer) ColumnName() string { - return s.columnName -} - -func (s SqlOptimizer) InstantQueryAgg() string { - return s.instantQueryAgg +func (c *Config) GetOptimizer(metricName string) *SqlOptimizer { + typ, ok := c.managerRef.metricTypeCache[metricName] + if !ok { + // No metadata found, hence no optimization. + return nil + } + return &SqlOptimizer{ + typ: metricType(typ), + } } type ( - metricType string - promqlFuncName string + metricType string + promqlFuncName string + columnInformation struct { + columnName string + instantQueryAgg string + } ) -var supportedPromQLFunc = map[metricType]map[promqlFuncName]SqlOptimizer{ +var metricFuncRelation = map[metricType]map[promqlFuncName]columnInformation{ "GAUGE": { + "": {columnName: "sum / count"}, // When no function is used. 
+ "avg_over_time": {columnName: "sum / count", instantQueryAgg: "avg(value)"}, "min_over_time": {columnName: "min", instantQueryAgg: "min(value)"}, "max_over_time": {columnName: "max", instantQueryAgg: "max(value)"}, - "avg_over_time": {columnName: "sum / count"}, - "sum_over_time": {}, - "count_over_time": {}, + "sum_over_time": {columnName: "sum", instantQueryAgg: "sum(value)"}, + "count_over_time": {columnName: "count", instantQueryAgg: "sum(value)"}, // Since we want to sum all the counts of each bucket. }, } -func (c *Config) SupportsFunctionalOptimization(functionalCall parser.Node, metricName string) *SqlOptimizer { - typ, ok := c.managerRef.metricTypeCache[metricName] - if !ok { - // No metadata found, hence no optimization. - return nil - } - node, isFuncCall := functionalCall.(*parser.Call) - if !isFuncCall { - return nil - } - fnName := node.Func.Name +type SqlOptimizer struct { + typ metricType +} - supportedOptimizations, found := supportedPromQLFunc[metricType(typ)] - if !found { - return nil +func (s *SqlOptimizer) RegularColumnName() string { + r := metricFuncRelation[s.typ] + return r[""].columnName +} + +func (s *SqlOptimizer) GetColumnClause(funcName string) string { + r := metricFuncRelation[s.typ] + c, supported := r[promqlFuncName(funcName)] + if !supported { + return "" } - optimizer, ok := supportedOptimizations[promqlFuncName(fnName)] - if !ok { - return nil + return c.columnName +} + +func (s *SqlOptimizer) GetAggForInstantQuery(funcName string) string { + r := metricFuncRelation[s.typ] + c, supported := r[promqlFuncName(funcName)] + if !supported { + return "" } - return &optimizer + return c.instantQueryAgg } diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 359e13945b..7ad9d45f7e 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -1547,7 +1547,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { enh.Ts = ts // Make the function call. 
outVec := call(inArgs, e.Args, enh) - fmt.Println("outVec", outVec) ev.samplesStats.IncrementSamplesAtStep(step, int64(len(points))) enh.Out = outVec[:0] if len(outVec) > 0 { From 5b1c6835f378db99cccbef4fa0abd03598433eca Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Wed, 21 Sep 2022 19:47:53 +0530 Subject: [PATCH 08/12] Prepare for PoC demo. Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/query_sample.go | 12 ++++++++++++ pkg/pgmodel/querier/rollup/decider.go | 5 +++++ pkg/pgmodel/querier/row.go | 2 ++ pkg/promql/engine.go | 20 ++++++-------------- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 6e1787096d..0514d7332f 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -40,6 +40,18 @@ func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHin return errorSeriesSet{err: err}, nil, nil } responseSeriesSet := buildSeriesSet(sampleRows, q.tools.labelsReader) + + // debug + //for responseSeriesSet.Next() { + // at := responseSeriesSet.At() + // itr := at.Iterator() + // c := 0 + // for itr.Next() { + // c++ + // } + // fmt.Println("debug count", at.Labels(), c) + //} + // debug return responseSeriesSet, topNode, rollupCfg } diff --git a/pkg/pgmodel/querier/rollup/decider.go b/pkg/pgmodel/querier/rollup/decider.go index 5fd474c23f..fcb83343d7 100644 --- a/pkg/pgmodel/querier/rollup/decider.go +++ b/pkg/pgmodel/querier/rollup/decider.go @@ -112,13 +112,18 @@ func (r *Manager) Decide(minSeconds, maxSeconds int64, metricName string) *Confi //} //-- DEBUG: to return always the lowest resolution. 
//return r.getConfig(r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]) + //return nil + //return r.getConfig(time.Hour) + //return r.getConfig(time.Minute * 5) var acceptableResolution []time.Duration for _, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) + fmt.Println("resolution=>", resolution, "estimate=>", estimate) if r.withinRange(estimate) { acceptableResolution = append(acceptableResolution, resolution) } } + fmt.Println("acceptableResolution", acceptableResolution) if len(acceptableResolution) == 0 { // No resolution was found to fit within the permitted range. // Hence, find the highest resolution that is below upper (max) limit and respond. diff --git a/pkg/pgmodel/querier/row.go b/pkg/pgmodel/querier/row.go index 63ff8315ec..b1d8dae0c0 100644 --- a/pkg/pgmodel/querier/row.go +++ b/pkg/pgmodel/querier/row.go @@ -2,6 +2,7 @@ package querier import ( "encoding/binary" + "fmt" "sync" "github.com/jackc/pgtype" @@ -190,6 +191,7 @@ func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSer } else { row.err = in.Scan(&row.labelIds, ×Wrapper, &valuesWrapper) } + fmt.Println("sql layer, total raw samples", len(valuesWrapper.Elements)) row.timeArrayOwnership = times row.times = newRowTimestampSeries(times) } else { diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 7ad9d45f7e..3ffb07ccbc 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -691,6 +691,8 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } + fmt.Println("query instant: total samples", mat.TotalSamples()) + query.matrix = mat switch s.Expr.Type() { case parser.ValueTypeVector: @@ -758,6 +760,8 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } query.matrix = mat + fmt.Println("query range: total samples", mat.TotalSamples()) + if err := contextDone(ctx, "expression 
evaluation"); err != nil { return nil, warnings, err } @@ -1406,7 +1410,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { }, e.Param, e.Expr) case *parser.Call: - fmt.Println("into call") call := FunctionCalls[e.Func.Name] if e.Func.Name == "timestamp" { // Matrix evaluation always returns the evaluation time, @@ -1693,6 +1696,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } mat := make(Matrix, 0, len(e.Series)) + fmt.Println("ev.lookbackDelta", ev.lookbackDelta) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) for i, s := range e.Series { it.Reset(s.Iterator()) @@ -1701,18 +1705,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { Points: getPointSlice(numSteps), } - //fmt.Println("rollupInterval", ev.rollupInterval) - //if ev.rollupInterval.Milliseconds() > ev.interval { - // // Rollup evaluated the step interval. Hence, just use the step in rollup. - // // This is important, as it **avoids unnecessary samples** (with sample `v`) due to `ts += ev.interval` (in the for loop in else block) - // // thereby **speeding up evaluation** involving rollups. 
- // itr := e.Series[i].Iterator() - // for itr.Next() { - // ts, v := itr.At() - // ss.Points = append(ss.Points, Point{V: v, T: ts}) - // ev.currentSamples++ - // } - //} else { + fmt.Println("ev.startTimestamp", ev.startTimestamp, "ev.endTimestamp", ev.endTimestamp) for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { step++ _, v, ok := ev.vectorSelectorSingle(it, e, ts) @@ -1726,7 +1719,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { } } } - //} if len(ss.Points) > 0 { mat = append(mat, ss) From 0b411079352c0f3607553c16d7b21d990a78499c Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Thu, 22 Sep 2022 16:44:48 +0530 Subject: [PATCH 09/12] Preparing for demo. Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/rollup/decider.go | 9 ++++--- pkg/pgmodel/querier/rollup/decider_test.go | 28 +++++++++++++++------- pkg/pgmodel/querier/row.go | 1 + pkg/promql/engine.go | 3 ++- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/pkg/pgmodel/querier/rollup/decider.go b/pkg/pgmodel/querier/rollup/decider.go index fcb83343d7..64c49e0eaa 100644 --- a/pkg/pgmodel/querier/rollup/decider.go +++ b/pkg/pgmodel/querier/rollup/decider.go @@ -107,14 +107,13 @@ func (r *Manager) Decide(minSeconds, maxSeconds int64, metricName string) *Confi return int64(float64(maxSeconds-minSeconds) / resolution.Seconds()) } //estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) + //fmt.Println("resolution=>", "raw", "estimate=>", estimatedRawSamples) //if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { // return nil //} - //-- DEBUG: to return always the lowest resolution. - //return r.getConfig(r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1]) - //return nil - //return r.getConfig(time.Hour) - //return r.getConfig(time.Minute * 5) + // + //return nil // Always use raw samples. 
+ //return r.getConfig(time.Minute * 5) // Always use 5 min rollup var acceptableResolution []time.Duration for _, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) diff --git a/pkg/pgmodel/querier/rollup/decider_test.go b/pkg/pgmodel/querier/rollup/decider_test.go index 05352d714d..6a7fed863a 100644 --- a/pkg/pgmodel/querier/rollup/decider_test.go +++ b/pkg/pgmodel/querier/rollup/decider_test.go @@ -2,7 +2,6 @@ package rollup import ( "context" - "fmt" "testing" "time" @@ -14,8 +13,10 @@ import ( "github.com/timescale/promscale/pkg/pgxconn" ) +const noRollupSchema = "" + func TestDecideRollup(t *testing.T) { - r := &rollupDecider{ + r := &Manager{ conn: mockPgxConn{}, schemaResolutionCache: map[string]time.Duration{ "hour": time.Hour, @@ -56,12 +57,25 @@ func TestDecideRollup(t *testing.T) { min: 0, max: 24 * time.Hour, expectedSchemaName: "5_minute", - }, { + }, + { name: "7 days", min: 0, max: 7 * 24 * time.Hour, expectedSchemaName: "15_minute", - }, { + // DRY RUN on 200 - 2000 logic + // -------- + // + // Assumed default scrape interval being 15 secs + // raw -> 40320 + // + // And, when using following rollup resolutions, num samples: + // 5 mins -> 2016 + // 15 mins -> 672 + // 1 hour -> 168 + // 1 week -> 1 + }, + { name: "30 days", min: 0, max: 30 * 24 * time.Hour, @@ -79,15 +93,11 @@ func TestDecideRollup(t *testing.T) { }, } for _, tc := range tcs { - cfg, shouldUseRollup := r.decide(int64(tc.min.Seconds()), int64(tc.max.Seconds()), "") + cfg := r.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds()), "") if tc.expectedSchemaName == noRollupSchema { - require.False(t, shouldUseRollup, tc.name) require.Nil(t, cfg) continue - } else { - require.True(t, shouldUseRollup, tc.name) } - fmt.Println("test name", tc.name) require.Equal(t, tc.expectedSchemaName, cfg.schemaName, tc.name) } } diff --git a/pkg/pgmodel/querier/row.go b/pkg/pgmodel/querier/row.go index b1d8dae0c0..47288592dc 100644 --- a/pkg/pgmodel/querier/row.go 
+++ b/pkg/pgmodel/querier/row.go @@ -191,6 +191,7 @@ func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSer } else { row.err = in.Scan(&row.labelIds, ×Wrapper, &valuesWrapper) } + fmt.Println("sql layer, total raw samples", len(valuesWrapper.Elements)) row.timeArrayOwnership = times row.times = newRowTimestampSeries(times) diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 3ffb07ccbc..21ebddd70f 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -715,6 +715,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval // Range evaluation. var intervalMs int64 + if rollupInterval > s.Interval { // Rollups evaluated the step interval. Hence, just use this step in rollup. // This is important, as it **avoids unnecessary samples** due to .Seek() as the values remain the same @@ -1691,12 +1692,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { return String{V: e.Val, T: ev.startTimestamp}, nil case *parser.VectorSelector: + ws, err := checkAndExpandSeriesSet(ev.ctx, e) if err != nil { ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } mat := make(Matrix, 0, len(e.Series)) - fmt.Println("ev.lookbackDelta", ev.lookbackDelta) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) for i, s := range e.Series { it.Reset(s.Iterator()) From 94da010eae4c334e1b8661694e3f152f8a87598c Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Mon, 26 Sep 2022 14:50:39 +0530 Subject: [PATCH 10/12] Improve avg_over_time() instant query by using sum(sum) / sum(count) Signed-off-by: Harkishen-Singh --- pkg/pgmodel/querier/rollup/promql_optimize.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pgmodel/querier/rollup/promql_optimize.go b/pkg/pgmodel/querier/rollup/promql_optimize.go index 7813b380bf..c3f3d1d4c0 100644 --- a/pkg/pgmodel/querier/rollup/promql_optimize.go +++ b/pkg/pgmodel/querier/rollup/promql_optimize.go 
@@ -23,7 +23,7 @@ type ( var metricFuncRelation = map[metricType]map[promqlFuncName]columnInformation{ "GAUGE": { "": {columnName: "sum / count"}, // When no function is used. - "avg_over_time": {columnName: "sum / count", instantQueryAgg: "avg(value)"}, + "avg_over_time": {columnName: "sum, count", instantQueryAgg: "sum(sum) / sum(count)"}, "min_over_time": {columnName: "min", instantQueryAgg: "min(value)"}, "max_over_time": {columnName: "max", instantQueryAgg: "max(value)"}, "sum_over_time": {columnName: "sum", instantQueryAgg: "sum(value)"}, From ff6e36c16a60ab04b7a8de93731bc324342ecf18 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Wed, 28 Sep 2022 16:23:18 +0530 Subject: [PATCH 11/12] Add PromQL optimizer for other metric types and rollupIterator to avoid empty responses. Signed-off-by: Harkishen-Singh --- migration-tool/go.sum | 5 - pkg/pgmodel/querier/query_builder_samples.go | 17 ++- pkg/pgmodel/querier/query_sample.go | 2 +- .../querier/rollup/{decider.go => manager.go} | 25 ++-- .../{decider_test.go => manager_test.go} | 9 +- pkg/pgmodel/querier/rollup/promql_optimize.go | 119 ++++++++++++++---- pkg/promql/custom_iterator.go | 60 +++++++++ pkg/promql/engine.go | 21 +++- 8 files changed, 201 insertions(+), 57 deletions(-) rename pkg/pgmodel/querier/rollup/{decider.go => manager.go} (91%) rename pkg/pgmodel/querier/rollup/{decider_test.go => manager_test.go} (95%) create mode 100644 pkg/promql/custom_iterator.go diff --git a/migration-tool/go.sum b/migration-tool/go.sum index a5dba3d739..df30f3a239 100644 --- a/migration-tool/go.sum +++ b/migration-tool/go.sum @@ -407,7 +407,6 @@ github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= -github.com/go-kit/log v0.2.0
h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw= github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= @@ -998,8 +997,6 @@ github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9 github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= -github.com/prometheus/common v0.34.0 h1:RBmGO9d/FVjqHT0yUGQwBJhkwKV+wPCn7KGpvfab0uE= -github.com/prometheus/common v0.34.0/go.mod h1:gB3sOl7P0TvJabZpLY5uQMpUqRCPPCyRLCZYc7JZTNE= github.com/prometheus/common v0.35.0 h1:Eyr+Pw2VymWejHqCugNaQXkAi6KayVNxaHeu6khmFBE= github.com/prometheus/common v0.35.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= @@ -1096,7 +1093,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+I= github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -1781,7 +1777,6 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index f99b4489a6..00b2553a8e 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/timescale/promscale/pkg/pgmodel/common/schema" pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) const ( @@ -144,7 +145,12 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ samplesSchema = metadata.rollupConfig.SchemaName() rollupOptimizer := metadata.rollupConfig.GetOptimizer(metadata.metric) - column = rollupOptimizer.RegularColumnName() + typeOptimizer := rollup.Optimizer(rollupOptimizer.RangeQuery()) + if metadata.queryHints.IsInstantQuery { + typeOptimizer = rollupOptimizer.InstantQuery() + } + + column = typeOptimizer.RegularColumnName() // See if we can optimize the query aggregation, like min_over_time(metric[1h]) path := metadata.promqlMetadata.path @@ -154,13 +160,16 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ if callNode, isPromQLFunc := grandparent.(*parser.Call); isPromQLFunc { fnName := callNode.Func.Name - columnClause := rollupOptimizer.GetColumnClause(fnName) + 
columnClause := typeOptimizer.GetColumnClause(fnName) if columnClause != "" { column = columnClause if metadata.queryHints.IsInstantQuery { - instantQueryAgg = rollupOptimizer.GetAggForInstantQuery(fnName) - node = grandparent // We have already evaluated the aggregation, hence no need to compute in PromQL engine. Hence, sent as a pushdown response. + instantQueryOptimizer := typeOptimizer.(rollup.InstantQuery) + if agg := instantQueryOptimizer.GetAggForInstantQuery(fnName); agg != "" { + instantQueryAgg = agg + node = grandparent // We have already evaluated the aggregation, hence no need to compute in PromQL engine. So, send this as a pushdown response. + } } } } diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 0514d7332f..52299b4efd 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -63,7 +63,7 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH filter := &metadata.timeFilter - rollupConfig := q.r.Decide(mint/1000, maxt/1000, filter.metric) + rollupConfig := q.r.Decide(mint/1000, maxt/1000) if rollupConfig != nil { // Use metric rollups. 
fmt.Println("schema name", rollupConfig.SchemaName()) diff --git a/pkg/pgmodel/querier/rollup/decider.go b/pkg/pgmodel/querier/rollup/manager.go similarity index 91% rename from pkg/pgmodel/querier/rollup/decider.go rename to pkg/pgmodel/querier/rollup/manager.go index 64c49e0eaa..86703e4181 100644 --- a/pkg/pgmodel/querier/rollup/decider.go +++ b/pkg/pgmodel/querier/rollup/manager.go @@ -16,8 +16,8 @@ import ( const ( defaultDurationBetweenSamples = 15 * time.Second - low = 200 - high = 2000 + low = 500 + high = 5000 ) type Manager struct { @@ -99,21 +99,24 @@ func (r *Manager) refreshMetricMetadata() { r.metricTypeCache = metadataCache } -func (r *Manager) Decide(minSeconds, maxSeconds int64, metricName string) *Config { +func (r *Manager) Decide(minSeconds, maxSeconds int64) *Config { if len(r.resolutionInASCOrder) == 0 { return nil } estimateSamples := func(resolution time.Duration) int64 { return int64(float64(maxSeconds-minSeconds) / resolution.Seconds()) } - //estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) - //fmt.Println("resolution=>", "raw", "estimate=>", estimatedRawSamples) - //if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { - // return nil - //} - // - //return nil // Always use raw samples. 
- //return r.getConfig(time.Minute * 5) // Always use 5 min rollup + + //return nil + + estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) + + fmt.Println("resolution=>", "raw", "estimate=>", estimatedRawSamples) + + if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { + return nil + } + var acceptableResolution []time.Duration for _, resolution := range r.schemaResolutionCache { estimate := estimateSamples(resolution) diff --git a/pkg/pgmodel/querier/rollup/decider_test.go b/pkg/pgmodel/querier/rollup/manager_test.go similarity index 95% rename from pkg/pgmodel/querier/rollup/decider_test.go rename to pkg/pgmodel/querier/rollup/manager_test.go index 6a7fed863a..50d9a04ea0 100644 --- a/pkg/pgmodel/querier/rollup/decider_test.go +++ b/pkg/pgmodel/querier/rollup/manager_test.go @@ -19,6 +19,7 @@ func TestDecideRollup(t *testing.T) { r := &Manager{ conn: mockPgxConn{}, schemaResolutionCache: map[string]time.Duration{ + // Raw -> 15 seconds "hour": time.Hour, "5_minute": 5 * time.Minute, "15_minute": 15 * time.Minute, @@ -62,15 +63,15 @@ func TestDecideRollup(t *testing.T) { name: "7 days", min: 0, max: 7 * 24 * time.Hour, - expectedSchemaName: "15_minute", - // DRY RUN on 200 - 2000 logic + expectedSchemaName: "5_minute", + // DRY RUN on 500 - 5000 logic // -------- // // Assumed default scrape interval being 15 secs // raw -> 40320 // // And, when using following rollup resolutions, num samples: - // 5 mins -> 2016 + // 5 mins -> 2016 <-- Falls in the acceptable range. 
// 15 mins -> 672 // 1 hour -> 168 // 1 week -> 1 @@ -93,7 +94,7 @@ func TestDecideRollup(t *testing.T) { }, } for _, tc := range tcs { - cfg := r.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds()), "") + cfg := r.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds())) if tc.expectedSchemaName == noRollupSchema { require.Nil(t, cfg) continue diff --git a/pkg/pgmodel/querier/rollup/promql_optimize.go b/pkg/pgmodel/querier/rollup/promql_optimize.go index c3f3d1d4c0..a7cfe84828 100644 --- a/pkg/pgmodel/querier/rollup/promql_optimize.go +++ b/pkg/pgmodel/querier/rollup/promql_optimize.go @@ -1,26 +1,15 @@ package rollup -func (c *Config) GetOptimizer(metricName string) *SqlOptimizer { - typ, ok := c.managerRef.metricTypeCache[metricName] - if !ok { - // No metadata found, hence no optimization. - return nil - } - return &SqlOptimizer{ - typ: metricType(typ), - } -} - type ( metricType string - promqlFuncName string columnInformation struct { - columnName string - instantQueryAgg string + columnName string + instantQueryAgg string + onlyForInstantQuery bool } ) -var metricFuncRelation = map[metricType]map[promqlFuncName]columnInformation{ +var instantQueryFuncColumn = map[metricType]map[string]columnInformation{ // Relationship=> metric_type: PromQL_function_name: info_on_which_rollup_column_to_use "GAUGE": { "": {columnName: "sum / count"}, // When no function is used. "avg_over_time": {columnName: "sum, count", instantQueryAgg: "sum(sum) / sum(value)"}, @@ -29,31 +18,109 @@ var metricFuncRelation = map[metricType]map[promqlFuncName]columnInformation{ "sum_over_time": {columnName: "sum", instantQueryAgg: "sum(value)"}, "count_over_time": {columnName: "count", instantQueryAgg: "sum(value)"}, // Since we want to sum all the counts of each bucket. 
}, + "COUNTER": { + "": {columnName: "last_with_counter_reset"}, + }, + "HISTOGRAM": { + "": {columnName: "last_with_counter_reset"}, + }, + "SUMMARY": { + "": {columnName: "sum / count"}, + "avg_over_time": {columnName: "sum, count", instantQueryAgg: "sum(sum) / sum(count)"}, + "min_over_time": {columnName: "sum / count", instantQueryAgg: "min(value)"}, + "max_over_time": {columnName: "sum / count", instantQueryAgg: "max(value)"}, + "sum_over_time": {columnName: "sum", instantQueryAgg: "sum(value)"}, + "count_over_time": {columnName: "count", instantQueryAgg: "sum(value)"}, + }, +} + +var rangeQueryFuncColumn = map[metricType]map[string]string{ + "GAUGE": { + "": "sum / count", // When no function is used. + "min_over_time": "min", + "max_over_time": "max", + "sum_over_time": "sum", + "count_over_time": "count", + }, + "COUNTER": { + "": "last_with_counter_reset", + }, + "HISTOGRAM": { + "": "last_with_counter_reset", + }, + "SUMMARY": { + "": "sum / count", + "avg_over_time": "sum, count", + "sum_over_time": "sum", + "count_over_time": "count", + }, +} + +type Optimizer interface { + RegularColumnName() string + GetColumnClause(funcName string) string } type SqlOptimizer struct { typ metricType } -func (s *SqlOptimizer) RegularColumnName() string { - r := metricFuncRelation[s.typ] - return r[""].columnName +func (c *Config) GetOptimizer(metricName string) *SqlOptimizer { + typ, ok := c.managerRef.metricTypeCache[metricName] + if !ok { + // No metadata found, hence no optimization.
+ return nil + } + return &SqlOptimizer{ + typ: metricType(typ), + } } -func (s *SqlOptimizer) GetColumnClause(funcName string) string { - r := metricFuncRelation[s.typ] - c, supported := r[promqlFuncName(funcName)] - if !supported { +func (o *SqlOptimizer) InstantQuery() InstantQuery { + return InstantQuery{o.typ} +} + +func (o *SqlOptimizer) RangeQuery() RangeQuery { + return RangeQuery{o.typ} +} + +type InstantQuery struct { + typ metricType +} + +func (i InstantQuery) RegularColumnName() string { + return instantQueryFuncColumn[i.typ][""].columnName +} + +func (i InstantQuery) GetColumnClause(funcName string) string { + clause, exists := instantQueryFuncColumn[i.typ][funcName] + if !exists { return "" } - return c.columnName + return clause.columnName } -func (s *SqlOptimizer) GetAggForInstantQuery(funcName string) string { - r := metricFuncRelation[s.typ] - c, supported := r[promqlFuncName(funcName)] +func (i InstantQuery) GetAggForInstantQuery(funcName string) string { + r := instantQueryFuncColumn[i.typ] + c, supported := r[funcName] if !supported { return "" } return c.instantQueryAgg } + +type RangeQuery struct { + typ metricType +} + +func (r RangeQuery) RegularColumnName() string { + return rangeQueryFuncColumn[r.typ][""] +} + +func (r RangeQuery) GetColumnClause(funcName string) string { + clause, exists := rangeQueryFuncColumn[r.typ][funcName] + if !exists { + return "" + } + return clause +} diff --git a/pkg/promql/custom_iterator.go b/pkg/promql/custom_iterator.go new file mode 100644 index 0000000000..fe5780a767 --- /dev/null +++ b/pkg/promql/custom_iterator.go @@ -0,0 +1,60 @@ +package promql + +import ( + "fmt" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +type samplesIterator interface { + Reset(it chunkenc.Iterator) + PeekPrev() (t int64, v float64, ok bool) + Seek(t int64) bool + Next() bool + At() (int64, float64) + Err() error +} + +type rollupItr struct { + it chunkenc.Iterator + prevValue float64 + prevTime int64 +} + +func 
newRollupIterator() *rollupItr { + fmt.Println("using rollup iterator") + return &rollupItr{} +} + +func (r *rollupItr) Reset(it chunkenc.Iterator) { + r.it = it +} + +func (r *rollupItr) PeekPrev() (t int64, v float64, ok bool) { + if r.prevTime == 0 { + return 0, 0, false + } + return r.prevTime, r.prevValue, true +} + +func (r *rollupItr) Next() bool { + return r.it.Next() +} + +func (r *rollupItr) At() (int64, float64) { + return r.it.At() +} + +func (r *rollupItr) Seek(t int64) bool { + for r.Next() { + ts, v := r.At() + r.prevTime, r.prevValue = ts, v + if ts >= t { + return true + } + } + return false +} + +func (r *rollupItr) Err() error { + return r.it.Err() +} diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 21ebddd70f..281df65944 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -670,6 +670,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, topNodes: topNodes, + usingRollups: rollupInterval > 0, } query.sampleStats.InitStepTracking(start, start, 1) @@ -747,6 +748,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, topNodes: topNodes, + usingRollups: rollupInterval > 0, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) val, warnings, err := evaluator.Eval(s.Expr) @@ -1007,9 +1009,11 @@ type evaluator struct { currentSamples int logger log.Logger lookbackDelta time.Duration - topNodes map[parser.Node]struct{} samplesStats *stats.QuerySamples noStepSubqueryIntervalFn func(rangeMillis int64) int64 + + topNodes map[parser.Node]struct{} + usingRollups bool } // errorf causes a panic with the input formatted into an error. 
@@ -1692,13 +1696,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { return String{V: e.Val, T: ev.startTimestamp}, nil case *parser.VectorSelector: - ws, err := checkAndExpandSeriesSet(ev.ctx, e) if err != nil { ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } mat := make(Matrix, 0, len(e.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + it := decideSamplesIterator(ev) for i, s := range e.Series { it.Reset(s.Iterator()) ss := Series{ @@ -1706,7 +1709,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { Points: getPointSlice(numSteps), } - fmt.Println("ev.startTimestamp", ev.startTimestamp, "ev.endTimestamp", ev.endTimestamp) for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { step++ _, v, ok := ev.vectorSelectorSingle(it, e, ts) @@ -1837,6 +1839,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(errors.Errorf("unhandled expression of type: %T", expr)) } +func decideSamplesIterator(ev *evaluator) samplesIterator { + if ev.usingRollups { + return newRollupIterator() + } + return storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) +} + // vectorSelector evaluates a *parser.VectorSelector expression. 
func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { ws, err := checkAndExpandSeriesSet(ev.ctx, node) @@ -1844,7 +1853,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } vec := make(Vector, 0, len(node.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + it := decideSamplesIterator(ev) for i, s := range node.Series { it.Reset(s.Iterator()) @@ -1867,7 +1876,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect } // vectorSelectorSingle evaluates a instant vector for the iterator of one time series. -func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { +func (ev *evaluator) vectorSelectorSingle(it samplesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { refTime := ts - durationMilliseconds(node.Offset) var t int64 var v float64 From 2c60416470cd4c2a0240151396413b677a4c2c3e Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Mon, 3 Oct 2022 15:12:28 +0530 Subject: [PATCH 12/12] m Signed-off-by: Harkishen-Singh --- pkg/promql/engine.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 281df65944..cc8634efc4 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -2007,7 +2007,26 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m ev.currentSamples++ out = append(out, Point{T: t, V: v}) } - } + //else if ev.usingRollups { + // fmt.Println("ok in PeekBack", ok) + // out = append(out, Point{T: t, V: v}) + // ev.currentSamples++ + //} + } + + //if len(out) == 0 && ev.usingRollups { + // // Not enough samples, so to get a minimum value, we need atleast 2 samples. Lets add the last and the second last. 
+ // fmt.Println("CASE: len(out) == 0 && ev.usingRollups") + // t, v, ok := it.PeekBack(2) + // if ok { + // out = append(out, Point{T: t, V: v}) + // } + // t, v, ok = it.PeekBack(1) + // if ok { + // out = append(out, Point{T: t, V: v}) + // } + //} + // The seeked sample might also be in the range. if ok { t, v := it.At() @@ -2020,6 +2039,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m } } ev.samplesStats.UpdatePeak(ev.currentSamples) + //fmt.Println("len of out", len(out)) return out }