From e1d967fb294c7a2b8127abca7f12fa1feb1acb90 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Tue, 18 Oct 2022 18:56:05 +0530 Subject: [PATCH 1/7] Adds support to create/delete metric rollups via dataset-config. Signed-off-by: Harkishen-Singh This commit adds support for creating and deleting metric rollups by reading the dataset-config file. The dataset-config file contains a new field `downsample:` which supports 2 options: `automatic` & `resolution`. If `automatic` is false, the connector updates the database so that `_prom_catalog.scan_for_new_rollups` does not create any new rollups (Caggs). `resolution` is a comma-separated list of downsampling resolutions, each in the format label:resolution:retention. Example: `short:5m:90d,long:1h:365d` -> The resolution works in a strict manner: its aim is to make sure that the database contains exactly those resolutions that are mentioned in the dataset config. For example, if the database already contains `short` and `long` downsampled data and the dataset config is supplied with `short:5m:90d,very_long:1d:365d`, then the `long` downsampled data is deleted and `very_long` is created. -> `resolution` is applied only if `automatic` is `true`. If `automatic` is not given in the dataset config, it defaults to `true`, based on our plan to keep downsampling enabled by default. 
--- pkg/dataset/config.go | 35 +++-- pkg/dataset/config_test.go | 33 +++-- pkg/dataset/downsample.go | 76 ++++++++++ pkg/rollup/rollup.go | 133 +++++++++++++++++ pkg/rollup/rollup_test.go | 138 ++++++++++++++++++ pkg/runner/client.go | 7 + .../end_to_end_tests/config_dataset_test.go | 11 +- .../duration.go => util/day_duration.go} | 6 +- 8 files changed, 405 insertions(+), 34 deletions(-) create mode 100644 pkg/dataset/downsample.go create mode 100644 pkg/rollup/rollup.go create mode 100644 pkg/rollup/rollup_test.go rename pkg/{dataset/duration.go => util/day_duration.go} (89%) diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index 7fd0e96a61..b6c7a41806 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + package dataset import ( @@ -6,8 +10,10 @@ import ( "time" "github.com/jackc/pgx/v4" - "github.com/timescale/promscale/pkg/log" "gopkg.in/yaml.v2" + + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/util" ) const ( @@ -38,16 +44,17 @@ type Config struct { // Metrics contains dataset configuration options for metrics data. type Metrics struct { - ChunkInterval DayDuration `yaml:"default_chunk_interval"` - Compression *bool `yaml:"compress_data"` // Using pointer to check if the the value was set. - HALeaseRefresh DayDuration `yaml:"ha_lease_refresh"` - HALeaseTimeout DayDuration `yaml:"ha_lease_timeout"` - RetentionPeriod DayDuration `yaml:"default_retention_period"` + ChunkInterval util.DayDuration `yaml:"default_chunk_interval"` + Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. 
+ HALeaseRefresh util.DayDuration `yaml:"ha_lease_refresh"` + HALeaseTimeout util.DayDuration `yaml:"ha_lease_timeout"` + RetentionPeriod util.DayDuration `yaml:"default_retention_period"` + Downsample `yaml:"downsample,omitempty"` } // Traces contains dataset configuration options for traces data. type Traces struct { - RetentionPeriod DayDuration `yaml:"default_retention_period"` + RetentionPeriod util.DayDuration `yaml:"default_retention_period"` } // NewConfig creates a new dataset config based on the configuration YAML contents. @@ -60,6 +67,10 @@ func NewConfig(contents string) (cfg Config, err error) { func (c *Config) Apply(conn *pgx.Conn) error { c.applyDefaults() + if err := c.Downsample.Apply(conn); err != nil { + return fmt.Errorf("error applying configuration for downsampling: %w", err) + } + log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval)) log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression)) log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh)) @@ -87,21 +98,21 @@ func (c *Config) Apply(conn *pgx.Conn) error { func (c *Config) applyDefaults() { if c.Metrics.ChunkInterval <= 0 { - c.Metrics.ChunkInterval = DayDuration(defaultMetricChunkInterval) + c.Metrics.ChunkInterval = util.DayDuration(defaultMetricChunkInterval) } if c.Metrics.Compression == nil { c.Metrics.Compression = &defaultMetricCompressionVar } if c.Metrics.HALeaseRefresh <= 0 { - c.Metrics.HALeaseRefresh = DayDuration(defaultMetricHALeaseRefresh) + c.Metrics.HALeaseRefresh = util.DayDuration(defaultMetricHALeaseRefresh) } if c.Metrics.HALeaseTimeout <= 0 { - c.Metrics.HALeaseTimeout = DayDuration(defaultMetricHALeaseTimeout) + c.Metrics.HALeaseTimeout = util.DayDuration(defaultMetricHALeaseTimeout) } if c.Metrics.RetentionPeriod <= 0 { - c.Metrics.RetentionPeriod = DayDuration(defaultMetricRetentionPeriod) + 
c.Metrics.RetentionPeriod = util.DayDuration(defaultMetricRetentionPeriod) } if c.Traces.RetentionPeriod <= 0 { - c.Traces.RetentionPeriod = DayDuration(defaultTraceRetentionPeriod) + c.Traces.RetentionPeriod = util.DayDuration(defaultTraceRetentionPeriod) } } diff --git a/pkg/dataset/config_test.go b/pkg/dataset/config_test.go index e9f954abb4..b2b07e5bc3 100644 --- a/pkg/dataset/config_test.go +++ b/pkg/dataset/config_test.go @@ -1,6 +1,7 @@ package dataset import ( + "github.com/timescale/promscale/pkg/util" "testing" "time" @@ -45,7 +46,7 @@ func TestNewConfig(t *testing.T) { default_retention_period: 3d2h`, cfg: Config{ Metrics: Metrics{ - RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour), + RetentionPeriod: util.DayDuration(((3 * 24) + 2) * time.Hour), }, }, }, @@ -61,14 +62,14 @@ traces: default_retention_period: 15d`, cfg: Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(3 * time.Hour), + ChunkInterval: util.DayDuration(3 * time.Hour), Compression: &testCompressionSetting, - HALeaseRefresh: DayDuration(2 * time.Minute), - HALeaseTimeout: DayDuration(5 * time.Second), - RetentionPeriod: DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: util.DayDuration(2 * time.Minute), + HALeaseTimeout: util.DayDuration(5 * time.Second), + RetentionPeriod: util.DayDuration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: DayDuration(15 * 24 * time.Hour), + RetentionPeriod: util.DayDuration(15 * 24 * time.Hour), }, }, }, @@ -97,14 +98,14 @@ func TestApplyDefaults(t *testing.T) { t, Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(defaultMetricChunkInterval), + ChunkInterval: util.DayDuration(defaultMetricChunkInterval), Compression: &defaultMetricCompressionVar, - HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh), - HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout), - RetentionPeriod: DayDuration(defaultMetricRetentionPeriod), + HALeaseRefresh: util.DayDuration(defaultMetricHALeaseRefresh), + HALeaseTimeout: 
util.DayDuration(defaultMetricHALeaseTimeout), + RetentionPeriod: util.DayDuration(defaultMetricRetentionPeriod), }, Traces: Traces{ - RetentionPeriod: DayDuration(defaultTraceRetentionPeriod), + RetentionPeriod: util.DayDuration(defaultTraceRetentionPeriod), }, }, c, @@ -112,14 +113,14 @@ func TestApplyDefaults(t *testing.T) { untouched := Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(3 * time.Hour), + ChunkInterval: util.DayDuration(3 * time.Hour), Compression: &testCompressionSetting, - HALeaseRefresh: DayDuration(2 * time.Minute), - HALeaseTimeout: DayDuration(5 * time.Second), - RetentionPeriod: DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: util.DayDuration(2 * time.Minute), + HALeaseTimeout: util.DayDuration(5 * time.Second), + RetentionPeriod: util.DayDuration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: DayDuration(15 * 24 * time.Hour), + RetentionPeriod: util.DayDuration(15 * 24 * time.Hour), }, } diff --git a/pkg/dataset/downsample.go b/pkg/dataset/downsample.go new file mode 100644 index 0000000000..4a61275408 --- /dev/null +++ b/pkg/dataset/downsample.go @@ -0,0 +1,76 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+ +package dataset + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v4" + + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/rollup" +) + +const ( + defaultDownsampleState = true + defaultDownsampleResolution = "short:5m:90d,long:1h:395d" // label:resolution:retention +) + +var ( + setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" + setDefaultDownsampleResolutionSQL = "SELECT prom_api.set_downsample_resolution($1)" + + defaultDownsampleStateVar = defaultDownsampleState +) + +type Downsample struct { + Automatic *bool `yaml:"automatic,omitempty"` + Resolution string `yaml:"resolution,omitempty"` +} + +func (d *Downsample) Apply(conn *pgx.Conn) error { + d.applyDefaults() + + queries := map[string]interface{}{ + setDefaultDownsampleStateSQL: d.Automatic, + } + if *d.Automatic { + if err := handleDownsampling(conn, d.Resolution); err != nil { + return fmt.Errorf("handle downsample: %w", err) + } + queries[setDefaultDownsampleResolutionSQL] = d.Resolution + log.Info("msg", fmt.Sprintf("Setting metric downsample resolution to %s", d.Resolution)) + } + log.Info("msg", fmt.Sprintf("Setting metric automatic downsample to %t", *d.Automatic)) + + for sql, param := range queries { + if _, err := conn.Exec(context.Background(), sql, param); err != nil { + return err + } + } + return nil +} + +func (d *Downsample) applyDefaults() { + if d.Automatic == nil { + // In default case, we plan to downsample data. 
+ d.Automatic = &defaultDownsampleStateVar + } + if d.Resolution == "" { + d.Resolution = defaultDownsampleResolution + } +} + +func handleDownsampling(conn *pgx.Conn, resolutionStr string) error { + downsampleResolutions, err := rollup.Parse(resolutionStr) + if err != nil { + return fmt.Errorf("error parsing downsample resolution: %w", err) + } + if err = rollup.EnsureRollupWith(conn, downsampleResolutions); err != nil { + return fmt.Errorf("ensure rollup with: %w", err) + } + return nil +} diff --git a/pkg/rollup/rollup.go b/pkg/rollup/rollup.go new file mode 100644 index 0000000000..c42cdf8ff3 --- /dev/null +++ b/pkg/rollup/rollup.go @@ -0,0 +1,133 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/jackc/pgx/v4" + "github.com/timescale/promscale/pkg/util" +) + +type DownsampleResolution struct { + label string + resolution util.DayDuration + retention util.DayDuration +} + +// Parse parses the rollup resolution string and returns true if it respects the format of downsampling resolution, +// which is a string of comma separated label:resolution:retention. 
+// Example: short:5m:90d,long:1h:395d +func Parse(resolutionStr string) ([]DownsampleResolution, error) { + resolutions := strings.Split(resolutionStr, ",") + if len(resolutions) < 1 { + return nil, fmt.Errorf("resolutions cannot be less than 1") + } + var r []DownsampleResolution + for _, resolution := range resolutions { + resolution = strings.TrimSpace(resolution) + if resolution == "" { + continue + } + + items := strings.Split(resolution, ":") + if len(items) != 3 { + return nil, fmt.Errorf("expected items not found: needed in format of label:resolution:retention but found %v for %s resolution", strings.Join(items, ":"), resolution) + } + lName := items[0] + + var res util.DayDuration + err := res.UnmarshalText([]byte(items[1])) + if err != nil { + return nil, fmt.Errorf("error parsing resolution %s: %w", items[1], err) + } + + var retention util.DayDuration + err = retention.UnmarshalText([]byte(items[2])) + if err != nil { + return nil, fmt.Errorf("error parsing retention %s: %w", items[2], err) + } + r = append(r, DownsampleResolution{lName, res, retention}) + } + return r, nil +} + +// EnsureRollupWith ensures "strictly" that the given new resolutions are applied in the database. +// Note: It follows a "strict" behaviour, meaning any existing resolutions of downsampling in +// the database will be removed, so that the all downsampling data in the database strictly +// matches the provided newResolutions. 
+func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) error { + rows, err := conn.Query(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup") + if err != nil { + return fmt.Errorf("querying existing resolutions: %w", err) + } + defer rows.Close() + + var existingResolutions []DownsampleResolution + for rows.Next() { + var lName string + var resolution, retention time.Duration + if err := rows.Scan(&lName, &resolution, &retention); err != nil { + return fmt.Errorf("error scanning output rows for existing resolutions: %w", err) + } + existingResolutions = append(existingResolutions, DownsampleResolution{label: lName, resolution: util.DayDuration(resolution), retention: util.DayDuration(retention)}) + } + + // Determine which resolutions need to be created and deleted from the DB. + pendingCreation := diff(newResolutions, existingResolutions) + pendingDeletion := diff(existingResolutions, newResolutions) + + // Delete rollups that are no longer required. + if err = deleteRollups(conn, pendingDeletion); err != nil { + return fmt.Errorf("delete rollups: %w", err) + } + + // Create new rollups. 
+ if err = createRollups(conn, pendingCreation); err != nil { + return fmt.Errorf("create rollups: %w", err) + } + return nil +} + +func createRollups(conn *pgx.Conn, res []DownsampleResolution) error { + for _, r := range res { + _, err := conn.Exec(context.Background(), "CALL _prom_catalog.create_metric_rollup($1, $2, $3)", r.label, time.Duration(r.resolution), time.Duration(r.retention)) + if err != nil { + return fmt.Errorf("error creating rollup for %s: %w", r.label, err) + } + } + return nil +} + +func deleteRollups(conn *pgx.Conn, res []DownsampleResolution) error { + for _, r := range res { + _, err := conn.Exec(context.Background(), "CALL _prom_catalog.delete_metric_rollup($1)", r.label) + if err != nil { + return fmt.Errorf("error deleting rollup for %s: %w", r.label, err) + } + } + return nil +} + +// diff returns the elements of a that are not in b. +func diff(a, b []DownsampleResolution) []DownsampleResolution { + var difference []DownsampleResolution + for i := range a { + found := false + for j := range b { + if a[i].label == b[j].label { + found = true + break + } + } + if !found { + difference = append(difference, a[i]) + } + } + return difference +} diff --git a/pkg/rollup/rollup_test.go b/pkg/rollup/rollup_test.go new file mode 100644 index 0000000000..10b4b78de1 --- /dev/null +++ b/pkg/rollup/rollup_test.go @@ -0,0 +1,138 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+ +package rollup + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/util" +) + +func TestParse(t *testing.T) { + tcs := []struct { + name string + input string + numResolutions int + parsedResolution []DownsampleResolution + shouldFail bool + }{ + { + name: "default entries", + input: "short:5m:90d,long:1h:395d", + parsedResolution: []DownsampleResolution{ + {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, + {"long", util.DayDuration(time.Hour), util.DayDuration(395 * 24 * time.Hour)}, + }, + numResolutions: 2, + }, + { + name: "default entries with space", + input: "short:5m:90d, long:1h:395d", + parsedResolution: []DownsampleResolution{ + {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, + {"long", util.DayDuration(time.Hour), util.DayDuration(395 * 24 * time.Hour)}, + }, + numResolutions: 2, + }, + { + name: "only short", + input: "short:5m:90d", + parsedResolution: []DownsampleResolution{ + {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, + }, + numResolutions: 1, + }, + { + name: "only short with comma", + input: "short:5m:90d,", + parsedResolution: []DownsampleResolution{ + {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, + }, + numResolutions: 1, + }, + { + name: "only short with space", + input: "short:5m:90d, ", + parsedResolution: []DownsampleResolution{ + {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, + }, + numResolutions: 1, + }, + { + name: "mix", + input: "short:5m:90d,long:1h:395d, very_long:2d:365d , very_short:1m:90d", + parsedResolution: []DownsampleResolution{ + {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, + {"long", util.DayDuration(time.Hour), util.DayDuration(395 * 24 * time.Hour)}, + {"very_long", util.DayDuration(2 * 24 * time.Hour), util.DayDuration(365 * 24 * 
time.Hour)}, + {"very_short", util.DayDuration(time.Minute), util.DayDuration(90 * 24 * time.Hour)}, + }, + numResolutions: 4, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + resolutions, err := Parse(tc.input) + if tc.shouldFail { + require.Error(t, err, tc.name) + return + } + require.NoError(t, err, tc.name) + require.Equal(t, tc.numResolutions, len(resolutions), tc.name) + require.Equal(t, tc.parsedResolution, resolutions, tc.name) + }) + } +} + +func TestDiff(t *testing.T) { + tcs := []struct { + name string + a, b, expected []DownsampleResolution + }{ + { + name: "some inclusive elements", + a: []DownsampleResolution{{label: "a"}, {label: "b"}, {label: "c"}, {label: "d"}}, + b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, + expected: []DownsampleResolution{{label: "a"}, {label: "b"}}, + }, + { + name: "b superset of a", + a: []DownsampleResolution{{label: "c"}, {label: "d"}}, + b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, + expected: []DownsampleResolution(nil), + }, + { + name: "a empty", + a: []DownsampleResolution{}, + b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, + expected: []DownsampleResolution(nil), + }, + { + name: "all elements exclusive", + a: []DownsampleResolution{{label: "a"}}, + b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, + expected: []DownsampleResolution{{label: "a"}}, + }, + { + name: "same", + a: []DownsampleResolution{{label: "a"}, {label: "b"}, {label: "c"}, {label: "d"}}, + b: []DownsampleResolution{{label: "a"}, {label: "b"}, {label: "c"}, {label: "d"}}, + expected: []DownsampleResolution(nil), + }, + { + name: "empty", + a: []DownsampleResolution{}, + b: []DownsampleResolution{}, + expected: []DownsampleResolution(nil), + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, diff(tc.a, tc.b), tc.name) + }) + } +} diff --git a/pkg/runner/client.go 
b/pkg/runner/client.go index 63dbd49f4c..090fa07013 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -170,6 +170,13 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } + } else { + // We apply downsampling settings even when DatasetConfig is not given, which is the most common case. + downsampleCfg := &dataset.Downsample{} + err = downsampleCfg.Apply(conn) + if err != nil { + return nil, fmt.Errorf("error applying downsampling configuration: %w", err) + } } // client has to be initiated after migrate since migrate diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go index 2468062add..2d264b9139 100644 --- a/pkg/tests/end_to_end_tests/config_dataset_test.go +++ b/pkg/tests/end_to_end_tests/config_dataset_test.go @@ -2,6 +2,7 @@ package end_to_end_tests import ( "context" + "github.com/timescale/promscale/pkg/util" "testing" "time" @@ -28,14 +29,14 @@ func TestDatasetConfigApply(t *testing.T) { cfg := dataset.Config{ Metrics: dataset.Metrics{ - ChunkInterval: dataset.DayDuration(4 * time.Hour), + ChunkInterval: util.DayDuration(4 * time.Hour), Compression: &disableCompression, - HALeaseRefresh: dataset.DayDuration(15 * time.Second), - HALeaseTimeout: dataset.DayDuration(2 * time.Minute), - RetentionPeriod: dataset.DayDuration(15 * 24 * time.Hour), + HALeaseRefresh: util.DayDuration(15 * time.Second), + HALeaseTimeout: util.DayDuration(2 * time.Minute), + RetentionPeriod: util.DayDuration(15 * 24 * time.Hour), }, Traces: dataset.Traces{ - RetentionPeriod: dataset.DayDuration(10 * 24 * time.Hour), + RetentionPeriod: util.DayDuration(10 * 24 * time.Hour), }, } diff --git a/pkg/dataset/duration.go b/pkg/util/day_duration.go similarity index 89% rename from pkg/dataset/duration.go rename to pkg/util/day_duration.go index 14d6b9bfaf..52e8302a80 100644 --- 
a/pkg/dataset/duration.go +++ b/pkg/util/day_duration.go @@ -1,4 +1,8 @@ -package dataset +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package util import ( "fmt" From 8d1dbbcdc5a4052129ec6d37fc2c84e70ac51334 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Tue, 25 Oct 2022 17:28:15 +0530 Subject: [PATCH 2/7] Update resolution config and add E2E tests for creation/deletion of rollups. Signed-off-by: Harkishen-Singh --- pkg/dataset/config.go | 8 +- pkg/dataset/downsample.go | 60 ++++++------- pkg/rollup/rollup.go | 67 +++++---------- pkg/rollup/rollup_test.go | 100 +++------------------- pkg/runner/client.go | 7 -- pkg/tests/end_to_end_tests/rollup_test.go | 71 +++++++++++++++ 6 files changed, 133 insertions(+), 180 deletions(-) create mode 100644 pkg/tests/end_to_end_tests/rollup_test.go diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index b6c7a41806..a3b7cfb7f9 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -49,7 +49,7 @@ type Metrics struct { HALeaseRefresh util.DayDuration `yaml:"ha_lease_refresh"` HALeaseTimeout util.DayDuration `yaml:"ha_lease_timeout"` RetentionPeriod util.DayDuration `yaml:"default_retention_period"` - Downsample `yaml:"downsample,omitempty"` + Downsample *Downsample `yaml:"downsample,omitempty"` } // Traces contains dataset configuration options for traces data. 
@@ -67,8 +67,10 @@ func NewConfig(contents string) (cfg Config, err error) { func (c *Config) Apply(conn *pgx.Conn) error { c.applyDefaults() - if err := c.Downsample.Apply(conn); err != nil { - return fmt.Errorf("error applying configuration for downsampling: %w", err) + if c.Metrics.Downsample != nil { + if err := c.Metrics.Downsample.Apply(conn); err != nil { + return fmt.Errorf("error applying configuration for downsampling: %w", err) + } } log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval)) diff --git a/pkg/dataset/downsample.go b/pkg/dataset/downsample.go index 4a61275408..cb4ab5b633 100644 --- a/pkg/dataset/downsample.go +++ b/pkg/dataset/downsample.go @@ -7,48 +7,51 @@ package dataset import ( "context" "fmt" + "time" "github.com/jackc/pgx/v4" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/rollup" + "github.com/timescale/promscale/pkg/util" ) -const ( - defaultDownsampleState = true - defaultDownsampleResolution = "short:5m:90d,long:1h:395d" // label:resolution:retention -) +const defaultDownsampleState = true var ( - setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" - setDefaultDownsampleResolutionSQL = "SELECT prom_api.set_downsample_resolution($1)" + setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" - defaultDownsampleStateVar = defaultDownsampleState + defaultDownsampleStateVar = defaultDownsampleState + defaultDownsampleResolution = []rollup.DownsampleResolution{ + { + Label: "short", + Resolution: util.DayDuration(5 * time.Minute), + Retention: util.DayDuration(90 * 24 * time.Hour), + }, + { + Label: "long", + Resolution: util.DayDuration(time.Hour), + Retention: util.DayDuration(395 * 24 * time.Hour), + }, + } ) type Downsample struct { - Automatic *bool `yaml:"automatic,omitempty"` - Resolution string `yaml:"resolution,omitempty"` + Automatic *bool `yaml:"automatic,omitempty"` + Resolution 
[]rollup.DownsampleResolution `yaml:"resolutions,omitempty"` } func (d *Downsample) Apply(conn *pgx.Conn) error { d.applyDefaults() - queries := map[string]interface{}{ - setDefaultDownsampleStateSQL: d.Automatic, - } - if *d.Automatic { - if err := handleDownsampling(conn, d.Resolution); err != nil { - return fmt.Errorf("handle downsample: %w", err) - } - queries[setDefaultDownsampleResolutionSQL] = d.Resolution - log.Info("msg", fmt.Sprintf("Setting metric downsample resolution to %s", d.Resolution)) - } log.Info("msg", fmt.Sprintf("Setting metric automatic downsample to %t", *d.Automatic)) + if _, err := conn.Exec(context.Background(), setDefaultDownsampleStateSQL, d.Automatic); err != nil { + return err + } - for sql, param := range queries { - if _, err := conn.Exec(context.Background(), sql, param); err != nil { - return err + if *d.Automatic { + if err := rollup.EnsureRollupWith(conn, d.Resolution); err != nil { + return fmt.Errorf("ensure rollup with: %w", err) } } return nil @@ -59,18 +62,7 @@ func (d *Downsample) applyDefaults() { // In default case, we plan to downsample data. 
d.Automatic = &defaultDownsampleStateVar } - if d.Resolution == "" { + if d.Resolution == nil { d.Resolution = defaultDownsampleResolution } } - -func handleDownsampling(conn *pgx.Conn, resolutionStr string) error { - downsampleResolutions, err := rollup.Parse(resolutionStr) - if err != nil { - return fmt.Errorf("error parsing downsample resolution: %w", err) - } - if err = rollup.EnsureRollupWith(conn, downsampleResolutions); err != nil { - return fmt.Errorf("ensure rollup with: %w", err) - } - return nil -} diff --git a/pkg/rollup/rollup.go b/pkg/rollup/rollup.go index c42cdf8ff3..55ede6ad67 100644 --- a/pkg/rollup/rollup.go +++ b/pkg/rollup/rollup.go @@ -7,60 +7,28 @@ package rollup import ( "context" "fmt" - "strings" "time" "github.com/jackc/pgx/v4" + "github.com/timescale/promscale/pkg/util" ) type DownsampleResolution struct { - label string - resolution util.DayDuration - retention util.DayDuration -} - -// Parse parses the rollup resolution string and returns true if it respects the format of downsampling resolution, -// which is a string of comma separated label:resolution:retention. 
-// Example: short:5m:90d,long:1h:395d -func Parse(resolutionStr string) ([]DownsampleResolution, error) { - resolutions := strings.Split(resolutionStr, ",") - if len(resolutions) < 1 { - return nil, fmt.Errorf("resolutions cannot be less than 1") - } - var r []DownsampleResolution - for _, resolution := range resolutions { - resolution = strings.TrimSpace(resolution) - if resolution == "" { - continue - } - - items := strings.Split(resolution, ":") - if len(items) != 3 { - return nil, fmt.Errorf("expected items not found: needed in format of label:resolution:retention but found %v for %s resolution", strings.Join(items, ":"), resolution) - } - lName := items[0] - - var res util.DayDuration - err := res.UnmarshalText([]byte(items[1])) - if err != nil { - return nil, fmt.Errorf("error parsing resolution %s: %w", items[1], err) - } - - var retention util.DayDuration - err = retention.UnmarshalText([]byte(items[2])) - if err != nil { - return nil, fmt.Errorf("error parsing retention %s: %w", items[2], err) - } - r = append(r, DownsampleResolution{lName, res, retention}) - } - return r, nil + Label string `yaml:"label"` + Resolution util.DayDuration `yaml:"resolution"` + Retention util.DayDuration `yaml:"retention"` } // EnsureRollupWith ensures "strictly" that the given new resolutions are applied in the database. +// // Note: It follows a "strict" behaviour, meaning any existing resolutions of downsampling in // the database will be removed, so that the all downsampling data in the database strictly // matches the provided newResolutions. +// +// Example: If the DB already contains metric rollups for `short` and `long`, and in dataset-config, +// connector sees `very_short` and `long`, then EnsureRollupWith will remove the `short` downsampled data +// and create `very_short`, while not touching `long`. 
func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) error { rows, err := conn.Query(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup") if err != nil { @@ -75,7 +43,7 @@ func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) err if err := rows.Scan(&lName, &resolution, &retention); err != nil { return fmt.Errorf("error scanning output rows for existing resolutions: %w", err) } - existingResolutions = append(existingResolutions, DownsampleResolution{label: lName, resolution: util.DayDuration(resolution), retention: util.DayDuration(retention)}) + existingResolutions = append(existingResolutions, DownsampleResolution{Label: lName, Resolution: util.DayDuration(resolution), Retention: util.DayDuration(retention)}) } // Determine which resolutions need to be created and deleted from the DB. @@ -96,9 +64,9 @@ func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) err func createRollups(conn *pgx.Conn, res []DownsampleResolution) error { for _, r := range res { - _, err := conn.Exec(context.Background(), "CALL _prom_catalog.create_metric_rollup($1, $2, $3)", r.label, time.Duration(r.resolution), time.Duration(r.retention)) + _, err := conn.Exec(context.Background(), "CALL _prom_catalog.create_rollup($1, $2, $3)", r.Label, time.Duration(r.Resolution), time.Duration(r.Retention)) if err != nil { - return fmt.Errorf("error creating rollup for %s: %w", r.label, err) + return fmt.Errorf("error creating rollup for %s: %w", r.Label, err) } } return nil @@ -106,21 +74,26 @@ func createRollups(conn *pgx.Conn, res []DownsampleResolution) error { func deleteRollups(conn *pgx.Conn, res []DownsampleResolution) error { for _, r := range res { - _, err := conn.Exec(context.Background(), "CALL _prom_catalog.delete_metric_rollup($1)", r.label) + _, err := conn.Exec(context.Background(), "CALL _prom_catalog.delete_rollup($1)", r.Label) if err != nil { - return fmt.Errorf("error deleting 
rollup for %s: %w", r.label, err) + return fmt.Errorf("error deleting rollup for %s: %w", r.Label, err) } } return nil } // diff returns the elements of a that are not in b. +// +// We need this since we want to support a "strict" behaviour in downsampling. This basically means, to have the exact +// downsampling data in the DB based on what's mentioned in the dataset-config. +// +// See the comment for EnsureRollupWith for example. func diff(a, b []DownsampleResolution) []DownsampleResolution { var difference []DownsampleResolution for i := range a { found := false for j := range b { - if a[i].label == b[j].label { + if a[i].Label == b[j].Label { found = true break } diff --git a/pkg/rollup/rollup_test.go b/pkg/rollup/rollup_test.go index 10b4b78de1..e9822541c8 100644 --- a/pkg/rollup/rollup_test.go +++ b/pkg/rollup/rollup_test.go @@ -6,88 +6,10 @@ package rollup import ( "testing" - "time" "github.com/stretchr/testify/require" - "github.com/timescale/promscale/pkg/util" ) -func TestParse(t *testing.T) { - tcs := []struct { - name string - input string - numResolutions int - parsedResolution []DownsampleResolution - shouldFail bool - }{ - { - name: "default entries", - input: "short:5m:90d,long:1h:395d", - parsedResolution: []DownsampleResolution{ - {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, - {"long", util.DayDuration(time.Hour), util.DayDuration(395 * 24 * time.Hour)}, - }, - numResolutions: 2, - }, - { - name: "default entries with space", - input: "short:5m:90d, long:1h:395d", - parsedResolution: []DownsampleResolution{ - {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, - {"long", util.DayDuration(time.Hour), util.DayDuration(395 * 24 * time.Hour)}, - }, - numResolutions: 2, - }, - { - name: "only short", - input: "short:5m:90d", - parsedResolution: []DownsampleResolution{ - {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, - }, - numResolutions: 1, 
- }, - { - name: "only short with comma", - input: "short:5m:90d,", - parsedResolution: []DownsampleResolution{ - {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, - }, - numResolutions: 1, - }, - { - name: "only short with space", - input: "short:5m:90d, ", - parsedResolution: []DownsampleResolution{ - {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, - }, - numResolutions: 1, - }, - { - name: "mix", - input: "short:5m:90d,long:1h:395d, very_long:2d:365d , very_short:1m:90d", - parsedResolution: []DownsampleResolution{ - {"short", util.DayDuration(5 * time.Minute), util.DayDuration(90 * 24 * time.Hour)}, - {"long", util.DayDuration(time.Hour), util.DayDuration(395 * 24 * time.Hour)}, - {"very_long", util.DayDuration(2 * 24 * time.Hour), util.DayDuration(365 * 24 * time.Hour)}, - {"very_short", util.DayDuration(time.Minute), util.DayDuration(90 * 24 * time.Hour)}, - }, - numResolutions: 4, - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - resolutions, err := Parse(tc.input) - if tc.shouldFail { - require.Error(t, err, tc.name) - return - } - require.NoError(t, err, tc.name) - require.Equal(t, tc.numResolutions, len(resolutions), tc.name) - require.Equal(t, tc.parsedResolution, resolutions, tc.name) - }) - } -} - func TestDiff(t *testing.T) { tcs := []struct { name string @@ -95,32 +17,32 @@ func TestDiff(t *testing.T) { }{ { name: "some inclusive elements", - a: []DownsampleResolution{{label: "a"}, {label: "b"}, {label: "c"}, {label: "d"}}, - b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, - expected: []DownsampleResolution{{label: "a"}, {label: "b"}}, + a: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, + b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, + expected: []DownsampleResolution{{Label: "a"}, {Label: "b"}}, }, { name: "b superset of a", - a: []DownsampleResolution{{label: "c"}, {label: 
"d"}}, - b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, + a: []DownsampleResolution{{Label: "c"}, {Label: "d"}}, + b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, expected: []DownsampleResolution(nil), }, { name: "a empty", a: []DownsampleResolution{}, - b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, + b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, expected: []DownsampleResolution(nil), }, { name: "all elements exclusive", - a: []DownsampleResolution{{label: "a"}}, - b: []DownsampleResolution{{label: "c"}, {label: "d"}, {label: "e"}}, - expected: []DownsampleResolution{{label: "a"}}, + a: []DownsampleResolution{{Label: "a"}}, + b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, + expected: []DownsampleResolution{{Label: "a"}}, }, { name: "same", - a: []DownsampleResolution{{label: "a"}, {label: "b"}, {label: "c"}, {label: "d"}}, - b: []DownsampleResolution{{label: "a"}, {label: "b"}, {label: "c"}, {label: "d"}}, + a: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, + b: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, expected: []DownsampleResolution(nil), }, { diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 090fa07013..63dbd49f4c 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -170,13 +170,6 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } - } else { - // We apply downsampling settings even when DatasetConfig is not given, which is the most common case. 
- downsampleCfg := &dataset.Downsample{} - err = downsampleCfg.Apply(conn) - if err != nil { - return nil, fmt.Errorf("error applying downsampling configuration: %w", err) - } } // client has to be initiated after migrate since migrate diff --git a/pkg/tests/end_to_end_tests/rollup_test.go b/pkg/tests/end_to_end_tests/rollup_test.go new file mode 100644 index 0000000000..6806d38ba6 --- /dev/null +++ b/pkg/tests/end_to_end_tests/rollup_test.go @@ -0,0 +1,71 @@ +package end_to_end_tests + +import ( + "context" + "testing" + "time" + + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/stretchr/testify/require" + + "github.com/timescale/promscale/pkg/rollup" + "github.com/timescale/promscale/pkg/util" +) + +func TestRollupCreationDeletion(t *testing.T) { + withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { + rollupResolutions := []rollup.DownsampleResolution{ + { + Label: "short", + Resolution: util.DayDuration(5 * time.Minute), + Retention: util.DayDuration(30 * 24 * time.Hour), + }, + } + + pgCon, err := db.Acquire(context.Background()) + require.NoError(t, err) + defer pgCon.Release() + + err = rollup.EnsureRollupWith(pgCon.Conn(), rollupResolutions) + require.NoError(t, err) + + verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[0].Label, time.Duration(rollupResolutions[0].Resolution), time.Duration(rollupResolutions[0].Retention), false) + + rollupResolutions = append(rollupResolutions, rollup.DownsampleResolution{ + Label: "long", + Resolution: util.DayDuration(time.Hour), + Retention: util.DayDuration(395 * 24 * time.Hour), + }) + + err = rollup.EnsureRollupWith(pgCon.Conn(), rollupResolutions) + require.NoError(t, err) + + verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[1].Label, time.Duration(rollupResolutions[1].Resolution), time.Duration(rollupResolutions[1].Retention), false) + + // Remove the first entry and see if the entry is removed or not. 
+ newRes := rollupResolutions[1:] + err = rollup.EnsureRollupWith(pgCon.Conn(), newRes) + require.NoError(t, err) + // Check if long exists. + verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[1].Label, time.Duration(rollupResolutions[1].Resolution), time.Duration(rollupResolutions[1].Retention), false) + // Check if short does not exist. + verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[0].Label, time.Duration(rollupResolutions[0].Resolution), time.Duration(rollupResolutions[0].Retention), true) + }) +} + +func verifyRollupExistence(t testing.TB, pgCon *pgx.Conn, name string, resolution, retention time.Duration, shouldError bool) { + var ( + rName string + rResolution time.Duration + rRetention time.Duration + ) + err := pgCon.QueryRow(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup WHERE name = $1", name).Scan(&rName, &rResolution, &rRetention) + if shouldError { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, resolution, rResolution) + require.Equal(t, retention, rRetention) +} From 782dad8c05b4c5f8bd3d674c152e852e40aecaa0 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Wed, 2 Nov 2022 15:12:48 +0530 Subject: [PATCH 3/7] Refactor DayDuration -> day.Duration Signed-off-by: Harkishen-Singh --- pkg/dataset/config.go | 26 +++++++------- pkg/dataset/config_test.go | 34 +++++++++---------- pkg/dataset/downsample.go | 10 +++--- .../day/duration.go} | 12 +++---- pkg/rollup/rollup.go | 10 +++--- .../end_to_end_tests/config_dataset_test.go | 12 +++---- pkg/tests/end_to_end_tests/rollup_test.go | 10 +++--- 7 files changed, 57 insertions(+), 57 deletions(-) rename pkg/{util/day_duration.go => internal/day/duration.go} (86%) diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index a3b7cfb7f9..3d4e210035 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -12,8 +12,8 @@ import ( "github.com/jackc/pgx/v4" "gopkg.in/yaml.v2" + 
"github.com/timescale/promscale/pkg/internal/day" "github.com/timescale/promscale/pkg/log" - "github.com/timescale/promscale/pkg/util" ) const ( @@ -44,17 +44,17 @@ type Config struct { // Metrics contains dataset configuration options for metrics data. type Metrics struct { - ChunkInterval util.DayDuration `yaml:"default_chunk_interval"` - Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. - HALeaseRefresh util.DayDuration `yaml:"ha_lease_refresh"` - HALeaseTimeout util.DayDuration `yaml:"ha_lease_timeout"` - RetentionPeriod util.DayDuration `yaml:"default_retention_period"` - Downsample *Downsample `yaml:"downsample,omitempty"` + ChunkInterval day.Duration `yaml:"default_chunk_interval"` + Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. + HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` + HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` + RetentionPeriod day.Duration `yaml:"default_retention_period"` + Downsample *Downsample `yaml:"downsample,omitempty"` } // Traces contains dataset configuration options for traces data. type Traces struct { - RetentionPeriod util.DayDuration `yaml:"default_retention_period"` + RetentionPeriod day.Duration `yaml:"default_retention_period"` } // NewConfig creates a new dataset config based on the configuration YAML contents. 
@@ -100,21 +100,21 @@ func (c *Config) Apply(conn *pgx.Conn) error { func (c *Config) applyDefaults() { if c.Metrics.ChunkInterval <= 0 { - c.Metrics.ChunkInterval = util.DayDuration(defaultMetricChunkInterval) + c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval) } if c.Metrics.Compression == nil { c.Metrics.Compression = &defaultMetricCompressionVar } if c.Metrics.HALeaseRefresh <= 0 { - c.Metrics.HALeaseRefresh = util.DayDuration(defaultMetricHALeaseRefresh) + c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh) } if c.Metrics.HALeaseTimeout <= 0 { - c.Metrics.HALeaseTimeout = util.DayDuration(defaultMetricHALeaseTimeout) + c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout) } if c.Metrics.RetentionPeriod <= 0 { - c.Metrics.RetentionPeriod = util.DayDuration(defaultMetricRetentionPeriod) + c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod) } if c.Traces.RetentionPeriod <= 0 { - c.Traces.RetentionPeriod = util.DayDuration(defaultTraceRetentionPeriod) + c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod) } } diff --git a/pkg/dataset/config_test.go b/pkg/dataset/config_test.go index b2b07e5bc3..42468bc2f0 100644 --- a/pkg/dataset/config_test.go +++ b/pkg/dataset/config_test.go @@ -1,11 +1,11 @@ package dataset import ( - "github.com/timescale/promscale/pkg/util" "testing" "time" "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/internal/day" ) var testCompressionSetting = true @@ -46,7 +46,7 @@ func TestNewConfig(t *testing.T) { default_retention_period: 3d2h`, cfg: Config{ Metrics: Metrics{ - RetentionPeriod: util.DayDuration(((3 * 24) + 2) * time.Hour), + RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour), }, }, }, @@ -62,14 +62,14 @@ traces: default_retention_period: 15d`, cfg: Config{ Metrics: Metrics{ - ChunkInterval: util.DayDuration(3 * time.Hour), + ChunkInterval: day.Duration(3 * time.Hour), Compression: &testCompressionSetting, - 
HALeaseRefresh: util.DayDuration(2 * time.Minute), - HALeaseTimeout: util.DayDuration(5 * time.Second), - RetentionPeriod: util.DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: day.Duration(2 * time.Minute), + HALeaseTimeout: day.Duration(5 * time.Second), + RetentionPeriod: day.Duration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: util.DayDuration(15 * 24 * time.Hour), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, }, }, @@ -98,14 +98,14 @@ func TestApplyDefaults(t *testing.T) { t, Config{ Metrics: Metrics{ - ChunkInterval: util.DayDuration(defaultMetricChunkInterval), + ChunkInterval: day.Duration(defaultMetricChunkInterval), Compression: &defaultMetricCompressionVar, - HALeaseRefresh: util.DayDuration(defaultMetricHALeaseRefresh), - HALeaseTimeout: util.DayDuration(defaultMetricHALeaseTimeout), - RetentionPeriod: util.DayDuration(defaultMetricRetentionPeriod), + HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh), + HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout), + RetentionPeriod: day.Duration(defaultMetricRetentionPeriod), }, Traces: Traces{ - RetentionPeriod: util.DayDuration(defaultTraceRetentionPeriod), + RetentionPeriod: day.Duration(defaultTraceRetentionPeriod), }, }, c, @@ -113,14 +113,14 @@ func TestApplyDefaults(t *testing.T) { untouched := Config{ Metrics: Metrics{ - ChunkInterval: util.DayDuration(3 * time.Hour), + ChunkInterval: day.Duration(3 * time.Hour), Compression: &testCompressionSetting, - HALeaseRefresh: util.DayDuration(2 * time.Minute), - HALeaseTimeout: util.DayDuration(5 * time.Second), - RetentionPeriod: util.DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: day.Duration(2 * time.Minute), + HALeaseTimeout: day.Duration(5 * time.Second), + RetentionPeriod: day.Duration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: util.DayDuration(15 * 24 * time.Hour), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, } diff --git a/pkg/dataset/downsample.go b/pkg/dataset/downsample.go 
index cb4ab5b633..f65b0cff84 100644 --- a/pkg/dataset/downsample.go +++ b/pkg/dataset/downsample.go @@ -11,9 +11,9 @@ import ( "github.com/jackc/pgx/v4" + "github.com/timescale/promscale/pkg/internal/day" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/rollup" - "github.com/timescale/promscale/pkg/util" ) const defaultDownsampleState = true @@ -25,13 +25,13 @@ var ( defaultDownsampleResolution = []rollup.DownsampleResolution{ { Label: "short", - Resolution: util.DayDuration(5 * time.Minute), - Retention: util.DayDuration(90 * 24 * time.Hour), + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(90 * 24 * time.Hour), }, { Label: "long", - Resolution: util.DayDuration(time.Hour), - Retention: util.DayDuration(395 * 24 * time.Hour), + Resolution: day.Duration(time.Hour), + Retention: day.Duration(395 * 24 * time.Hour), }, } ) diff --git a/pkg/util/day_duration.go b/pkg/internal/day/duration.go similarity index 86% rename from pkg/util/day_duration.go rename to pkg/internal/day/duration.go index 52e8302a80..7d3546928b 100644 --- a/pkg/util/day_duration.go +++ b/pkg/internal/day/duration.go @@ -2,7 +2,7 @@ // Please see the included NOTICE for copyright information and // LICENSE for a copy of the license. -package util +package day import ( "fmt" @@ -15,13 +15,13 @@ const ( unknownUnitDErrorPrefix = `time: unknown unit "d"` ) -// DayDuration acts like a time.Duration with support for "d" unit +// Duration acts like a time.Duration with support for "d" unit // which is used for specifying number of days in duration. -type DayDuration time.Duration +type Duration time.Duration // UnmarshalText unmarshals strings into DayDuration values while // handling the day unit. It leans heavily into time.ParseDuration. 
-func (d *DayDuration) UnmarshalText(s []byte) error { +func (d *Duration) UnmarshalText(s []byte) error { val, err := time.ParseDuration(string(s)) if err != nil { // Check for specific error indicating we are using days unit. @@ -34,7 +34,7 @@ func (d *DayDuration) UnmarshalText(s []byte) error { return err } } - *d = DayDuration(val) + *d = Duration(val) return nil } @@ -65,6 +65,6 @@ func handleDays(s []byte) (time.Duration, error) { } // String returns a string value of DayDuration. -func (d DayDuration) String() string { +func (d Duration) String() string { return time.Duration(d).String() } diff --git a/pkg/rollup/rollup.go b/pkg/rollup/rollup.go index 55ede6ad67..663659d4e9 100644 --- a/pkg/rollup/rollup.go +++ b/pkg/rollup/rollup.go @@ -11,13 +11,13 @@ import ( "github.com/jackc/pgx/v4" - "github.com/timescale/promscale/pkg/util" + "github.com/timescale/promscale/pkg/internal/day" ) type DownsampleResolution struct { - Label string `yaml:"label"` - Resolution util.DayDuration `yaml:"resolution"` - Retention util.DayDuration `yaml:"retention"` + Label string `yaml:"label"` + Resolution day.Duration `yaml:"resolution"` + Retention day.Duration `yaml:"retention"` } // EnsureRollupWith ensures "strictly" that the given new resolutions are applied in the database. @@ -43,7 +43,7 @@ func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) err if err := rows.Scan(&lName, &resolution, &retention); err != nil { return fmt.Errorf("error scanning output rows for existing resolutions: %w", err) } - existingResolutions = append(existingResolutions, DownsampleResolution{Label: lName, Resolution: util.DayDuration(resolution), Retention: util.DayDuration(retention)}) + existingResolutions = append(existingResolutions, DownsampleResolution{Label: lName, Resolution: day.Duration(resolution), Retention: day.Duration(retention)}) } // Determine which resolutions need to be created and deleted from the DB. 
diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go index 2d264b9139..f9b1f95541 100644 --- a/pkg/tests/end_to_end_tests/config_dataset_test.go +++ b/pkg/tests/end_to_end_tests/config_dataset_test.go @@ -2,7 +2,6 @@ package end_to_end_tests import ( "context" - "github.com/timescale/promscale/pkg/util" "testing" "time" @@ -10,6 +9,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/dataset" + "github.com/timescale/promscale/pkg/internal/day" ) func TestDatasetConfigApply(t *testing.T) { @@ -29,14 +29,14 @@ func TestDatasetConfigApply(t *testing.T) { cfg := dataset.Config{ Metrics: dataset.Metrics{ - ChunkInterval: util.DayDuration(4 * time.Hour), + ChunkInterval: day.Duration(4 * time.Hour), Compression: &disableCompression, - HALeaseRefresh: util.DayDuration(15 * time.Second), - HALeaseTimeout: util.DayDuration(2 * time.Minute), - RetentionPeriod: util.DayDuration(15 * 24 * time.Hour), + HALeaseRefresh: day.Duration(15 * time.Second), + HALeaseTimeout: day.Duration(2 * time.Minute), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, Traces: dataset.Traces{ - RetentionPeriod: util.DayDuration(10 * 24 * time.Hour), + RetentionPeriod: day.Duration(10 * 24 * time.Hour), }, } diff --git a/pkg/tests/end_to_end_tests/rollup_test.go b/pkg/tests/end_to_end_tests/rollup_test.go index 6806d38ba6..ce36dc51e9 100644 --- a/pkg/tests/end_to_end_tests/rollup_test.go +++ b/pkg/tests/end_to_end_tests/rollup_test.go @@ -9,8 +9,8 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/internal/day" "github.com/timescale/promscale/pkg/rollup" - "github.com/timescale/promscale/pkg/util" ) func TestRollupCreationDeletion(t *testing.T) { @@ -18,8 +18,8 @@ func TestRollupCreationDeletion(t *testing.T) { rollupResolutions := []rollup.DownsampleResolution{ { Label: "short", - Resolution: 
util.DayDuration(5 * time.Minute), - Retention: util.DayDuration(30 * 24 * time.Hour), + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(30 * 24 * time.Hour), }, } @@ -34,8 +34,8 @@ func TestRollupCreationDeletion(t *testing.T) { rollupResolutions = append(rollupResolutions, rollup.DownsampleResolution{ Label: "long", - Resolution: util.DayDuration(time.Hour), - Retention: util.DayDuration(395 * 24 * time.Hour), + Resolution: day.Duration(time.Hour), + Retention: day.Duration(395 * 24 * time.Hour), }) err = rollup.EnsureRollupWith(pgCon.Conn(), rollupResolutions) From 540951177eb0e6f52f0bf7424f66a65e5b92e188 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Fri, 11 Nov 2022 12:25:42 +0530 Subject: [PATCH 4/7] Update tests related to register_metric_view(). Signed-off-by: Harkishen-Singh --- .../end_to_end_tests/continuous_agg_test.go | 6 +++--- pkg/tests/end_to_end_tests/create_test.go | 18 +++++++++--------- .../end_to_end_tests/query_integration_test.go | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 850140e590..56883b6d29 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -190,7 +190,7 @@ WITH (timescaledb.continuous) AS t.Fatalf("unexpected error while creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg', NULL)"); err != nil { t.Fatalf("unexpected error while registering metric view: %s", err) } @@ -372,7 +372,7 @@ WITH (timescaledb.continuous) AS FROM prom_data.test GROUP BY public.time_bucket('1hour', time), series_id`) require.NoError(t, err) - _, err = db.Exec(context.Background(), "SELECT 
prom_api.register_metric_view('cagg_schema', 'cagg')") + _, err = db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg', NULL)") require.NoError(t, err) _, err = db.Exec(context.Background(), "SELECT prom_api.set_metric_retention_period('cagg_schema', 'cagg', INTERVAL '180 days')") @@ -450,7 +450,7 @@ WITH (timescaledb.continuous) AS t.Fatalf("unexpected error while creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour', NULL)"); err != nil { t.Fatalf("unexpected error while registering metric view: %s", err) } diff --git a/pkg/tests/end_to_end_tests/create_test.go b/pkg/tests/end_to_end_tests/create_test.go index 1d3d9a9edd..bb21aacf39 100644 --- a/pkg/tests/end_to_end_tests/create_test.go +++ b/pkg/tests/end_to_end_tests/create_test.go @@ -1597,7 +1597,7 @@ func TestRegisterMetricView(t *testing.T) { withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { // Cannot register non-existant schema. - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('nonexistant', 'missing')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('nonexistant', 'missing', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view from a non-existant schema") } @@ -1606,7 +1606,7 @@ func TestRegisterMetricView(t *testing.T) { } // Cannot register non-existant view. 
- if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'missing')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'missing', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view from a non-existant metric view") } @@ -1640,7 +1640,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_in_data_schema AS SELECT * FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view in data schema: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_data', 'metric_view_in_data_schema')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_data', 'metric_view_in_data_schema', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view in data schema") } @@ -1648,7 +1648,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_bad_columns AS SELECT time, series_id, true as bad_column FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_columns')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_columns', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with different columns than raw metric") } @@ -1656,7 +1656,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_bad_column_types AS SELECT time, series_id, true as value FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := 
db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_column_types')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_column_types', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with column types different than raw metric") } @@ -1664,7 +1664,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_not_based AS SELECT time, series_id, 1.0 as value FROM prom_view."metric_view_bad_columns"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_not_based')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_not_based', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with column types different than raw metric") } @@ -1672,7 +1672,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view AS SELECT * FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', NULL)"); err != nil { t.Fatalf("Error creating valid metric view: %v", err) } @@ -1694,12 +1694,12 @@ func TestRegisterMetricView(t *testing.T) { } // Cannot register the same view twice. 
- if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', NULL)"); err == nil { t.Fatal("Should not be able to register the same view twice") } // Should succeed if we register same view twice but also use `if_not_exists` - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', true)"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', NULL, true)"); err != nil { t.Fatalf("Should be able to register the same view twice when using `if_not_exists`: %v", err) } diff --git a/pkg/tests/end_to_end_tests/query_integration_test.go b/pkg/tests/end_to_end_tests/query_integration_test.go index 2b25312305..0f75d24b40 100644 --- a/pkg/tests/end_to_end_tests/query_integration_test.go +++ b/pkg/tests/end_to_end_tests/query_integration_test.go @@ -766,7 +766,7 @@ func createMetricView(db *pgxpool.Pool, t testing.TB, schemaName, viewName, metr if _, err := db.Exec(context.Background(), fmt.Sprintf(`CREATE VIEW "%s"."%s" AS SELECT * FROM prom_data."%s"`, schemaName, viewName, metricName)); err != nil { t.Fatalf("unexpected error while creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), fmt.Sprintf("SELECT prom_api.register_metric_view('%s', '%s')", schemaName, viewName)); err != nil { + if _, err := db.Exec(context.Background(), fmt.Sprintf("SELECT prom_api.register_metric_view('%s', '%s', NULL)", schemaName, viewName)); err != nil { t.Fatalf("unexpected error while registering metric view: %s", err) } } From 8281a598effa23282521c3f0fcce1f99b0f7311d Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Tue, 22 Nov 2022 22:51:44 +0530 Subject: [PATCH 5/7] Adds support to instrument metric-rollups behaviour Signed-off-by: Harkishen-Singh --- 
pkg/pgmodel/metrics/database/database.go | 23 +++++-- pkg/pgmodel/metrics/database/metric_series.go | 66 +++++++++++++++++++ pkg/pgmodel/metrics/database/metrics.go | 42 ++++++++++++ 3 files changed, 124 insertions(+), 7 deletions(-) create mode 100644 pkg/pgmodel/metrics/database/metric_series.go diff --git a/pkg/pgmodel/metrics/database/database.go b/pkg/pgmodel/metrics/database/database.go index 56f128156c..35d5f87f7e 100644 --- a/pkg/pgmodel/metrics/database/database.go +++ b/pkg/pgmodel/metrics/database/database.go @@ -19,10 +19,11 @@ type Engine interface { } type metricsEngineImpl struct { - conn pgxconn.PgxConn - ctx context.Context - isRunning atomic.Value - metrics []metricQueryWrap + conn pgxconn.PgxConn + ctx context.Context + isRunning atomic.Value + metrics []metricQueryWrap + metricSeries []metricsWithSeries } // NewEngine creates an engine that performs database metrics evaluation every evalInterval. @@ -33,9 +34,10 @@ type metricsEngineImpl struct { // will cause evaluation errors. func NewEngine(ctx context.Context, conn pgxconn.PgxConn) *metricsEngineImpl { engine := &metricsEngineImpl{ - conn: conn, - ctx: ctx, - metrics: metrics, + conn: conn, + ctx: ctx, + metrics: metrics, + metricSeries: metricSeries, } engine.isRunning.Store(false) return engine @@ -136,6 +138,13 @@ func (e *metricsEngineImpl) Update() error { return err } handleResults(results, batchMetrics) + + for _, ms := range e.metricSeries { + if err = ms.update(e.conn); err != nil { + log.Warn("msg", "error evaluating metrics with series", "err", err.Error()) + continue // We shouldn't stop everything if this fails. 
+ } + } return results.Close() } diff --git a/pkg/pgmodel/metrics/database/metric_series.go b/pkg/pgmodel/metrics/database/metric_series.go new file mode 100644 index 0000000000..fd84fc63ee --- /dev/null +++ b/pkg/pgmodel/metrics/database/metric_series.go @@ -0,0 +1,66 @@ +package database + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/util" +) + +var ( + caggsRefreshTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_refresh_total", + Help: "Total number of caggs policy executed.", + }, []string{"refresh_interval"}) + caggsRefreshSuccess = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_refresh_success", + Help: "Total number of caggs policy executed successfully.", + }, []string{"refresh_interval"}) +) + +func init() { + prometheus.MustRegister(caggsRefreshSuccess, caggsRefreshTotal) +} + +type metricsWithSeries struct { + update func(conn pgxconn.PgxConn) error +} + +var metricSeries = []metricsWithSeries{ + { + update: func(conn pgxconn.PgxConn) error { + rows, err := conn.Query(context.Background(), ` +SELECT + total_successes, + total_runs, + (config ->> 'refresh_interval')::INTERVAL +FROM timescaledb_information.jobs j +INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_refresh_policy') + `) + if err != nil { + return fmt.Errorf("error running instrumentation for execute_caggs_refresh_policy: %w", err) + } + defer rows.Close() + for rows.Next() { + var ( + success, total int64 + refreshInterval time.Duration + ) + err = rows.Scan(&success, &total, &refreshInterval) + if err != nil { + return fmt.Errorf("error scanning values for execute_caggs_refresh_policy: %w", err) + } + 
caggsRefreshSuccess.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(success)) + caggsRefreshTotal.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(total)) + } + return nil + }, + }, +} diff --git a/pkg/pgmodel/metrics/database/metrics.go b/pkg/pgmodel/metrics/database/metrics.go index bf63cb5b79..cccd8c4f51 100644 --- a/pkg/pgmodel/metrics/database/metrics.go +++ b/pkg/pgmodel/metrics/database/metrics.go @@ -424,6 +424,48 @@ var metrics = []metricQueryWrap{ }, ), query: `select count(*)::bigint from _prom_catalog.metric`, + }, { + metrics: gauges( + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_compression_policy_total", + Help: "Total number of caggs compression policy executed.", + }, + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_compression_policy_success", + Help: "Total number of caggs compression policy executed successfully.", + }, + ), + query: ` +SELECT + total_runs, + total_successes +FROM timescaledb_information.jobs j + INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_compression_policy')`, + }, { + metrics: gauges( + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_retention_policy_total", + Help: "Total number of caggs retention policy executed.", + }, + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_retention_policy_success", + Help: "Total number of caggs retention policy executed successfully.", + }, + ), + query: ` +SELECT + total_runs, + total_successes +FROM timescaledb_information.jobs j + INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_retention_policy')`, }, } From ec7c1d556f249aba0d7d2104225254f834bb6e68 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh 
Date: Thu, 1 Dec 2022 17:12:50 +0530 Subject: [PATCH 6/7] Update rollups creation/deletion/updation using dataset-config. Signed-off-by: Harkishen-Singh --- pkg/dataset/config.go | 19 +-- pkg/dataset/downsample.go | 68 ---------- pkg/rollup/config.go | 97 ++++++++++++++ pkg/rollup/rollup.go | 118 ++++++++++-------- pkg/rollup/rollup_test.go | 60 --------- pkg/runner/client.go | 6 +- .../end_to_end_tests/config_dataset_test.go | 4 +- pkg/tests/end_to_end_tests/rollup_test.go | 72 ++++++++--- 8 files changed, 235 insertions(+), 209 deletions(-) delete mode 100644 pkg/dataset/downsample.go create mode 100644 pkg/rollup/config.go delete mode 100644 pkg/rollup/rollup_test.go diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index 3d4e210035..a97b61966b 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -14,6 +14,7 @@ import ( "github.com/timescale/promscale/pkg/internal/day" "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/rollup" ) const ( @@ -44,12 +45,12 @@ type Config struct { // Metrics contains dataset configuration options for metrics data. type Metrics struct { - ChunkInterval day.Duration `yaml:"default_chunk_interval"` - Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. - HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` - HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` - RetentionPeriod day.Duration `yaml:"default_retention_period"` - Downsample *Downsample `yaml:"downsample,omitempty"` + ChunkInterval day.Duration `yaml:"default_chunk_interval"` + Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. + HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` + HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` + RetentionPeriod day.Duration `yaml:"default_retention_period"` + Rollups *rollup.Config `yaml:"rollups,omitempty"` } // Traces contains dataset configuration options for traces data. 
@@ -64,11 +65,11 @@ func NewConfig(contents string) (cfg Config, err error) { } // Apply applies the configuration to the database via the supplied DB connection. -func (c *Config) Apply(conn *pgx.Conn) error { +func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error { c.applyDefaults() - if c.Metrics.Downsample != nil { - if err := c.Metrics.Downsample.Apply(conn); err != nil { + if c.Metrics.Rollups != nil { + if err := c.Metrics.Rollups.Apply(ctx, conn); err != nil { return fmt.Errorf("error applying configuration for downsampling: %w", err) } } diff --git a/pkg/dataset/downsample.go b/pkg/dataset/downsample.go deleted file mode 100644 index f65b0cff84..0000000000 --- a/pkg/dataset/downsample.go +++ /dev/null @@ -1,68 +0,0 @@ -// This file and its contents are licensed under the Apache License 2.0. -// Please see the included NOTICE for copyright information and -// LICENSE for a copy of the license. - -package dataset - -import ( - "context" - "fmt" - "time" - - "github.com/jackc/pgx/v4" - - "github.com/timescale/promscale/pkg/internal/day" - "github.com/timescale/promscale/pkg/log" - "github.com/timescale/promscale/pkg/rollup" -) - -const defaultDownsampleState = true - -var ( - setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" - - defaultDownsampleStateVar = defaultDownsampleState - defaultDownsampleResolution = []rollup.DownsampleResolution{ - { - Label: "short", - Resolution: day.Duration(5 * time.Minute), - Retention: day.Duration(90 * 24 * time.Hour), - }, - { - Label: "long", - Resolution: day.Duration(time.Hour), - Retention: day.Duration(395 * 24 * time.Hour), - }, - } -) - -type Downsample struct { - Automatic *bool `yaml:"automatic,omitempty"` - Resolution []rollup.DownsampleResolution `yaml:"resolutions,omitempty"` -} - -func (d *Downsample) Apply(conn *pgx.Conn) error { - d.applyDefaults() - - log.Info("msg", fmt.Sprintf("Setting metric automatic downsample to %t", *d.Automatic)) - if _, err := 
conn.Exec(context.Background(), setDefaultDownsampleStateSQL, d.Automatic); err != nil { - return err - } - - if *d.Automatic { - if err := rollup.EnsureRollupWith(conn, d.Resolution); err != nil { - return fmt.Errorf("ensure rollup with: %w", err) - } - } - return nil -} - -func (d *Downsample) applyDefaults() { - if d.Automatic == nil { - // In default case, we plan to downsample data. - d.Automatic = &defaultDownsampleStateVar - } - if d.Resolution == nil { - d.Resolution = defaultDownsampleResolution - } -} diff --git a/pkg/rollup/config.go b/pkg/rollup/config.go new file mode 100644 index 0000000000..d1794bf3b5 --- /dev/null +++ b/pkg/rollup/config.go @@ -0,0 +1,101 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/jackc/pgx/v4" + + "github.com/timescale/promscale/pkg/internal/day" + "github.com/timescale/promscale/pkg/log" +) + +const ( + setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" + + // short and long represent system resolutions.
+ short = "short" + long = "long" +) + +var ( + defaultDownsampleState = false + useDefaultResolution = false + systemResolution = map[string]Definition{ + short: { + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(90 * 24 * time.Hour), + }, + long: { + Resolution: day.Duration(time.Hour), + Retention: day.Duration(395 * 24 * time.Hour), + }, + } +) + +type Config struct { + Enabled *bool `yaml:"enabled,omitempty"` + UseDefaultResolution *bool `yaml:"use_default_resolution"` + Resolutions `yaml:"resolutions,omitempty"` +} + +type Definition struct { + Resolution day.Duration `yaml:"resolution"` + Retention day.Duration `yaml:"retention"` + Delete bool `yaml:"delete"` +} + +type Resolutions map[string]Definition + +func (d *Config) Apply(ctx context.Context, conn *pgx.Conn) error { + d.applyDefaults() + + if containsSystemResolutions(d.Resolutions) { + return fmt.Errorf("'short' and 'long' are system resolutions. These cannot be applied as rollup labels") + } + + log.Info("msg", fmt.Sprintf("Setting automatic metric downsample to %t", *d.Enabled)) + if _, err := conn.Exec(ctx, setDefaultDownsampleStateSQL, d.Enabled); err != nil { + return err + } + + if *d.Enabled { + if *d.UseDefaultResolution { + d.Resolutions["short"] = systemResolution["short"] + d.Resolutions["long"] = systemResolution["long"] + } + if err := Sync(ctx, conn, d.Resolutions); err != nil { + return fmt.Errorf("ensure rollup with: %w", err) + } + } + return nil +} + +func (d *Config) applyDefaults() { + if d.Enabled == nil { + d.Enabled = &defaultDownsampleState + } + if d.UseDefaultResolution == nil { + d.UseDefaultResolution = &useDefaultResolution + } + if d.Resolutions == nil { + // Apply writes the system resolutions into this map; writing to a nil map panics. + d.Resolutions = make(Resolutions) + } +} + +func containsSystemResolutions(r Resolutions) bool { + for k := range r { + k = strings.ToLower(k) + if k == short || k == long { + return true + } + } + return false +} diff --git a/pkg/rollup/rollup.go b/pkg/rollup/rollup.go index 663659d4e9..fe6a9134aa 100644 --- a/pkg/rollup/rollup.go +++
b/pkg/rollup/rollup.go @@ -14,93 +14,109 @@ import ( "github.com/timescale/promscale/pkg/internal/day" ) -type DownsampleResolution struct { - Label string `yaml:"label"` - Resolution day.Duration `yaml:"resolution"` - Retention day.Duration `yaml:"retention"` -} - -// EnsureRollupWith ensures "strictly" that the given new resolutions are applied in the database. -// -// Note: It follows a "strict" behaviour, meaning any existing resolutions of downsampling in -// the database will be removed, so that the all downsampling data in the database strictly -// matches the provided newResolutions. -// -// Example: If the DB already contains metric rollups for `short` and `long`, and in dataset-config, -// connector sees `very_short` and `long`, then EnsureRollupWith will remove the `short` downsampled data -// and create `very_short`, while not touching `long`. -func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) error { +// Sync updates the rollups in the DB in accordance with the given resolutions. It handles: +// 1. Creating of new rollups +// 2. Deletion of rollups that have `delete: true` +// 3. Update retention duration of rollups that have same label name but different retention duration. 
If the resolution of +// an existing rollup is changed, an error is returned. +func Sync(ctx context.Context, conn *pgx.Conn, r Resolutions) error { - rows, err := conn.Query(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup") + rows, err := conn.Query(ctx, "SELECT name, resolution, retention FROM _prom_catalog.rollup") if err != nil { return fmt.Errorf("querying existing resolutions: %w", err) } defer rows.Close() - var existingResolutions []DownsampleResolution + existingResolutions := make(Resolutions) for rows.Next() { var lName string var resolution, retention time.Duration if err := rows.Scan(&lName, &resolution, &retention); err != nil { return fmt.Errorf("error scanning output rows for existing resolutions: %w", err) } - existingResolutions = append(existingResolutions, DownsampleResolution{Label: lName, Resolution: day.Duration(resolution), Retention: day.Duration(retention)}) + existingResolutions[lName] = Definition{Resolution: day.Duration(resolution), Retention: day.Duration(retention)} } - // Determine which resolutions need to be created and deleted from the DB. - pendingCreation := diff(newResolutions, existingResolutions) - pendingDeletion := diff(existingResolutions, newResolutions) + if err := errOnResolutionMismatch(existingResolutions, r); err != nil { + return fmt.Errorf("error on existing resolution mismatch: %w", err) + } + + if err := updateExistingRollups(ctx, conn, existingResolutions, r); err != nil { + return fmt.Errorf("update existing rollups: %w", err) + } // Delete rollups that are no longer required. - if err = deleteRollups(conn, pendingDeletion); err != nil { + if err = deleteRollups(ctx, conn, existingResolutions, r); err != nil { return fmt.Errorf("delete rollups: %w", err) } // Create new rollups.
- if err = createRollups(conn, pendingCreation); err != nil { + if err = createRollups(ctx, conn, existingResolutions, r); err != nil { return fmt.Errorf("create rollups: %w", err) } return nil } -func createRollups(conn *pgx.Conn, res []DownsampleResolution) error { - for _, r := range res { - _, err := conn.Exec(context.Background(), "CALL _prom_catalog.create_rollup($1, $2, $3)", r.Label, time.Duration(r.Resolution), time.Duration(r.Retention)) - if err != nil { - return fmt.Errorf("error creating rollup for %s: %w", r.Label, err) +// errOnResolutionMismatch returns an error if a given resolution exists in the DB with a different resolution duration. +func errOnResolutionMismatch(existing, r Resolutions) error { + for labelName, res := range r { + if oldRes, exists := existing[labelName]; exists { + if oldRes.Resolution != res.Resolution { + return fmt.Errorf("existing rollup resolutions cannot be updated. Either keep the resolution of existing rollup labels same or remove them") + } } } return nil } -func deleteRollups(conn *pgx.Conn, res []DownsampleResolution) error { - for _, r := range res { - _, err := conn.Exec(context.Background(), "CALL _prom_catalog.delete_rollup($1)", r.Label) - if err != nil { - return fmt.Errorf("error deleting rollup for %s: %w", r.Label, err) +// updateExistingRollups updates the existing rollups retention if the new resolutions with a same name has +// different retention duration. 
+func updateExistingRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for labelName, res := range r { + if oldRes, exists := existingRes[labelName]; exists && oldRes.Retention != res.Retention { + batch.Queue("UPDATE _prom_catalog.rollup SET retention = $1 WHERE name = $2", time.Duration(res.Retention), labelName) + } + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error closing batch: %w", err) } } return nil } -// diff returns the elements of a that are not in b. -// -// We need this since we want to support a "strict" behaviour in downsampling. This basically means, to have the exact -// downsampling data in the DB based on what's mentioned in the dataset-config. -// -// See the comment for EnsureRollupWith for example. -func diff(a, b []DownsampleResolution) []DownsampleResolution { - var difference []DownsampleResolution - for i := range a { - found := false - for j := range b { - if a[i].Label == b[j].Label { - found = true - break - } +func createRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for lName, res := range r { + _, exists := existingRes[lName] + if !exists && !res.Delete { + batch.Queue("CALL _prom_catalog.create_rollup($1, $2, $3)", lName, time.Duration(res.Resolution), time.Duration(res.Retention)) } - if !found { - difference = append(difference, a[i]) + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error creating new rollups: %w", err) } } - return difference + return nil +} + +func deleteRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for lName, res := range r { + _, exists := existingRes[lName] + if exists && res.Delete { + // Delete the rollup only if it exists in the DB. 
+ batch.Queue("CALL _prom_catalog.delete_rollup($1)", lName) + } + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error deleting rollups: %w", err) + } + } + return nil } diff --git a/pkg/rollup/rollup_test.go b/pkg/rollup/rollup_test.go deleted file mode 100644 index e9822541c8..0000000000 --- a/pkg/rollup/rollup_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// This file and its contents are licensed under the Apache License 2.0. -// Please see the included NOTICE for copyright information and -// LICENSE for a copy of the license. - -package rollup - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestDiff(t *testing.T) { - tcs := []struct { - name string - a, b, expected []DownsampleResolution - }{ - { - name: "some inclusive elements", - a: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution{{Label: "a"}, {Label: "b"}}, - }, - { - name: "b superset of a", - a: []DownsampleResolution{{Label: "c"}, {Label: "d"}}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution(nil), - }, - { - name: "a empty", - a: []DownsampleResolution{}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution(nil), - }, - { - name: "all elements exclusive", - a: []DownsampleResolution{{Label: "a"}}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution{{Label: "a"}}, - }, - { - name: "same", - a: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, - b: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, - expected: []DownsampleResolution(nil), - }, - { - name: "empty", - a: []DownsampleResolution{}, - b: []DownsampleResolution{}, - expected:
[]DownsampleResolution(nil), - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expected, diff(tc.a, tc.b), tc.name) - }) - } -} diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 63dbd49f4c..2c3a141f35 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -166,7 +166,7 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error } if cfg.DatasetConfig != "" { - err = ApplyDatasetConfig(conn, cfg.DatasetConfig) + err = ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } @@ -226,13 +226,13 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func ApplyDatasetConfig(conn *pgx.Conn, cfgFilename string) error { +func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) error { cfg, err := dataset.NewConfig(cfgFilename) if err != nil { return err } - return cfg.Apply(conn) + return cfg.Apply(ctx, conn) } func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go index f9b1f95541..12b186b5ca 100644 --- a/pkg/tests/end_to_end_tests/config_dataset_test.go +++ b/pkg/tests/end_to_end_tests/config_dataset_test.go @@ -40,7 +40,7 @@ func TestDatasetConfigApply(t *testing.T) { }, } - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 4*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) @@ -54,7 +54,7 @@ func TestDatasetConfigApply(t *testing.T) { cfg, err = dataset.NewConfig("") require.NoError(t, err) - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 8*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) diff --git a/pkg/tests/end_to_end_tests/rollup_test.go 
b/pkg/tests/end_to_end_tests/rollup_test.go index ce36dc51e9..7f38a9f32c 100644 --- a/pkg/tests/end_to_end_tests/rollup_test.go +++ b/pkg/tests/end_to_end_tests/rollup_test.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + package end_to_end_tests import ( @@ -13,11 +17,10 @@ import ( "github.com/timescale/promscale/pkg/rollup" ) -func TestRollupCreationDeletion(t *testing.T) { +func TestRollupSync(t *testing.T) { withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { - rollupResolutions := []rollup.DownsampleResolution{ - { - Label: "short", + rollupResolutions := rollup.Resolutions{ + "short": { Resolution: day.Duration(5 * time.Minute), Retention: day.Duration(30 * 24 * time.Hour), }, @@ -27,30 +30,67 @@ func TestRollupCreationDeletion(t *testing.T) { require.NoError(t, err) defer pgCon.Release() - err = rollup.EnsureRollupWith(pgCon.Conn(), rollupResolutions) + // Test 1: Check if 'short' rollup is created. + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) require.NoError(t, err) - verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[0].Label, time.Duration(rollupResolutions[0].Resolution), time.Duration(rollupResolutions[0].Retention), false) + verifyRollupExistence(t, pgCon.Conn(), "short", + time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), false) - rollupResolutions = append(rollupResolutions, rollup.DownsampleResolution{ - Label: "long", + rollupResolutions["long"] = rollup.Definition{ Resolution: day.Duration(time.Hour), Retention: day.Duration(395 * 24 * time.Hour), - }) + } - err = rollup.EnsureRollupWith(pgCon.Conn(), rollupResolutions) + // Test 2: Check if 'long' rollup is created. 
+ err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) require.NoError(t, err) - verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[1].Label, time.Duration(rollupResolutions[1].Resolution), time.Duration(rollupResolutions[1].Retention), false) + verifyRollupExistence(t, pgCon.Conn(), "long", + time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false) + + // Test 3: Update the resolution and check if error is returned. + rollupResolutions["short"] = rollup.Definition{ + Resolution: day.Duration(4 * time.Minute), + Retention: day.Duration(30 * 24 * time.Hour), + } + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) + require.Equal(t, + "error on existing resolution mismatch: existing rollup resolutions cannot be updated. Either keep the resolution of existing rollup labels same or remove them", + err.Error()) + // Reset back to original resolution. + rollupResolutions["short"] = rollup.Definition{ + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(30 * 24 * time.Hour), + } - // Remove the first entry and see if the entry is removed or not. - newRes := rollupResolutions[1:] - err = rollup.EnsureRollupWith(pgCon.Conn(), newRes) + // Test 4: Remove the first entry and see if the entry is removed or not. + rollupResolutions["short"] = rollup.Definition{ + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(30 * 24 * time.Hour), + Delete: true, + } + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) require.NoError(t, err) // Check if long exists. - verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[1].Label, time.Duration(rollupResolutions[1].Resolution), time.Duration(rollupResolutions[1].Retention), false) + verifyRollupExistence(t, pgCon.Conn(), "long", + time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false) // Check if short does not exist. 
+ verifyRollupExistence(t, pgCon.Conn(), "short", + time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), true) + + // Test 5: Update retention of long and check if the same is reflected in the DB. + rollupResolutions["long"] = rollup.Definition{ + Resolution: day.Duration(time.Hour), + Retention: day.Duration(500 * 24 * time.Hour), // Updated retention duration. + } + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) + require.NoError(t, err) + verifyRollupExistence(t, pgCon.Conn(), "long", + time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false) + // Short should still not exist. + verifyRollupExistence(t, pgCon.Conn(), "short", + time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), true) }) } From 9e1f5c5c9a2299693db6930501676f35096c05cc Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Fri, 2 Dec 2022 17:31:07 +0530 Subject: [PATCH 7/7] Implement rollup resolution decider for incoming queries.
Signed-off-by: Harkishen-Singh --- pkg/dataset/config.go | 6 +- pkg/pgclient/client.go | 15 +- pkg/pgclient/config.go | 3 + pkg/pgmodel/querier/querier.go | 21 ++- pkg/pgmodel/querier/querier_sql_test.go | 7 +- pkg/pgmodel/querier/query_remote_read.go | 2 +- pkg/pgmodel/querier/query_sample.go | 27 ++- pkg/rollup/config.go | 1 + pkg/rollup/decider.go | 177 ++++++++++++++++++ pkg/rollup/decider_test.go | 113 +++++++++++ pkg/runner/client.go | 16 +- pkg/tests/constants.go | 5 +- pkg/tests/end_to_end_tests/alerts_test.go | 2 +- .../end_to_end_tests/continuous_agg_test.go | 3 +- pkg/tests/end_to_end_tests/exemplar_test.go | 5 +- .../end_to_end_tests/multi_tenancy_test.go | 28 +-- pkg/tests/end_to_end_tests/nan_test.go | 4 +- pkg/tests/end_to_end_tests/null_chars_test.go | 3 +- .../promql_endpoint_integration_test.go | 2 +- .../query_integration_test.go | 17 +- .../rollup_query_helper_test.go | 102 ++++++++++ pkg/tests/end_to_end_tests/rules_test.go | 2 +- 22 files changed, 511 insertions(+), 50 deletions(-) create mode 100644 pkg/rollup/decider.go create mode 100644 pkg/rollup/decider_test.go create mode 100644 pkg/tests/end_to_end_tests/rollup_query_helper_test.go diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index a97b61966b..469413491e 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -50,7 +50,7 @@ type Metrics struct { HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` RetentionPeriod day.Duration `yaml:"default_retention_period"` - Rollups *rollup.Config `yaml:"rollups,omitempty"` + Rollup *rollup.Config `yaml:"rollup,omitempty"` } // Traces contains dataset configuration options for traces data. 
@@ -68,8 +68,8 @@ func NewConfig(contents string) (cfg Config, err error) { func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error { c.applyDefaults() - if c.Metrics.Rollups != nil { - if err := c.Metrics.Rollups.Apply(ctx, conn); err != nil { + if c.Metrics.Rollup != nil { + if err := c.Metrics.Rollup.Apply(ctx, conn); err != nil { return fmt.Errorf("error applying configuration for downsampling: %w", err) } } diff --git a/pkg/pgclient/client.go b/pkg/pgclient/client.go index e6747082c0..b53cfa4bfc 100644 --- a/pkg/pgclient/client.go +++ b/pkg/pgclient/client.go @@ -61,7 +61,7 @@ type Client struct { } // NewClient creates a new PostgreSQL client -func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) { +func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) { var ( err error dbMaxConns int @@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche if err != nil { return nil, fmt.Errorf("err creating reader connection pool: %w", err) } - client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly) + client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups) if err != nil { return client, err } @@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string { } // NewClientWithPool creates a new PostgreSQL client with an existing connection pool. 
-func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) { +func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) { sigClose := make(chan struct{}) metricsCache := cache.NewMetricCache(cfg.CacheConfig) labelsCache := cache.NewLabelsCache(cfg.CacheConfig) @@ -223,7 +223,11 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig) labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer()) - dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer()) + dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), cfg.MetricsScrapeInterval, useRollups) + if err != nil { + return nil, fmt.Errorf("error starting querier: %w", err) + } + queryable := query.NewQueryable(dbQuerier, labelsReader) dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{}) @@ -232,8 +236,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri writerConn = pgxconn.NewPgxConn(writerPool) dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c) if err != nil { - log.Error("msg", "err starting the ingestor", "err", err) - return nil, err + return nil, fmt.Errorf("error starting ingestor: %w", err) } } if maintPool != nil { diff --git a/pkg/pgclient/config.go b/pkg/pgclient/config.go index 8d03f5be4a..4a8be24b93 100644 --- a/pkg/pgclient/config.go +++ b/pkg/pgclient/config.go @@ -18,6 +18,7 @@ import ( "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" 
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace" + "github.com/timescale/promscale/pkg/rollup" "github.com/timescale/promscale/pkg/version" ) @@ -34,6 +35,7 @@ type Config struct { DbConnectionTimeout time.Duration IgnoreCompressedChunks bool MetricsAsyncAcks bool + MetricsScrapeInterval time.Duration TracesAsyncAcks bool WriteConnections int WriterPoolSize int @@ -107,6 +109,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config { fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB") fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created") fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.") + fs.DurationVar(&cfg.MetricsScrapeInterval, "metrics.rollup.scrape-interval", rollup.DefaultScrapeInterval, "Default scrape interval in Prometheus. 
This is used to estimate samples while choosing rollup for querying.") return cfg } diff --git a/pkg/pgmodel/querier/querier.go b/pkg/pgmodel/querier/querier.go index d6951d08bf..12f19f62f0 100644 --- a/pkg/pgmodel/querier/querier.go +++ b/pkg/pgmodel/querier/querier.go @@ -6,17 +6,21 @@ package querier import ( "context" + "fmt" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/lreader" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/rollup" "github.com/timescale/promscale/pkg/tenancy" ) type pgxQuerier struct { - tools *queryTools + tools *queryTools + schema *rollup.Decider } var _ Querier = (*pgxQuerier)(nil) @@ -29,7 +33,9 @@ func NewQuerier( labelsReader lreader.LabelsReader, exemplarCache cache.PositionCache, rAuth tenancy.ReadAuthorizer, -) Querier { + scrapeInterval time.Duration, + useRollups bool, +) (Querier, error) { querier := &pgxQuerier{ tools: &queryTools{ conn: conn, @@ -39,7 +45,14 @@ func NewQuerier( rAuth: rAuth, }, } - return querier + if useRollups { + decider, err := rollup.NewDecider(context.Background(), conn, scrapeInterval) + if err != nil { + return nil, fmt.Errorf("error creating rollups schema decider: %w", err) + } + querier.schema = decider + } + return querier, nil } func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { @@ -47,7 +60,7 @@ func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { } func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier { - return newQuerySamples(ctx, q) + return newQuerySamples(ctx, q, q.schema) } func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier { diff --git a/pkg/pgmodel/querier/querier_sql_test.go b/pkg/pgmodel/querier/querier_sql_test.go index ce43cddc1f..a8ee0c582f 100644 --- a/pkg/pgmodel/querier/querier_sql_test.go +++ 
b/pkg/pgmodel/querier/querier_sql_test.go @@ -722,7 +722,12 @@ func TestPGXQuerierQuery(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}} + querier := pgxQuerier{ + &queryTools{ + conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()), + }, + nil, + } result, err := querier.RemoteReadQuerier(context.Background()).Query(c.query) diff --git a/pkg/pgmodel/querier/query_remote_read.go b/pkg/pgmodel/querier/query_remote_read.go index 4644dbecc5..8ac59fffe6 100644 --- a/pkg/pgmodel/querier/query_remote_read.go +++ b/pkg/pgmodel/querier/query_remote_read.go @@ -28,7 +28,7 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro return nil, err } - qrySamples := newQuerySamples(q.ctx, q.pgxQuerier) + qrySamples := newQuerySamples(q.ctx, q.pgxQuerier, nil) sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) if err != nil { return nil, err diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index afd4ce0bf0..3127127b33 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -9,17 +9,20 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/schema" + "github.com/timescale/promscale/pkg/rollup" ) type querySamples struct { *pgxQuerier - ctx context.Context + ctx context.Context + schema *rollup.Decider } -func newQuerySamples(ctx 
context.Context, qr *pgxQuerier) *querySamples { - return &querySamples{qr, ctx} +func newQuerySamples(ctx context.Context, qr *pgxQuerier, schema *rollup.Decider) *querySamples { + return &querySamples{qr, ctx, schema} } // Select implements the SamplesQuerier interface. It is the entry point for our @@ -39,6 +42,24 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH return nil, nil, fmt.Errorf("get evaluation metadata: %w", err) } + var useRollups bool + if q.schema != nil { + // Querying via rollups is available. + supported := q.schema.SupportsRollup(metadata.metric) // Ensure that rollups for the given metric is supported. + if !supported { + // Rollups for the given metric wasn't supported. Let's refresh and check again. + if err := q.schema.Refresh(); err != nil { + log.Error("msg", "error refreshing schema decider", "error", err.Error()) + } + supported = q.schema.SupportsRollup(metadata.metric) // If supported is still false, then rollups really don't exist for 'metadata.metric'. + } + if supported { + schemaName := q.schema.Decide(mint, maxt) + useRollups = schemaName != rollup.DefaultSchema + } + } + _ = useRollups // To avoid unused error. This will be used in the following PRs for querying rollups. + filter := metadata.timeFilter if metadata.isSingleMetric { // Single vector selector case. diff --git a/pkg/rollup/config.go b/pkg/rollup/config.go index d1794bf3b5..45a054bee4 100644 --- a/pkg/rollup/config.go +++ b/pkg/rollup/config.go @@ -17,6 +17,7 @@ import ( ) const ( + DefaultScrapeInterval = time.Second * 30 setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" // short and long represent system resolutions. diff --git a/pkg/rollup/decider.go b/pkg/rollup/decider.go new file mode 100644 index 0000000000..e570001a7b --- /dev/null +++ b/pkg/rollup/decider.go @@ -0,0 +1,177 @@ +// This file and its contents are licensed under the Apache License 2.0. 
+// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/pgxconn" +) + +const ( + DefaultSchema = "prom_data" + upperLimit = 5000 // Maximum samples allowed + refreshRollupInterval = time.Minute * 30 +) + +type rollupInfo struct { + schemaName string + refreshInterval time.Duration +} + +type Decider struct { + conn pgxconn.PgxConn + ctx context.Context + refreshMtx sync.RWMutex + + scrapeInterval time.Duration + downsamplingEnabled bool + + supportedMetrics map[string]struct{} + rollups []rollupInfo // {schemaName, refreshInterval} in ascending order of interval. Lesser the interval, more the granularity. +} + +func NewDecider(ctx context.Context, conn pgxconn.PgxConn, scrapeInterval time.Duration) (*Decider, error) { + helper := &Decider{ + ctx: ctx, + conn: conn, + scrapeInterval: scrapeInterval, + } + if err := helper.runRefreshRoutine(refreshRollupInterval); err != nil { + return nil, fmt.Errorf("refresh: %w", err) + } + return helper, nil +} + +// Decide returns the schema name of the rollups that should be used for querying. +// The returned schema represents a downsampled Prometheus data that should provide optimal +// granularity for querying. +// +// If no rollups exists or if downsampling is disabled, DefaultSchema (i.e., "prom_data") is returned. 
+func (h *Decider) Decide(minTs, maxTs int64) string { + h.refreshMtx.RLock() + defer h.refreshMtx.RUnlock() + + if !h.downsamplingEnabled || len(h.rollups) == 0 { + return DefaultSchema + } + estimateSamples := func(interval time.Duration) int64 { + return int64(float64(maxTs-minTs) / interval.Seconds()) + } + + numRawSamples := estimateSamples(h.scrapeInterval) + if numRawSamples < upperLimit { + return DefaultSchema + } + + for _, info := range h.rollups { + samples := estimateSamples(info.refreshInterval) // Interval between 2 samples. + if samples < upperLimit { + // h.rollups is sorted by interval. So, the first rollup that is below upper limit is our answer. + // This is because it gives the maximum granularity while being in acceptable limits. + return info.schemaName + } + } + // All rollups are above upper limit. Hence, send the schema of the highest interval so the granularity + // is minimum and we do not affect the performance of PromQL engine. + highestInterval := h.rollups[len(h.rollups)-1] + return highestInterval.schemaName +} + +func (h *Decider) SupportsRollup(metricName string) bool { + _, rollupExists := h.supportedMetrics[metricName] + return rollupExists +} + +func (h *Decider) Refresh() error { + h.refreshMtx.Lock() + defer h.refreshMtx.Unlock() + + if err := h.refreshDownsamplingState(); err != nil { + return fmt.Errorf("downsampling state: %w", err) + } + if err := h.refreshSupportedMetrics(); err != nil { + return fmt.Errorf("metric-type: %w", err) + } + if err := h.refreshRollup(); err != nil { + return fmt.Errorf("rollup: %w", err) + } + return nil +} + +func (h *Decider) runRefreshRoutine(refreshInterval time.Duration) error { + if err := h.Refresh(); err != nil { + return fmt.Errorf("refresh: %w", err) + } + go func() { + t := time.NewTicker(refreshInterval) + defer t.Stop() + for { + select { + case <-h.ctx.Done(): + return + case <-t.C: + } + if err := h.Refresh(); err != nil { + log.Error("msg", "error refreshing rollups", "error", 
err.Error()) + } + } + }() + return nil +} + +func (h *Decider) refreshDownsamplingState() error { + var state bool + if err := h.conn.QueryRow(h.ctx, "SELECT prom_api.get_automatic_downsample()::BOOLEAN").Scan(&state); err != nil { + return fmt.Errorf("fetching automatic downsampling state: %w", err) + } + h.downsamplingEnabled = state + return nil +} + +const supportedMetricsQuery = `SELECT m.metric_name AS supported_metrics FROM _prom_catalog.metric_rollup mr INNER JOIN _prom_catalog.metric m ON mr.metric_id = m.id GROUP BY supported_metrics;` + +func (h *Decider) refreshSupportedMetrics() error { + rows, err := h.conn.Query(h.ctx, supportedMetricsQuery) + if err != nil { + return fmt.Errorf("fetching supported metrics for rollups: %w", err) + } + defer rows.Close() + + h.supportedMetrics = make(map[string]struct{}) // metric_name: metric_type + for rows.Next() { + var supportedMetric string + err = rows.Scan(&supportedMetric) + if err != nil { + return fmt.Errorf("error scanning the fetched supported metric: %w", err) + } + h.supportedMetrics[supportedMetric] = struct{}{} + } + return nil +} + +func (h *Decider) refreshRollup() error { + rows, err := h.conn.Query(h.ctx, "SELECT schema_name, resolution FROM _prom_catalog.rollup ORDER BY resolution ASC") + if err != nil { + return fmt.Errorf("fetching rollup: %w", err) + } + defer rows.Close() + h.rollups = []rollupInfo{} + for rows.Next() { + var ( + schemaName string + refreshInterval time.Duration + ) + if err = rows.Scan(&schemaName, &refreshInterval); err != nil { + return fmt.Errorf("error scanning rows: %w", err) + } + h.rollups = append(h.rollups, rollupInfo{schemaName: schemaName, refreshInterval: refreshInterval}) + } + return nil +} diff --git a/pkg/rollup/decider_test.go b/pkg/rollup/decider_test.go new file mode 100644 index 0000000000..7e6b88b7be --- /dev/null +++ b/pkg/rollup/decider_test.go @@ -0,0 +1,113 @@ +// This file and its contents are licensed under the Apache License 2.0. 
+// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDecider(t *testing.T) { + r := &Decider{ + scrapeInterval: DefaultScrapeInterval, + downsamplingEnabled: true, + rollups: []rollupInfo{ + {"5_minute", 5 * time.Minute}, + {"15_minute", 15 * time.Minute}, + {"1_hour", time.Hour}, + {"1_week", 7 * 24 * time.Hour}, + }, + } + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: DefaultSchema, + }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: DefaultSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: DefaultSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: DefaultSchema, + }, { + // DRY RUN + // ------- + // + // Assumed default scrape interval being 30 secs + // raw -> 2,880 <-- Falls in the acceptable range. + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: DefaultSchema, + }, + { + // DRY RUN on 500 - 5000 logic + // -------- + // + // Assumed default scrape interval being 30 secs + // raw -> 20,160 + // + // And, when using following rollup intervals, num samples: + // 5 mins -> 2,016 <-- Falls in the acceptable range. + // 15 mins -> 672 + // 1 hour -> 168 + // 1 week -> 1 + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "5_minute", + }, + { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "15_minute", + }, { + // DRY RUN on 500 - 5000 logic + // -------- + // + // Assumed default scrape interval being 30 secs + // raw -> 1,036,800 + // + // And, when using following rollup intervals, num samples: + // 5 mins -> 103,680 
+ // 15 mins -> 34,560 + // 1 hour -> 8,640 + // 1 week -> 51 <-- Falls in the acceptable range. + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + schemaName := r.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds())) + require.Equal(t, tc.expectedSchemaName, schemaName, tc.name) + }) + } +} diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 2c3a141f35..af8164c21e 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -165,16 +165,18 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error cfg.APICfg.MultiTenancy = multiTenancy } + var useRollups bool if cfg.DatasetConfig != "" { - err = ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) + cfg, err := ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } + useRollups = *cfg.Metrics.Rollup.Enabled } // client has to be initiated after migrate since migrate // can change database GUC settings - client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly) + client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly, useRollups) if err != nil { return nil, fmt.Errorf("client creation error: %w", err) } @@ -226,13 +228,15 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) error { +func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) (*dataset.Config, error) { cfg, err := dataset.NewConfig(cfgFilename) if err != nil { - return err + return nil, err } - - return cfg.Apply(ctx, conn) + if err = cfg.Apply(ctx, conn); err 
!= nil { + return nil, 
fmt.Errorf("error applying dataset config: %w", err) + } + return &cfg, nil } func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { diff --git a/pkg/tests/constants.go b/pkg/tests/constants.go index e88e8aa959..463f757289 100644 --- a/pkg/tests/constants.go +++ b/pkg/tests/constants.go @@ -10,6 +10,8 @@ var ( PromscaleExtensionContainer string ) +const rollupsDBImage = "ghcr.io/timescale/dev_promscale_extension:rollups-development-ts2.8-pg14" + func init() { content, err := os.ReadFile("../../../EXTENSION_VERSION") if err != nil { @@ -17,5 +19,6 @@ func init() { } PromscaleExtensionVersion = strings.TrimSpace(string(content)) - PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + //PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + PromscaleExtensionContainer = rollupsDBImage // This will be removed once we plan to merge with master. } diff --git a/pkg/tests/end_to_end_tests/alerts_test.go b/pkg/tests/end_to_end_tests/alerts_test.go index 9f7902c670..4fbecab42c 100644 --- a/pkg/tests/end_to_end_tests/alerts_test.go +++ b/pkg/tests/end_to_end_tests/alerts_test.go @@ -47,7 +47,7 @@ func TestAlerts(t *testing.T) { MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false, false) require.NoError(t, err) defer pgClient.Close() err = pgClient.InitPromQLEngine(&query.Config{ diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 56883b6d29..3dbec38a17 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -209,7 +209,8 @@ WITH (timescaledb.continuous) AS lCache := 
clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { diff --git a/pkg/tests/end_to_end_tests/exemplar_test.go b/pkg/tests/end_to_end_tests/exemplar_test.go index 2cedc067d8..3e9382a51d 100644 --- a/pkg/tests/end_to_end_tests/exemplar_test.go +++ b/pkg/tests/end_to_end_tests/exemplar_test.go @@ -205,11 +205,12 @@ func TestExemplarQueryingAPI(t *testing.T) { // since the return will be 0, as they have already been ingested by TestExemplarIngestion. labelsReader := lreader.NewLabelsReader(pgxconn.NewPgxConn(db), cache.NewLabelsCache(cache.DefaultConfig), tenancy.NewNoopAuthorizer().ReadAuthorizer()) - r := querier.NewQuerier( + r, err := querier.NewQuerier( pgxconn.NewPgxConn(db), cache.NewMetricCache(cache.DefaultConfig), labelsReader, - cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil) + cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) // Query all exemplars corresponding to metric_2 histogram. diff --git a/pkg/tests/end_to_end_tests/multi_tenancy_test.go b/pkg/tests/end_to_end_tests/multi_tenancy_test.go index 956bcb497e..999e359385 100644 --- a/pkg/tests/end_to_end_tests/multi_tenancy_test.go +++ b/pkg/tests/end_to_end_tests/multi_tenancy_test.go @@ -36,7 +36,7 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) { require.NoError(t, err) // Ingestion. 
- client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -56,7 +56,8 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -224,7 +225,7 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -256,7 +257,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a valid tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -382,7 +384,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = 
querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{} @@ -413,7 +416,7 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -461,7 +464,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a non-tenant ----- expectedResult := []prompb.TimeSeries{ @@ -550,7 +554,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{ { @@ -627,7 +632,7 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) { require.NoError(t, err) // Ingestion. 
- client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -659,7 +664,8 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-b) ----- expectedResult := []prompb.TimeSeries{ @@ -759,7 +765,7 @@ func TestMultiTenancyLabelNamesValues(t *testing.T) { ts, _ := generateSmallMultiTenantTimeseries() withDB(t, *testDatabase, func(db *pgxpool.Pool, tb testing.TB) { getClient := func(auth tenancy.Authorizer) *pgclient.Client { - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, auth, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, auth, false, false) require.NoError(t, err) return client } diff --git a/pkg/tests/end_to_end_tests/nan_test.go b/pkg/tests/end_to_end_tests/nan_test.go index 578923bf4e..95b0d2b9a3 100644 --- a/pkg/tests/end_to_end_tests/nan_test.go +++ b/pkg/tests/end_to_end_tests/nan_test.go @@ -14,6 +14,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" _ "github.com/jackc/pgx/v4/stdlib" "github.com/prometheus/prometheus/model/value" + "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/clockcache" "github.com/timescale/promscale/pkg/internal/testhelpers" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -129,7 +130,8 @@ func TestSQLStaleNaN(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := 
pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(c.query) if err != nil { t.Fatalf("unexpected error while ingesting test dataset: %s", err) diff --git a/pkg/tests/end_to_end_tests/null_chars_test.go b/pkg/tests/end_to_end_tests/null_chars_test.go index 073a7f9a7f..acf6b50e2f 100644 --- a/pkg/tests/end_to_end_tests/null_chars_test.go +++ b/pkg/tests/end_to_end_tests/null_chars_test.go @@ -67,7 +67,8 @@ func TestOperationWithNullChars(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { diff --git a/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go b/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go index 363db64b0b..a57dab1031 100644 --- a/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go +++ b/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go @@ -311,7 +311,7 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config, authWrapper m MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, pool, pool, nil, tenancy.NewNoopAuthorizer(), cfg.ReadOnly) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, pool, pool, nil, tenancy.NewNoopAuthorizer(), cfg.ReadOnly, false) if err != nil { return nil, pgClient, fmt.Errorf("cannot run test, cannot instantiate pgClient: %w", err) } diff --git 
a/pkg/tests/end_to_end_tests/query_integration_test.go b/pkg/tests/end_to_end_tests/query_integration_test.go index 0f75d24b40..9fda0862b0 100644 --- a/pkg/tests/end_to_end_tests/query_integration_test.go +++ b/pkg/tests/end_to_end_tests/query_integration_test.go @@ -114,8 +114,9 @@ func TestDroppedViewQuery(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) - _, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{ + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) + _, err = r.RemoteReadQuerier(ctx).Query(&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { Type: prompb.LabelMatcher_EQ, @@ -692,7 +693,8 @@ func TestSQLQuery(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) for _, c := range testCases { tester.Run(c.name, func(t *testing.T) { resp, err := r.RemoteReadQuerier(context.Background()).Query(c.query) @@ -1099,7 +1101,8 @@ func TestPromQL(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) for _, c := range testCases { tester.Run(c.name, func(t *testing.T) { connResp, connErr := r.RemoteReadQuerier(context.Background()).Query(c.query) @@ -1300,7 +1303,8 @@ func TestPushdownDelta(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := 
pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { @@ -1375,7 +1379,8 @@ func TestPushdownVecSel(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { diff --git a/pkg/tests/end_to_end_tests/rollup_query_helper_test.go b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go new file mode 100644 index 0000000000..828a6655bf --- /dev/null +++ b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go @@ -0,0 +1,102 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+
+package end_to_end_tests
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v4/pgxpool"
+	"github.com/stretchr/testify/require"
+	"github.com/timescale/promscale/pkg/pgxconn"
+	"github.com/timescale/promscale/pkg/rollup"
+)
+
+// TestRollupQueryHelper registers four rollups and checks which schema name
+// the rollup decider returns as the upper bound passed to Decide grows.
+func TestRollupQueryHelper(t *testing.T) {
+	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+		ctx := context.Background()
+
+		// Call set_automatic_downsample with true before creating the rollups.
+		_, err := db.Exec(ctx, "SELECT prom_api.set_automatic_downsample($1)", true)
+		require.NoError(t, err)
+
+		createRollups := []string{
+			"CALL _prom_catalog.create_rollup('short', interval '5 minutes', interval '30 days')",
+			"CALL _prom_catalog.create_rollup('medium', interval '15 minutes', interval '30 days')",
+			"CALL _prom_catalog.create_rollup('long', interval '1 hour', interval '30 days')",
+			"CALL _prom_catalog.create_rollup('very_long', interval '1 week', interval '30 days')",
+		}
+		for _, stmt := range createRollups {
+			_, err = db.Exec(ctx, stmt)
+			require.NoError(t, err)
+		}
+
+		// Every rollup created above must be present in the catalog.
+		var rollupCount int
+		err = db.QueryRow(ctx, "SELECT count(*) FROM _prom_catalog.rollup").Scan(&rollupCount)
+		require.NoError(t, err)
+		require.Equal(t, 4, rollupCount)
+
+		decider, err := rollup.NewDecider(ctx, pgxconn.NewPgxConn(db), rollup.DefaultScrapeInterval)
+		require.NoError(t, err)
+		require.NotNil(t, decider)
+
+		const (
+			rawSchema = "prom_data" // Schema name expected for the shorter ranges.
+			day       = 24 * time.Hour
+		)
+
+		type rangeCase struct {
+			name     string
+			min, max time.Duration // min is left at its zero value in every case.
+			want     string
+		}
+		cases := []rangeCase{{
+			name: "1 sec",
+			max:  time.Second,
+			want: rawSchema,
+		}, {
+			name: "5 min",
+			max:  5 * time.Minute,
+			want: rawSchema,
+		}, {
+			name: "30 mins",
+			max:  30 * time.Minute,
+			want: rawSchema,
+		}, {
+			name: "1 hour",
+			max:  time.Hour,
+			want: rawSchema,
+		}, {
+			name: "1 day",
+			max:  day,
+			want: rawSchema,
+		}, {
+			name: "7 days",
+			max:  7 * day,
+			want: "ps_short",
+		}, {
+			name: "30 days",
+			max:  30 * day,
+			want: "ps_medium",
+		}, {
+			name: "1 year",
+			max:  12 * 30 * day,
+			want: "ps_very_long",
+		}, {
+			name: "100 years",
+			max:  100 * 12 * 30 * day,
+			want: "ps_very_long",
+		}}
+		// Decide is given both bounds converted to whole seconds.
+		for _, c := range cases {
+			got := decider.Decide(int64(c.min.Seconds()), int64(c.max.Seconds()))
+			require.Equal(t, c.want, got, c.name)
+		}
+	})
+}
diff --git a/pkg/tests/end_to_end_tests/rules_test.go b/pkg/tests/end_to_end_tests/rules_test.go index 56b2be39df..5b558bfbe0 100644 --- a/pkg/tests/end_to_end_tests/rules_test.go +++ b/pkg/tests/end_to_end_tests/rules_test.go @@ -35,7 +35,7 @@ func TestRecordingRulesEval(t *testing.T) { MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false, false) require.NoError(t, err) defer pgClient.Close() err = pgClient.InitPromQLEngine(&query.Config{