diff --git a/migration-tool/go.sum b/migration-tool/go.sum index a5dba3d739..df30f3a239 100644 --- a/migration-tool/go.sum +++ b/migration-tool/go.sum @@ -407,7 +407,6 @@ github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= -github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw= github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= @@ -998,8 +997,6 @@ github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9 github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= -github.com/prometheus/common v0.34.0 h1:RBmGO9d/FVjqHT0yUGQwBJhkwKV+wPCn7KGpvfab0uE= -github.com/prometheus/common v0.34.0/go.mod h1:gB3sOl7P0TvJabZpLY5uQMpUqRCPPCyRLCZYc7JZTNE= github.com/prometheus/common v0.35.0 h1:Eyr+Pw2VymWejHqCugNaQXkAi6KayVNxaHeu6khmFBE= github.com/prometheus/common v0.35.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= @@ -1096,7 +1093,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+I= github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -1781,7 +1777,6 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/api/series.go b/pkg/api/series.go index 1f1195e233..8ad91f9fba 100644 --- a/pkg/api/series.go +++ b/pkg/api/series.go @@ -78,7 +78,7 @@ func series(queryable promql.Queryable) http.HandlerFunc { var sets []storage.SeriesSet var warnings storage.Warnings for _, mset := range matcherSets { - s, _ := q.Select(false, nil, nil, nil, mset...) + s, _, _ := q.Select(false, nil, nil, nil, mset...) warnings = append(warnings, s.Warnings()...) 
type SamplesQuerier interface {
	// Select returns the samples that match the supplied query parameters,
	// packaged as a series set.
	//
	// Besides the series set it returns:
	//   - a parser.Node: the top node of the evaluated expression when part of
	//     the query was answered by the storage layer (a pushdown), otherwise nil;
	//   - a *rollup.Config: the rollup that was used to answer the query, or
	//     nil when no rollup was selected.
	//
	// NOTE(review): the meaning of the two extra results is taken from the
	// querySamples implementation in this package; confirm it is the intended
	// contract for every implementation.
	Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node, *rollup.Config)
}
b/pkg/pgmodel/querier/querier_sql_test.go @@ -721,7 +721,11 @@ func TestPGXQuerierQuery(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}} + querier := pgxQuerier{ + tools: &queryTools{ + conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()), + }, + } result, err := querier.RemoteReadQuerier().Query(c.query) diff --git a/pkg/pgmodel/querier/query_builder.go b/pkg/pgmodel/querier/query_builder.go index cedf1f9376..d076c194bc 100644 --- a/pkg/pgmodel/querier/query_builder.go +++ b/pkg/pgmodel/querier/query_builder.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/extension" @@ -247,15 +248,21 @@ type aggregators struct { // getAggregators returns the aggregator which should be used to fetch data for // a single metric. It may apply pushdowns to functions. 
-func getAggregators(metadata *promqlMetadata) (*aggregators, parser.Node) { - - agg, node, err := tryPushDown(metadata) - if err != nil { - log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) - } else if agg != nil { - return agg, node +func getAggregators(metadata *promqlMetadata, usingRollup bool) (*aggregators, parser.Node) { + if !usingRollup { + agg, node, err := tryPushDown(metadata) + if err != nil { + log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) + } else if agg != nil { + return agg, node + } } + //fmt.Println("start", metadata.selectHints.Start) + //fmt.Println("end", metadata.selectHints.End) + //fmt.Println("range", metadata.selectHints.Range) + //fmt.Println("end - start in mins", (metadata.selectHints.End-metadata.selectHints.Start)/(1000*60)) + defaultAggregators := &aggregators{ timeClause: "array_agg(time)", valueClause: "array_agg(value)", diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index 8250cd78d6..00b2553a8e 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + package querier import ( @@ -8,6 +12,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/timescale/promscale/pkg/pgmodel/common/schema" pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) const ( @@ -103,7 +108,7 @@ const ( // buildSingleMetricSamplesQuery builds a SQL query which fetches the data for // one metric. 
-func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{}, parser.Node, TimestampSeries, error) { +func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{}, parser.Node, TimestampSeries, bool, error) { // The basic structure of the SQL query which this function produces is: // SELECT // series.labels @@ -127,7 +132,49 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ // When pushdowns are available, the is a pushdown // function which the promscale extension provides. - qf, node := getAggregators(metadata.promqlMetadata) + // TODO NEXT: Instant query with max_over_time(), min_over_time() + + qf, node := getAggregators(metadata.promqlMetadata, metadata.rollupConfig != nil) + + filter := metadata.timeFilter + column := filter.column + samplesSchema := filter.schema + + var instantQueryAgg string + if metadata.rollupConfig != nil { + samplesSchema = metadata.rollupConfig.SchemaName() + + rollupOptimizer := metadata.rollupConfig.GetOptimizer(metadata.metric) + typeOptimizer := rollup.Optimizer(rollupOptimizer.RangeQuery()) + if metadata.queryHints.IsInstantQuery { + typeOptimizer = rollupOptimizer.InstantQuery() + } + + column = typeOptimizer.RegularColumnName() + + // See if we can optimize the query aggregation, like min_over_time(metric[1h]) + path := metadata.promqlMetadata.path + if len(path) >= 2 { + // May contain a functional call. Let's check it out. 
+ grandparent := path[len(path)-2] + if callNode, isPromQLFunc := grandparent.(*parser.Call); isPromQLFunc { + fnName := callNode.Func.Name + + columnClause := typeOptimizer.GetColumnClause(fnName) + if columnClause != "" { + column = columnClause + + if metadata.queryHints.IsInstantQuery { + instantQueryOptimizer := typeOptimizer.(rollup.InstantQuery) + if agg := instantQueryOptimizer.GetAggForInstantQuery(fnName); agg != "" { + instantQueryAgg = agg + node = grandparent // We have already evaluated the aggregation, hence no need to compute in PromQL engine. So, send this as a pushdown response. + } + } + } + } + } + } var selectors, selectorClauses []string values := metadata.values @@ -137,17 +184,29 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ var err error timeClauseBound, values, err = setParameterNumbers(qf.timeClause, values, qf.timeParams...) if err != nil { - return "", nil, nil, nil, err + return "", nil, nil, nil, false, err } selectors = append(selectors, "result.time_array") - selectorClauses = append(selectorClauses, timeClauseBound+" as time_array") + + if instantQueryAgg != "" { + selectorClauses = append(selectorClauses, " current_timestamp::timestamptz as time_array") + } else { + selectorClauses = append(selectorClauses, timeClauseBound+" as time_array") + } } valueClauseBound, values, err := setParameterNumbers(qf.valueClause, values, qf.valueParams...) 
if err != nil { - return "", nil, nil, nil, err + return "", nil, nil, nil, false, err } selectors = append(selectors, "result.value_array") - selectorClauses = append(selectorClauses, valueClauseBound+" as value_array") + + valueWithoutAggregation := false + if instantQueryAgg != "" { + selectorClauses = append(selectorClauses, instantQueryAgg+" as value_array") + valueWithoutAggregation = true + } else { + selectorClauses = append(selectorClauses, valueClauseBound+" as value_array") + } orderByClause := "ORDER BY time" if qf.unOrdered { @@ -201,7 +260,6 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ // The selectHints' time range is calculated by `getTimeRangesForSelector`, // which determines the correct `scan_start` for the current expression. - filter := metadata.timeFilter sh := metadata.selectHints var start, end string // selectHints are non-nil when the query was initiated through the `query` @@ -214,7 +272,7 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ } finalSQL := fmt.Sprintf(template, - pgx.Identifier{filter.schema, filter.metric}.Sanitize(), + pgx.Identifier{samplesSchema, filter.metric}.Sanitize(), pgx.Identifier{schema.PromDataSeries, filter.seriesTable}.Sanitize(), strings.Join(cases, " AND "), start, @@ -222,10 +280,9 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ strings.Join(selectorClauses, ", "), strings.Join(selectors, ", "), orderByClause, - pgx.Identifier{filter.column}.Sanitize(), + column, ) - - return finalSQL, values, node, qf.tsSeries, nil + return finalSQL, values, node, qf.tsSeries, valueWithoutAggregation, nil } func buildMultipleMetricSamplesQuery(filter timeFilter, series []pgmodel.SeriesID) (string, error) { diff --git a/pkg/pgmodel/querier/query_remote_read.go b/pkg/pgmodel/querier/query_remote_read.go index 67611fe4f3..244f71cf11 100644 --- a/pkg/pgmodel/querier/query_remote_read.go +++ 
b/pkg/pgmodel/querier/query_remote_read.go @@ -27,7 +27,9 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro } qrySamples := newQuerySamples(q.pgxQuerier) - sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) + // TODO after PoC (harkishen): Do not use rollup in case of remote-read. This is because remote-read results are merged with local + // Prometheus data and using rollups here will cause evaluation problems. + sampleRows, _, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) if err != nil { return nil, err } diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index 310cc8dc4a..52299b4efd 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + package querier import ( @@ -9,44 +13,79 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/schema" + "github.com/timescale/promscale/pkg/pgmodel/model" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) type querySamples struct { *pgxQuerier + r *rollup.Manager } func newQuerySamples(qr *pgxQuerier) *querySamples { - return &querySamples{qr} + return &querySamples{ + pgxQuerier: qr, + r: rollup.NewManager(qr.tools.conn), + } } // Select implements the SamplesQuerier interface. It is the entry point for our // own version of the Prometheus engine. 
-func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node) { - sampleRows, topNode, err := q.fetchSamplesRows(mint, maxt, hints, qh, path, ms) +func (q *querySamples) Select(mint, maxt int64, _ bool, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms ...*labels.Matcher) (seriesSet SeriesSet, node parser.Node, rollupCfg *rollup.Config) { + sampleRows, topNode, rollupCfg, err := q.fetchSamplesRows(mint, maxt, hints, qh, path, ms) if err != nil { - return errorSeriesSet{err: err}, nil + return errorSeriesSet{err: err}, nil, nil } responseSeriesSet := buildSeriesSet(sampleRows, q.tools.labelsReader) - return responseSeriesSet, topNode + + // debug + //for responseSeriesSet.Next() { + // at := responseSeriesSet.At() + // itr := at.Iterator() + // c := 0 + // for itr.Next() { + // c++ + // } + // fmt.Println("debug count", at.Labels(), c) + //} + // debug + return responseSeriesSet, topNode, rollupCfg } -func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) ([]sampleRow, parser.Node, error) { +func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectHints, qh *QueryHints, path []parser.Node, ms []*labels.Matcher) (rows []sampleRow, topNode parser.Node, cfg *rollup.Config, err error) { metadata, err := getEvaluationMetadata(q.tools, mint, maxt, GetPromQLMetadata(ms, hints, qh, path)) if err != nil { - return nil, nil, fmt.Errorf("get evaluation metadata: %w", err) + return nil, nil, nil, fmt.Errorf("get evaluation metadata: %w", err) + } + + filter := &metadata.timeFilter + + rollupConfig := q.r.Decide(mint/1000, maxt/1000) + if rollupConfig != nil { + // Use metric rollups. + fmt.Println("schema name", rollupConfig.SchemaName()) + if filter.schema == model.SchemaNameLabelName { + // The query belongs to custom Caggs. 
We need to warn the user that this query will be treated as + // general automatic downsampled query. That's the most we can do. + // If the user wants Caggs query, then he should not enable automatic rollups for querying in CLI flags. + log.Warn("msg", "conflicting schema found. Note: __schema__ & __column__ will be overwritten") + filter.schema = "" + filter.column = "" + } + metadata.rollupConfig = rollupConfig } - filter := metadata.timeFilter if metadata.isSingleMetric { // Single vector selector case. mInfo, err := q.tools.getMetricTableName(filter.schema, filter.metric, false) if err != nil { if err == errors.ErrMissingTableName { - return nil, nil, nil + return nil, nil, nil, nil } - return nil, nil, fmt.Errorf("get metric table name: %w", err) + return nil, nil, nil, fmt.Errorf("get metric table name: %w", err) } metadata.timeFilter.metric = mInfo.TableName metadata.timeFilter.schema = mInfo.TableSchema @@ -54,17 +93,17 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH sampleRows, topNode, err := fetchSingleMetricSamples(q.tools, metadata) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - return sampleRows, topNode, nil + return sampleRows, topNode, metadata.rollupConfig, nil } // Multiple vector selector case. sampleRows, err := fetchMultipleMetricsSamples(q.tools, metadata) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - return sampleRows, nil, nil + return sampleRows, nil, metadata.rollupConfig, nil } // fetchSingleMetricSamples returns all the result rows for a single metric @@ -73,11 +112,13 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH // successfully applied, the new top node is returned together with the metric // rows. For more information about top nodes, see `engine.populateSeries`. 
func fetchSingleMetricSamples(tools *queryTools, metadata *evalMetadata) ([]sampleRow, parser.Node, error) { - sqlQuery, values, topNode, tsSeries, err := buildSingleMetricSamplesQuery(metadata) + sqlQuery, values, topNode, tsSeries, valueWithoutAgg, err := buildSingleMetricSamplesQuery(metadata) if err != nil { return nil, nil, err } + fmt.Println(sqlQuery) + rows, err := tools.conn.Query(context.Background(), sqlQuery, values...) if err != nil { if e, ok := err.(*pgconn.PgError); ok { @@ -105,7 +146,7 @@ func fetchSingleMetricSamples(tools *queryTools, metadata *evalMetadata) ([]samp } filter := metadata.timeFilter - samplesRows, err := appendSampleRows(make([]sampleRow, 0, 1), rows, tsSeries, updatedMetricName, filter.schema, filter.column) + samplesRows, err := appendSampleRows(make([]sampleRow, 0, 1), rows, tsSeries, updatedMetricName, filter.schema, filter.column, valueWithoutAgg) if err != nil { return nil, topNode, fmt.Errorf("appending sample rows: %w", err) } @@ -173,7 +214,7 @@ func fetchMultipleMetricsSamples(tools *queryTools, metadata *evalMetadata) ([]s return nil, err } // Append all rows into results. 
// Config describes the rollup that was chosen to answer a query.
type Config struct {
	schemaName string        // database schema that holds the rollup's tables
	interval   time.Duration // resolution of the rollup
	managerRef *Manager      // manager that produced this config
}

// Interval returns the resolution of the selected rollup.
func (c *Config) Interval() time.Duration {
	return c.interval
}

// SchemaName returns the name of the schema that holds the selected rollup.
func (c *Config) SchemaName() string {
	return c.schemaName
}
+ +package rollup + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/pgxconn" +) + +const ( + defaultDurationBetweenSamples = 15 * time.Second + low = 500 + high = 5000 +) + +type Manager struct { + conn pgxconn.PgxConn + schemaResolutionCache map[string]time.Duration // schema_name: resolution + metricTypeCache map[string]string // metric_name: metric_type + resolutionInASCOrder []time.Duration +} + +func NewManager(conn pgxconn.PgxConn) *Manager { + rollup := &Manager{ + conn: conn, + } + go rollup.refresh() + return rollup +} + +func (r *Manager) refresh() { + refreshInterval := time.NewTicker(time.Minute) + defer refreshInterval.Stop() + for { + if r.refreshRollupResolution() { + r.refreshMetricMetadata() + } + <-refreshInterval.C + } +} + +func (r *Manager) refreshRollupResolution() (proceedToNextStep bool) { + var ( + schemaName []string + resolution []time.Duration + ) + rows, err := r.conn.Query(context.Background(), "SELECT schema_name, resolution FROM _prom_catalog.rollup") + if err != nil { + log.Error("msg", "fetching rollup details", "error", err.Error()) + return false + } + defer rows.Close() + for rows.Next() { + var ( + sname string + dur time.Duration + ) + err = rows.Scan(&sname, &dur) + if err != nil { + log.Error("msg", "error scanning rows", "error", err.Error()) + return false + } + schemaName = append(schemaName, sname) + resolution = append(resolution, dur) + } + fmt.Println(resolution) + if len(resolution) == 0 { + // Optimisation: No need to proceed further. + return false + } + resolutionCache := make(map[string]time.Duration) + for i := 0; i < len(schemaName); i++ { + resolutionCache[schemaName[i]] = resolution[i] + } + sort.Sort(sortDuration(resolution)) // From highest resolution (say 5m) to lowest resolution (say 1h). 
+ r.resolutionInASCOrder = resolution + r.schemaResolutionCache = resolutionCache + return true +} + +func (r *Manager) refreshMetricMetadata() { + var metricName, metricType []string + err := r.conn.QueryRow(context.Background(), "select array_agg(metric_family), array_agg(type) from _prom_catalog.metadata").Scan(&metricName, &metricType) + if err != nil { + log.Error("msg", "fetching metric metadata", "error", err.Error()) + return + } + metadataCache := make(map[string]string) + for i := range metricName { + metadataCache[metricName[i]] = metricType[i] + } + r.metricTypeCache = metadataCache +} + +func (r *Manager) Decide(minSeconds, maxSeconds int64) *Config { + if len(r.resolutionInASCOrder) == 0 { + return nil + } + estimateSamples := func(resolution time.Duration) int64 { + return int64(float64(maxSeconds-minSeconds) / resolution.Seconds()) + } + + //return nil + + estimatedRawSamples := estimateSamples(defaultDurationBetweenSamples) + + fmt.Println("resolution=>", "raw", "estimate=>", estimatedRawSamples) + + if r.withinRange(estimatedRawSamples) || estimatedRawSamples < low || len(r.resolutionInASCOrder) == 0 { + return nil + } + + var acceptableResolution []time.Duration + for _, resolution := range r.schemaResolutionCache { + estimate := estimateSamples(resolution) + fmt.Println("resolution=>", resolution, "estimate=>", estimate) + if r.withinRange(estimate) { + acceptableResolution = append(acceptableResolution, resolution) + } + } + fmt.Println("acceptableResolution", acceptableResolution) + if len(acceptableResolution) == 0 { + // No resolution was found to fit within the permitted range. + // Hence, find the highest resolution that is below upper (max) limit and respond. + for _, res := range r.resolutionInASCOrder { + estimate := estimateSamples(res) + if estimate < high { + return r.getConfig(res) + } + } + // None of the resolutions were below the upper limit. Hence, respond with the lowest available resolution. 
+ lowestResolution := r.resolutionInASCOrder[len(r.resolutionInASCOrder)-1] + // This is the best case in terms of size. + // Example: If 1 hour is the lowest resolution, then all other resolutions will be way above 1 hour. + // Hence, the best answer is 1 hour. + // + // Note: For understanding resolutions, in a case of resolutions [1m, 5m, 15m, 1h, 1w], + // 1m is the highest resolution (since maximum granularity) and 1w is the lowest resolution (due to lowest granularity). + return r.getConfig(lowestResolution) + } + // Choose the highest resolution for maximum granularity. + return r.getConfig(acceptableResolution[0]) +} + +func (r *Manager) withinRange(totalSamples int64) bool { + return low <= totalSamples && totalSamples <= high +} + +// metricTypeColumnRelationship is a relationship between the metric type received for query and a computation of +// set of columns that together compute to `value` of sample. This is because in metric rollups, we do not have a `value` +// column, rather they are stored as separate utility columns like {COUNTER -> [first, last], GAUGE -> [sum, count, min, max]}. +// +// For knowing the reasoning behind this, please read the design doc on metric rollups. +var metricTypeColumnRelationship = map[string]string{ + "GAUGE": "(sum / count)", +} + +func (r *Manager) getConfig(resolution time.Duration) *Config { + schemaName := r.getSchemaFor(resolution) + return &Config{ + schemaName: schemaName, + interval: r.schemaResolutionCache[schemaName], + managerRef: r, + } +} + +func (r *Manager) getSchemaFor(resolution time.Duration) string { + for schema, res := range r.schemaResolutionCache { + if res == resolution { + return schema + } + } + panic(fmt.Sprint( + "No schema found for resolution", + resolution, + "Please open an issue at https://github.com/timescale/promscale/issues", + )) // This will never be the case. 
+} + +type sortDuration []time.Duration + +func (s sortDuration) Len() int { + return len(s) +} + +func (s sortDuration) Less(i, j int) bool { + return s[i].Seconds() < s[j].Seconds() +} + +func (s sortDuration) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/pkg/pgmodel/querier/rollup/manager_test.go b/pkg/pgmodel/querier/rollup/manager_test.go new file mode 100644 index 0000000000..50d9a04ea0 --- /dev/null +++ b/pkg/pgmodel/querier/rollup/manager_test.go @@ -0,0 +1,131 @@ +package rollup + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/timescale/promscale/pkg/pgxconn" +) + +const noRollupSchema = "" + +func TestDecideRollup(t *testing.T) { + r := &Manager{ + conn: mockPgxConn{}, + schemaResolutionCache: map[string]time.Duration{ + // Raw -> 15 seconds + "hour": time.Hour, + "5_minute": 5 * time.Minute, + "15_minute": 15 * time.Minute, + "week": 7 * 24 * time.Hour, + }, + resolutionInASCOrder: []time.Duration{5 * time.Minute, 15 * time.Minute, time.Hour, 7 * 24 * time.Hour}, + } + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: noRollupSchema, // raw resolution. 
+ }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: noRollupSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: noRollupSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: noRollupSchema, + }, { + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: "5_minute", + }, + { + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "5_minute", + // DRY RUN on 500 - 5000 logic + // -------- + // + // Assumed default scrape interval being 15 secs + // raw -> 40320 + // + // And, when using following rollup resolutions, num samples: + // 5 mins -> 2016 <-- Falls in the acceptable range. + // 15 mins -> 672 + // 1 hour -> 168 + // 1 week -> 1 + }, + { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "hour", + }, { + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "week", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "week", + }, + } + for _, tc := range tcs { + cfg := r.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds())) + if tc.expectedSchemaName == noRollupSchema { + require.Nil(t, cfg) + continue + } + require.Equal(t, tc.expectedSchemaName, cfg.schemaName, tc.name) + } +} + +type mockRow struct{} + +func (mockRow) Scan(dest ...interface{}) error { return nil } + +type mockPgxConn struct{} + +func (mockPgxConn) Close() {} +func (mockPgxConn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) { + return pgconn.CommandTag{}, nil +} +func (mockPgxConn) Query(ctx context.Context, sql string, args ...interface{}) (pgxconn.PgxRows, error) { + return nil, nil +} +func (mockPgxConn) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row { + return mockRow{} +} +func (mockPgxConn) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc 
pgx.CopyFromSource) (int64, error) { + return 0, nil +} +func (mockPgxConn) CopyFromRows(rows [][]interface{}) pgx.CopyFromSource { return nil } +func (mockPgxConn) NewBatch() pgxconn.PgxBatch { return nil } +func (mockPgxConn) SendBatch(ctx context.Context, b pgxconn.PgxBatch) (pgx.BatchResults, error) { + return nil, nil +} +func (mockPgxConn) Acquire(ctx context.Context) (*pgxpool.Conn, error) { return nil, nil } +func (mockPgxConn) BeginTx(ctx context.Context) (pgx.Tx, error) { return nil, nil } diff --git a/pkg/pgmodel/querier/rollup/promql_optimize.go b/pkg/pgmodel/querier/rollup/promql_optimize.go new file mode 100644 index 0000000000..a7cfe84828 --- /dev/null +++ b/pkg/pgmodel/querier/rollup/promql_optimize.go @@ -0,0 +1,126 @@ +package rollup + +type ( + metricType string + columnInformation struct { + columnName string + instantQueryAgg string + onlyForInstantQuery bool + } +) + +var instantQueryFuncColumn = map[metricType]map[string]columnInformation{ // Relationship=> metric_type: PromQL_function_name: info_on_which_rollup_column_to_use + "GAUGE": { + "": {columnName: "sum / count"}, // When no function is used. + "avg_over_time": {columnName: "sum, count", instantQueryAgg: "sum(sum) / sum(value)"}, + "min_over_time": {columnName: "min", instantQueryAgg: "min(value)"}, + "max_over_time": {columnName: "max", instantQueryAgg: "max(value)"}, + "sum_over_time": {columnName: "sum", instantQueryAgg: "sum(value)"}, + "count_over_time": {columnName: "count", instantQueryAgg: "sum(value)"}, // Since we want to sum all the counts of each bucket. 
+ }, + "COUNTER": { + "": {columnName: "last_with_counter_reset"}, + }, + "HISTOGRAM": { + "": {columnName: "last_with_counter_reset"}, + }, + "SUMMARY": { + "": {columnName: "sum / count"}, + "avg_over_time": {columnName: "sum, count", instantQueryAgg: "sum(sum) / sum(value)"}, + "min_over_time": {columnName: "sum / count", instantQueryAgg: "min(value)"}, + "max_over_time": {columnName: "sum / count", instantQueryAgg: "max(value)"}, + "sum_over_time": {columnName: "sum", instantQueryAgg: "sum(value)"}, + "count_over_time": {columnName: "count", instantQueryAgg: "sum(value)"}, + }, +} + +var rangeQueryFuncColumn = map[metricType]map[string]string{ + "GAUGE": { + "": "sum / count", // When no function is used. + "min_over_time": "min", + "max_over_time": "max", + "sum_over_time": "sum", + "count_over_time": "count", + }, + "COUNTER": { + "": "last_with_counter_reset", + }, + "HISTOGRAM": { + "": "last_with_counter_reset", + }, + "SUMMARY": { + "": "sum / count", + "avg_over_time": "sum, count", + "sum_over_time": "sum", + "count_over_time": "count", + }, +} + +type Optimizer interface { + RegularColumnName() string + GetColumnClause(funcName string) string +} + +type SqlOptimizer struct { + typ metricType +} + +func (c *Config) GetOptimizer(metricName string) *SqlOptimizer { + typ, ok := c.managerRef.metricTypeCache[metricName] + if !ok { + // No metadata found, hence no optimization. 
+ return nil + } + return &SqlOptimizer{ + typ: metricType(typ), + } +} + +func (o *SqlOptimizer) InstantQuery() InstantQuery { + return InstantQuery{o.typ} +} + +func (o *SqlOptimizer) RangeQuery() RangeQuery { + return RangeQuery{o.typ} +} + +type InstantQuery struct { + typ metricType +} + +func (i InstantQuery) RegularColumnName() string { + return instantQueryFuncColumn[i.typ][""].columnName +} + +func (i InstantQuery) GetColumnClause(funcName string) string { + clause, exists := instantQueryFuncColumn[i.typ][funcName] + if !exists { + return "" + } + return clause.columnName +} + +func (i InstantQuery) GetAggForInstantQuery(funcName string) string { + r := instantQueryFuncColumn[i.typ] + c, supported := r[funcName] + if !supported { + return "" + } + return c.instantQueryAgg +} + +type RangeQuery struct { + typ metricType +} + +func (r RangeQuery) RegularColumnName() string { + return rangeQueryFuncColumn[r.typ][""] +} + +func (r RangeQuery) GetColumnClause(funcName string) string { + clause, exists := rangeQueryFuncColumn[r.typ][funcName] + if !exists { + return "" + } + return clause +} diff --git a/pkg/pgmodel/querier/row.go b/pkg/pgmodel/querier/row.go index 5d535686d2..47288592dc 100644 --- a/pkg/pgmodel/querier/row.go +++ b/pkg/pgmodel/querier/row.go @@ -2,6 +2,7 @@ package querier import ( "encoding/binary" + "fmt" "sync" "github.com/jackc/pgtype" @@ -164,7 +165,7 @@ func (r *sampleRow) GetAdditionalLabels() (ll labels.Labels) { // appendSampleRows adds new results rows to already existing result rows and // returns the as a result. 
-func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSeries, metric, schema, column string) ([]sampleRow, error) { +func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSeries, metric, schema, column string, valueWithoutAgg bool) ([]sampleRow, error) { if in.Err() != nil { return out, in.Err() } @@ -179,7 +180,19 @@ func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSer times := tPool.Get().(*pgtype.TimestamptzArray) times.Elements = times.Elements[:0] timesWrapper := timestamptzArrayWrapper{times} - row.err = in.Scan(&row.labelIds, &timesWrapper, &valuesWrapper) + if valueWithoutAgg { + var ( + flt pgtype.Float8 + ts pgtype.Timestamptz + ) + row.err = in.Scan(&row.labelIds, &ts, &flt) + timesWrapper.Elements = []pgtype.Timestamptz{ts} + valuesWrapper.Elements = []pgtype.Float8{flt} + } else { + row.err = in.Scan(&row.labelIds, &timesWrapper, &valuesWrapper) + } + + fmt.Println("sql layer, total raw samples", len(valuesWrapper.Elements)) row.timeArrayOwnership = times row.times = newRowTimestampSeries(times) } else { diff --git a/pkg/promql/custom_iterator.go b/pkg/promql/custom_iterator.go new file mode 100644 index 0000000000..fe5780a767 --- /dev/null +++ b/pkg/promql/custom_iterator.go @@ -0,0 +1,60 @@ +package promql + +import ( + "fmt" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +type samplesIterator interface { + Reset(it chunkenc.Iterator) + PeekPrev() (t int64, v float64, ok bool) + Seek(t int64) bool + Next() bool + At() (int64, float64) + Err() error +} + +type rollupItr struct { + it chunkenc.Iterator + prevValue float64 + prevTime int64 +} + +func newRollupIterator() *rollupItr { + fmt.Println("using rollup iterator") + return &rollupItr{} +} + +func (r *rollupItr) Reset(it chunkenc.Iterator) { + r.it = it +} + +func (r *rollupItr) PeekPrev() (t int64, v float64, ok bool) { + if r.prevTime == 0 { + return 0, 0, false + } + return r.prevTime, r.prevValue, true +} + +func (r
*rollupItr) Next() bool { + return r.it.Next() +} + +func (r *rollupItr) At() (int64, float64) { + return r.it.At() +} + +func (r *rollupItr) Seek(t int64) bool { + for r.Next() { + ts, v := r.At() + r.prevTime, r.prevValue = ts, v + if ts >= t { + return true + } + } + return false +} + +func (r *rollupItr) Err() error { + return r.it.Err() +} diff --git a/pkg/promql/engine.go b/pkg/promql/engine.go index 85370ab36c..cc8634efc4 100644 --- a/pkg/promql/engine.go +++ b/pkg/promql/engine.go @@ -45,6 +45,7 @@ import ( "github.com/prometheus/prometheus/util/stats" pgquerier "github.com/timescale/promscale/pkg/pgmodel/querier" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" "github.com/timescale/promscale/pkg/util" ) @@ -73,7 +74,7 @@ type SamplesQuerier interface { // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. 
- Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (storage.SeriesSet, parser.Node) + Select(sortSeries bool, hints *storage.SelectHints, qh *pgquerier.QueryHints, nodes []parser.Node, matchers ...*labels.Matcher) (seriesSet storage.SeriesSet, topNode parser.Node, cfg *rollup.Config) } const ( @@ -648,7 +649,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } defer querier.Close() - topNodes := ng.populateSeries(querier, s) + topNodes, rollupInterval := ng.populateSeries(querier, s, s.Start == s.End) prepareSpanTimer.Finish() // Modify the offset of vector and matrix selectors for the @ modifier @@ -666,9 +667,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: ng.lookbackDelta, - topNodes: topNodes, samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, + topNodes: topNodes, + usingRollups: rollupInterval > 0, } query.sampleStats.InitStepTracking(start, start, 1) @@ -690,6 +692,8 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval panic(errors.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } + fmt.Println("query instant: total samples", mat.TotalSamples()) + query.matrix = mat switch s.Expr.Type() { case parser.ValueTypeVector: @@ -711,10 +715,32 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } // Range evaluation. + var intervalMs int64 + + if rollupInterval > s.Interval { + // Rollups evaluated the step interval. Hence, just use this step in rollup. + // This is important, as it **avoids unnecessary samples** due to .Seek() as the values remain the same + // between the rollup intervals, thereby **speeding up evaluation** involving rollups. 
+ // + // Example: If s.Interval (from received range query) is 5s, and rollup evaluated is of 5m, then if we were + // to pass s.Interval here, then in VectorSelector evaluation, the `ts += ev.interval` will cause .Seek() (in vectorSelectorSingle()) + // to repeat the values of the previous rollup sample for entire duration until next rollup sample is reached. + // + // IMPORTANT: This hinders the very aim of rollups, which is to speed up calculations by parent functions, since now, + // the parent functions are essentially working on the same NUMBER OF SAMPLES (caused by step from received query) + // which without the rollups would be the case. + // + // We do `rollupInterval > s.Interval` so that if the range query's interval is already greater than rollupInterval + // then we do not over-respond with higher granularity than needed. + intervalMs = durationMilliseconds(rollupInterval) + } else { + intervalMs = durationMilliseconds(s.Interval) + } + evaluator := &evaluator{ startTimestamp: timeMilliseconds(s.Start), endTimestamp: timeMilliseconds(s.End), - interval: durationMilliseconds(s.Interval), + interval: intervalMs, ctx: ctxInnerEval, maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, @@ -722,6 +748,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, topNodes: topNodes, + usingRollups: rollupInterval > 0, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) val, warnings, err := evaluator.Eval(s.Expr) @@ -736,6 +763,8 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval } query.matrix = mat + fmt.Println("query range: total samples", mat.TotalSamples()) + if err := contextDone(ctx, "expression evaluation"); err != nil { return nil, warnings, err } @@ -858,14 +887,12 @@ func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorS //
evaluation. These terminal nodes are called "top nodes". // // populateSeries returns a map keyed by all top nodes. -func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt) map[parser.Node]struct{} { - var ( - // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. - // The evaluation of the VectorSelector inside then evaluates the given range and unsets - // the variable. - evalRange time.Duration - topNodes map[parser.Node]struct{} = make(map[parser.Node]struct{}) - ) +func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt, isInstantQuery bool) (topNodes map[parser.Node]struct{}, rollupInterval time.Duration) { + // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. + // The evaluation of the VectorSelector inside then evaluates the given range and unsets + // the variable. + var evalRange time.Duration + topNodes = make(map[parser.Node]struct{}) parser.Inspect(evalStmt.Expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { @@ -881,14 +908,18 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalSt } qh = &pgquerier.QueryHints{ - CurrentNode: n, - Lookback: ng.lookbackDelta, + CurrentNode: n, + Lookback: ng.lookbackDelta, + IsInstantQuery: isInstantQuery, } evalRange = 0 hints.By, hints.Grouping = extractGroupsFromPath(path) - set, topNode := querier.Select(false, hints, qh, path, n.LabelMatchers...) + set, topNode, rollupCfg := querier.Select(false, hints, qh, path, n.LabelMatchers...) 
topNodes[topNode] = struct{}{} + if rollupCfg != nil { + rollupInterval = rollupCfg.Interval() + } n.UnexpandedSeriesSet = set case *parser.MatrixSelector: @@ -896,7 +927,7 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalSt } return nil }) - return topNodes + return topNodes, rollupInterval } // extractFuncFromPath walks up the path and searches for the first instance of @@ -978,9 +1009,11 @@ type evaluator struct { currentSamples int logger log.Logger lookbackDelta time.Duration - topNodes map[parser.Node]struct{} samplesStats *stats.QuerySamples noStepSubqueryIntervalFn func(rangeMillis int64) int64 + + topNodes map[parser.Node]struct{} + usingRollups bool } // errorf causes a panic with the input formatted into an error. @@ -1315,6 +1348,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { defer span.End() if _, isTopNode := ev.topNodes[expr]; isTopNode { + fmt.Println("in eval() topNode", ev, ev.topNodes[expr]) // the storage layer has already processed this node. Just return // the result. 
var ( @@ -1348,6 +1382,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { if mat == nil { ev.error(fmt.Errorf("Matrix not filled in")) } + fmt.Println("total samples in mat", mat.TotalSamples()) return mat, warnings } @@ -1666,7 +1701,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } mat := make(Matrix, 0, len(e.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + it := decideSamplesIterator(ev) for i, s := range e.Series { it.Reset(s.Iterator()) ss := Series{ @@ -1804,6 +1839,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(errors.Errorf("unhandled expression of type: %T", expr)) } +func decideSamplesIterator(ev *evaluator) samplesIterator { + if ev.usingRollups { + return newRollupIterator() + } + return storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) +} + // vectorSelector evaluates a *parser.VectorSelector expression. func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { ws, err := checkAndExpandSeriesSet(ev.ctx, node) @@ -1811,7 +1853,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } vec := make(Vector, 0, len(node.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + it := decideSamplesIterator(ev) for i, s := range node.Series { it.Reset(s.Iterator()) @@ -1834,7 +1876,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect } // vectorSelectorSingle evaluates a instant vector for the iterator of one time series. 
-func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { +func (ev *evaluator) vectorSelectorSingle(it samplesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { refTime := ts - durationMilliseconds(node.Offset) var t int64 var v float64 @@ -1965,7 +2007,26 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m ev.currentSamples++ out = append(out, Point{T: t, V: v}) } - } + //else if ev.usingRollups { + // fmt.Println("ok in PeekBack", ok) + // out = append(out, Point{T: t, V: v}) + // ev.currentSamples++ + //} + } + + //if len(out) == 0 && ev.usingRollups { + // // Not enough samples, so to get a minimum value, we need at least 2 samples. Let's add the last and the second last. + // fmt.Println("CASE: len(out) == 0 && ev.usingRollups") + // t, v, ok := it.PeekBack(2) + // if ok { + // out = append(out, Point{T: t, V: v}) + // } + // t, v, ok = it.PeekBack(1) + // if ok { + // out = append(out, Point{T: t, V: v}) + // } + //} + // The seeked sample might also be in the range.
if ok { t, v := it.At() @@ -1978,6 +2039,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m } } ev.samplesStats.UpdatePeak(ev.currentSamples) + //fmt.Println("len of out", len(out)) return out } diff --git a/pkg/promql/test.go b/pkg/promql/test.go index c758db3999..67e9b84f5e 100644 --- a/pkg/promql/test.go +++ b/pkg/promql/test.go @@ -36,6 +36,7 @@ import ( "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/assert" "github.com/timescale/promscale/pkg/pgmodel/querier" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" ) var ( @@ -110,9 +111,9 @@ type QuerierWrapper struct { storage.Querier } -func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node) { +func (t *QuerierWrapper) Select(b bool, sh *storage.SelectHints, _ *querier.QueryHints, _ []parser.Node, m ...*labels.Matcher) (storage.SeriesSet, parser.Node, *rollup.Config) { ss := t.Querier.Select(b, sh, m...) 
- return ss, nil + return ss, nil, nil } func (t *QuerierWrapper) LabelValues(n string) ([]string, storage.Warnings, error) { diff --git a/pkg/query/queryable.go b/pkg/query/queryable.go index dbacdb9dd3..1845b1aa03 100644 --- a/pkg/query/queryable.go +++ b/pkg/query/queryable.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/lreader" pgQuerier "github.com/timescale/promscale/pkg/pgmodel/querier" + "github.com/timescale/promscale/pkg/pgmodel/querier/rollup" "github.com/timescale/promscale/pkg/promql" ) @@ -65,9 +66,15 @@ func (q *samplesQuerier) Close() { } } -func (q *samplesQuerier) Select(sortSeries bool, hints *storage.SelectHints, qh *pgQuerier.QueryHints, path []parser.Node, matchers ...*labels.Matcher) (storage.SeriesSet, parser.Node) { +func (q *samplesQuerier) Select( + sortSeries bool, + hints *storage.SelectHints, + qh *pgQuerier.QueryHints, + path []parser.Node, + matchers ...*labels.Matcher, +) (seriesSet storage.SeriesSet, topNode parser.Node, config *rollup.Config) { qry := q.metricsReader.SamplesQuerier() - ss, n := qry.Select(q.mint, q.maxt, sortSeries, hints, qh, path, matchers...) + ss, n, cfg := qry.Select(q.mint, q.maxt, sortSeries, hints, qh, path, matchers...) q.seriesSets = append(q.seriesSets, ss) - return ss, n + return ss, n, cfg } diff --git a/pkg/rules/adapters/query.go b/pkg/rules/adapters/query.go index 59455dea9e..7db0b4bf95 100644 --- a/pkg/rules/adapters/query.go +++ b/pkg/rules/adapters/query.go @@ -33,7 +33,9 @@ type querierAdapter struct { func (q querierAdapter) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { // Pushdowns are not supported here. This is fine as Prometheus rule-manager only uses queryable to know // the previous state of the alert. This function is not used in recording/alerting rules evaluation. - seriesSet, _ := q.qr.Select(sortSeries, hints, nil, nil, matchers...) 
+ // TODO (harkishen) after PoC: consider passing down a parameter that says whether rollups should be used + // for this evaluation. + seriesSet, _, _ := q.qr.Select(sortSeries, hints, nil, nil, matchers...) return seriesSet } diff --git a/pkg/thanos/store.go b/pkg/thanos/store.go index 89ce14fc21..0532ed3708 100644 --- a/pkg/thanos/store.go +++ b/pkg/thanos/store.go @@ -41,7 +41,8 @@ func (fc *Storage) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesSe } defer q.Close() - ss, _ := q.Select(false, nil, nil, nil, matchers...) + // TODO (harkishen) after PoC + ss, _, _ := q.Select(false, nil, nil, nil, matchers...) for ss.Next() { series := ss.At()