From e44d86dcacfa11ea3a9554ae5a4353292593fcd5 Mon Sep 17 00:00:00 2001 From: James Guthrie Date: Mon, 19 Sep 2022 14:25:05 +0200 Subject: [PATCH 1/3] Switch to time-based epoch for series cache This is a companion change to [1]. That change modifies the database schema to switch to a time-based epoch. This change propagates the time-based epoch into the Promscale connector. For more details about the rationale behind this change, see [1]. [1]: https://github.com/timescale/promscale_extension/pull/512 --- docker-compose/docker-compose.yaml | 4 +- .../high-availability/docker-compose.yaml | 5 +- pkg/pgmodel/ingestor/copier.go | 6 +- pkg/pgmodel/ingestor/dispatcher.go | 18 ++--- pkg/pgmodel/ingestor/handler_test.go | 3 +- pkg/pgmodel/ingestor/ingestor_sql_test.go | 69 ++++++++++--------- pkg/pgmodel/ingestor/metric_batcher_test.go | 3 +- pkg/pgmodel/ingestor/series_writer.go | 11 +-- pkg/pgmodel/model/batch_visitor.go | 14 ++-- pkg/pgmodel/model/series.go | 34 ++++++--- pkg/runner/client.go | 6 ++ .../end_to_end_tests/continuous_agg_test.go | 4 +- pkg/tests/end_to_end_tests/drop_test.go | 10 +-- scripts/end_to_end_tests.sh | 1 + 14 files changed, 114 insertions(+), 74 deletions(-) diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml index 38726b28d7..d4c4a41895 100644 --- a/docker-compose/docker-compose.yaml +++ b/docker-compose/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -42,6 +43,7 @@ services: PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317" PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1" PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true otel-collector: platform: linux/amd64 diff --git a/docker-compose/high-availability/docker-compose.yaml b/docker-compose/high-availability/docker-compose.yaml index 60baa70d25..bed5b4d9b6 100644 --- a/docker-compose/high-availability/docker-compose.yaml +++ b/docker-compose/high-availability/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -44,6 +45,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true promscale-connector2: image: timescale/promscale:latest @@ -61,6 +63,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true node_exporter: image: quay.io/prometheus/node-exporter diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 84ff81e5a8..b8b6b67457 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -348,7 +348,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re var sampleRows [][]interface{} var exemplarRows [][]interface{} insertStart := time.Now() - lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64) + lowestEpoch := pgmodel.NewSeriesEpoch(pgmodel.MaxDate) lowestMinTime := int64(math.MaxInt64) tx, err := conn.BeginTx(ctx) if err != nil { @@ -410,7 +410,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re return err, lowestMinTime } epoch := visitor.LowestEpoch() - if epoch < lowestEpoch { + if lowestEpoch.After(epoch) { lowestEpoch = epoch } minTime := visitor.MinTime() @@ -475,7 +475,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re //thus we don't need row locking here. Note by doing this check at the end we can //have some wasted work for the inserts before this fails but this is rare. //avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it. - row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch)) + row := tx.QueryRow(ctx, "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", lowestEpoch.Time()) var val []byte if err = row.Scan(&val); err != nil { return err, lowestMinTime diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index f0541db6d7..105d76990b 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -27,7 +27,7 @@ import ( const ( finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()" - getEpochSQL = "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1" + getEpochSQL = "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1" ) var ErrDispatcherClosed = fmt.Errorf("dispatcher is closed") @@ -132,7 +132,7 @@ func (p *pgxDispatcher) runCompleteMetricCreationWorker() { } func (p *pgxDispatcher) runSeriesEpochSync() { - epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch) + epoch, err := p.refreshSeriesEpoch(nil) // we don't have any great place to report errors, and if the // connection recovers we can still make progress, so we'll just log it // and continue execution @@ -152,16 +152,16 @@ func (p *pgxDispatcher) runSeriesEpochSync() { } } -func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) { +func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch *model.SeriesEpoch) (*model.SeriesEpoch, error) { dbEpoch, err := p.getServerEpoch() if err != nil { // Trash the cache just in case an epoch change occurred, seems safer p.scache.Reset() // Also trash the inverted labels cache, which can also be invalidated when the series cache is p.invertedLabelsCache.Reset() - return model.InvalidSeriesEpoch, err + return nil, err } - if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch { + if existingEpoch == nil || *dbEpoch != *existingEpoch { p.scache.Reset() // If the series cache needs to be invalidated, so does the inverted labels cache p.invertedLabelsCache.Reset() @@ -169,15 +169,15 @@ func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (mod return dbEpoch, nil } -func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) { - var newEpoch int64 +func (p *pgxDispatcher) getServerEpoch() (*model.SeriesEpoch, error) { + var newEpoch time.Time row := p.conn.QueryRow(context.Background(), getEpochSQL) err := row.Scan(&newEpoch) if err != nil { - return -1, err + return nil, err } - return model.SeriesEpoch(newEpoch), nil + return model.NewSeriesEpoch(newEpoch), nil } func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { diff --git a/pkg/pgmodel/ingestor/handler_test.go b/pkg/pgmodel/ingestor/handler_test.go index 07b08b3b24..adac54b6b5 100644 --- a/pkg/pgmodel/ingestor/handler_test.go +++ b/pkg/pgmodel/ingestor/handler_test.go @@ -6,6 +6,7 @@ package ingestor import ( "testing" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -74,7 +75,7 @@ func TestLabelArrayCreator(t *testing.T) { /* test one series already set */ setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}) - setSeries.SetSeriesID(5, 4) + setSeries.SetSeriesID(5, model.NewSeriesEpoch(time.Now())) seriesSet = []*model.Series{ getSeries(t, scache, labels.Labels{metricNameLabel, valOne}), setSeries, diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..8b0c41e796 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -62,6 +62,7 @@ func (c sVisitor) VisitSeries(cb func(info *pgmodel.MetricInfo, s *pgmodel.Serie func TestPGXInserterInsertSeries(t *testing.T) { // Set test env so that cache metrics uses a new registry and avoid panic on duplicate register. require.NoError(t, os.Setenv("IS_TEST", "true")) + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) testCases := []struct { name string series []labels.Labels @@ -82,9 +83,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -132,9 +133,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -184,9 +185,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -233,9 +234,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -309,7 +310,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { si, se, err := si.Series().GetSeriesID() require.NoError(t, err) require.True(t, si > 0, "series id not set") - require.True(t, se > 0, "epoch not set") + require.True(t, se.Time().Equal(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), "epoch not set") } } }) @@ -328,14 +329,17 @@ func TestPGXInserterCacheReset(t *testing.T) { }, } + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) + newTime := time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC) + sqlQueries := []model.SqlQuery{ // first series cache fetch {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -369,26 +373,26 @@ func TestPGXInserterCacheReset(t *testing.T) { // first labels cache refresh, does not trash { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, // second labels cache refresh, trash the cache { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "BEGIN;"}, // repopulate the cache { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -477,7 +481,7 @@ func TestPGXInserterCacheReset(t *testing.T) { } // refreshing during the same epoch gives the same IDs without checking the DB - _, err = inserter.refreshSeriesEpoch(1) + _, err = inserter.refreshSeriesEpoch(model.NewSeriesEpoch(initialTime)) require.NoError(t, err) samples = makeSamples(series) @@ -498,7 +502,7 @@ func TestPGXInserterCacheReset(t *testing.T) { } // trash the cache - _, err = inserter.refreshSeriesEpoch(1) + _, err = inserter.refreshSeriesEpoch(model.NewSeriesEpoch(initialTime)) require.NoError(t, err) // retrying rechecks the DB and uses the new IDs @@ -529,9 +533,10 @@ func TestPGXInserterInsertData(t *testing.T) { if err := os.Setenv("IS_TEST", "true"); err != nil { t.Fatal(err) } + testTime := time.Now() makeLabel := func() *model.Series { l := &model.Series{} - l.SetSeriesID(1, 1) + l.SetSeriesID(1, model.NewSeriesEpoch(testTime)) return l } @@ -575,8 +580,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -620,8 +625,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -680,8 +685,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the full batch - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -710,8 +715,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the individual copyRequests - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -833,8 +838,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -900,8 +905,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, // epoch check after insert from temp table { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..7f0e595684 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -8,6 +8,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -136,7 +137,7 @@ func TestInitializeMetricBatcher(t *testing.T) { func TestSendBatches(t *testing.T) { makeSeries := func(seriesID int) *model.Series { l := &model.Series{} - l.SetSeriesID(pgmodel.SeriesID(seriesID), 1) + l.SetSeriesID(pgmodel.SeriesID(seriesID), model.NewSeriesEpoch(time.Now())) return l } var workFinished sync.WaitGroup diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index 44ae573f5b..a8448dd4c9 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -7,6 +7,7 @@ package ingestor import ( "context" "fmt" + "time" "github.com/jackc/pgtype" "github.com/timescale/promscale/pkg/log" @@ -142,7 +143,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi continue } - //transaction per metric to avoid cross-metric locks + // transaction per metric to avoid cross-metric locks batch.Queue("BEGIN;") batch.Queue(seriesInsertSQL, info.metricInfo.MetricID, info.metricInfo.TableName, info.labelArraySet) batch.Queue("COMMIT;") @@ -196,7 +197,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi return nil } -func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (model.SeriesEpoch, error) { +func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (*model.SeriesEpoch, error) { _, span := tracer.Default().Start(ctx, "fill-label-ids") defer span.End() //we cannot use the label cache here because that maps label ids => name, value. @@ -204,7 +205,7 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe //we may want a new cache for that, at a later time. batch := h.conn.NewBatch() - var dbEpoch model.SeriesEpoch + var dbEpoch *model.SeriesEpoch // The epoch will never decrease, so we can check it once at the beginning, // at worst we'll store too small an epoch, which is always safe @@ -257,10 +258,12 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe if _, err := br.Exec(); err != nil { return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err) } - err = br.QueryRow().Scan(&dbEpoch) + var epochTime time.Time + err = br.QueryRow().Scan(&epochTime) if err != nil { return dbEpoch, fmt.Errorf("error filling labels: %w", err) } + dbEpoch = pgmodel.NewSeriesEpoch(epochTime) if _, err := br.Exec(); err != nil { return dbEpoch, fmt.Errorf("error filling labels on commit: %w", err) } diff --git a/pkg/pgmodel/model/batch_visitor.go b/pkg/pgmodel/model/batch_visitor.go index bb8461a4f4..f3fb84cb85 100644 --- a/pkg/pgmodel/model/batch_visitor.go +++ b/pkg/pgmodel/model/batch_visitor.go @@ -11,17 +11,19 @@ import ( type batchVisitor struct { batch *Batch - lowestEpoch SeriesEpoch + lowestEpoch *SeriesEpoch minTime int64 } +var MaxDate = time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC) + func getBatchVisitor(batch *Batch) *batchVisitor { - return &batchVisitor{batch, SeriesEpoch(math.MaxInt64), math.MaxInt64} + return &batchVisitor{batch, NewSeriesEpoch(MaxDate), math.MaxInt64} } // LowestEpoch returns the lowest epoch value encountered while visiting insertables. // It must be called after Visit() has completed. -func (vtr *batchVisitor) LowestEpoch() SeriesEpoch { +func (vtr *batchVisitor) LowestEpoch() *SeriesEpoch { return vtr.lowestEpoch } @@ -35,11 +37,11 @@ func (vtr *batchVisitor) Visit( ) error { var ( seriesId SeriesID - seriesEpoch SeriesEpoch + seriesEpoch *SeriesEpoch err error ) - updateEpoch := func(epoch SeriesEpoch) { - if epoch < vtr.lowestEpoch { + updateEpoch := func(epoch *SeriesEpoch) { + if vtr.lowestEpoch.After(epoch) { vtr.lowestEpoch = epoch } } diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go index 1bc3702013..6b3c18a44f 100644 --- a/pkg/pgmodel/model/series.go +++ b/pkg/pgmodel/model/series.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "time" "unsafe" "github.com/timescale/promscale/pkg/prompb" @@ -19,8 +20,7 @@ import ( type SeriesID int64 const ( - invalidSeriesID = -1 - InvalidSeriesEpoch = -1 + invalidSeriesID = -1 ) func (s SeriesID) String() string { @@ -28,7 +28,23 @@ func (s SeriesID) String() string { } // SeriesEpoch represents the series epoch -type SeriesEpoch int64 +type SeriesEpoch struct { + time time.Time +} + +func NewSeriesEpoch(epochTime time.Time) *SeriesEpoch { + return &SeriesEpoch{ + time: epochTime, + } +} + +func (s *SeriesEpoch) After(o *SeriesEpoch) bool { + return s.time.After(o.time) +} + +func (s *SeriesEpoch) Time() time.Time { + return s.time +} // Series stores a Prometheus labels.Labels in its canonical string representation type Series struct { @@ -38,7 +54,7 @@ type Series struct { names []string values []string seriesID SeriesID - epoch SeriesEpoch + epoch *SeriesEpoch metricName string str string @@ -50,7 +66,7 @@ func NewSeries(key string, labelPairs []prompb.Label) *Series { values: make([]string, len(labelPairs)), str: key, seriesID: invalidSeriesID, - epoch: InvalidSeriesEpoch, + epoch: nil, } for i, l := range labelPairs { series.names[i] = l.Name @@ -108,22 +124,22 @@ func (l *Series) FinalSizeBytes() uint64 { return uint64(unsafe.Sizeof(*l)) + uint64(len(l.str)+len(l.metricName)) // #nosec } -func (l *Series) GetSeriesID() (SeriesID, SeriesEpoch, error) { +func (l *Series) GetSeriesID() (SeriesID, *SeriesEpoch, error) { l.lock.RLock() defer l.lock.RUnlock() switch l.seriesID { case invalidSeriesID: - return 0, 0, fmt.Errorf("Series id not set") + return 0, nil, fmt.Errorf("Series id not set") case 0: - return 0, 0, fmt.Errorf("Series id invalid") + return 0, nil, fmt.Errorf("Series id invalid") default: return l.seriesID, l.epoch, nil } } //note this has to be idempotent -func (l *Series) SetSeriesID(sid SeriesID, eid SeriesEpoch) { +func (l *Series) SetSeriesID(sid SeriesID, eid *SeriesEpoch) { l.lock.Lock() defer l.lock.Unlock() l.seriesID = sid diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 63dbd49f4c..2ef3a51eb6 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -87,6 +87,12 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error log.Info("msg", "Migration successful, exiting") return nil, nil } + + // Bump the current epoch if it was still set to the initial value. + _, err = conn.Exec(context.Background(), "SELECT _prom_catalog.initialize_current_epoch()") + if err != nil { + return nil, err + } } else { log.Info("msg", "Skipping migration") } diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 850140e590..7d61e1f023 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -274,7 +274,7 @@ WITH (timescaledb.continuous) AS t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error("error fetching series marked for deletion count", err) } @@ -314,7 +314,7 @@ WITH (timescaledb.continuous) AS t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error("error fetching series marked for deletion count", err) } diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go index bd9fdf8e3e..1eaf1b56a5 100644 --- a/pkg/tests/end_to_end_tests/drop_test.go +++ b/pkg/tests/end_to_end_tests/drop_test.go @@ -497,7 +497,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Errorf("unexpected series count: %v @ %v", count, loc) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(loc, err) } @@ -545,7 +545,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Errorf("unexpected series count: %v @ %v", count, loc) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(loc, err) } @@ -608,7 +608,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Errorf("unexpected series count: %v @ %v", count, loc) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(loc, err) } @@ -753,7 +753,7 @@ func TestSQLDropAllMetricData(t *testing.T) { t.Errorf("unexpected series count: %v", count) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(err) } @@ -817,7 +817,7 @@ func TestSQLDropAllMetricData(t *testing.T) { t.Errorf("unexpected series count: %v", count) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(err) } diff --git a/scripts/end_to_end_tests.sh b/scripts/end_to_end_tests.sh index 80531b6e93..8ac5c94352 100755 --- a/scripts/end_to_end_tests.sh +++ b/scripts/end_to_end_tests.sh @@ -88,6 +88,7 @@ PROMSCALE_DB_PASSWORD=postgres \ PROMSCALE_DB_NAME=postgres \ PROMSCALE_DB_SSL_MODE=disable \ PROMSCALE_WEB_TELEMETRY_PATH=/metrics \ +PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS=true \ ./promscale -startup.only docker exec e2e-tsdb psql -U postgres -d postgres \ From fac8f094acba7bc24ecb42e7c141ecaa33c9ac48 Mon Sep 17 00:00:00 2001 From: James Guthrie Date: Fri, 30 Sep 2022 15:22:58 +0200 Subject: [PATCH 2/3] Alternative implementation with timestamp in unix epoch format --- pkg/pgmodel/ingestor/copier.go | 2 +- pkg/pgmodel/ingestor/dispatcher.go | 4 +-- pkg/pgmodel/ingestor/handler_test.go | 2 +- pkg/pgmodel/ingestor/ingestor_sql_test.go | 36 +++++++++---------- pkg/pgmodel/ingestor/metric_batcher_test.go | 2 +- pkg/pgmodel/ingestor/series_writer.go | 4 +-- pkg/pgmodel/model/batch_visitor.go | 2 +- pkg/pgmodel/model/series.go | 9 +++-- pkg/runner/client.go | 2 +- .../end_to_end_tests/continuous_agg_test.go | 4 +-- pkg/tests/end_to_end_tests/drop_test.go | 10 +++--- 11 files changed, 37 insertions(+), 40 deletions(-) diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index b8b6b67457..4b6d7d4ac4 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -475,7 +475,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re //thus we don't need row locking here. Note by doing this check at the end we can //have some wasted work for the inserts before this fails but this is rare. //avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it. - row := tx.QueryRow(ctx, "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", lowestEpoch.Time()) + row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", lowestEpoch.Time()) var val []byte if err = row.Scan(&val); err != nil { return err, lowestMinTime diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index 105d76990b..9784cbd54c 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -27,7 +27,7 @@ import ( const ( finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()" - getEpochSQL = "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1" + getEpochSQL = "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1" ) var ErrDispatcherClosed = fmt.Errorf("dispatcher is closed") @@ -170,7 +170,7 @@ func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch *model.SeriesEpoch) (*m } func (p *pgxDispatcher) getServerEpoch() (*model.SeriesEpoch, error) { - var newEpoch time.Time + var newEpoch int64 row := p.conn.QueryRow(context.Background(), getEpochSQL) err := row.Scan(&newEpoch) if err != nil { diff --git a/pkg/pgmodel/ingestor/handler_test.go b/pkg/pgmodel/ingestor/handler_test.go index adac54b6b5..32036f733f 100644 --- a/pkg/pgmodel/ingestor/handler_test.go +++ b/pkg/pgmodel/ingestor/handler_test.go @@ -75,7 +75,7 @@ func TestLabelArrayCreator(t *testing.T) { /* test one series already set */ setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}) - setSeries.SetSeriesID(5, model.NewSeriesEpoch(time.Now())) + setSeries.SetSeriesID(5, model.NewSeriesEpoch(time.Now().Unix())) seriesSet = []*model.Series{ getSeries(t, scache, labels.Labels{metricNameLabel, valOne}), setSeries, diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index 8b0c41e796..c5eabb25a8 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -83,7 +83,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{initialTime}}, Err: error(nil), @@ -133,7 +133,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{initialTime}}, Err: error(nil), @@ -185,7 +185,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{initialTime}}, Err: error(nil), @@ -234,7 +234,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { sqlQueries: []model.SqlQuery{ {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{initialTime}}, Err: error(nil), @@ -310,7 +310,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { si, se, err := si.Series().GetSeriesID() require.NoError(t, err) require.True(t, si > 0, "series id not set") - require.True(t, se.Time().Equal(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), "epoch not set") + require.True(t, se.Time() == time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), "epoch not set") } } }) @@ -329,15 +329,15 @@ func TestPGXInserterCacheReset(t *testing.T) { }, } - initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) - newTime := time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC) + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + newTime := time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC).Unix() sqlQueries := []model.SqlQuery{ // first series cache fetch {Sql: "BEGIN;"}, { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{initialTime}}, Err: error(nil), @@ -373,7 +373,7 @@ func TestPGXInserterCacheReset(t *testing.T) { // first labels cache refresh, does not trash { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{initialTime}}, Err: error(nil), @@ -381,7 +381,7 @@ func TestPGXInserterCacheReset(t *testing.T) { // second labels cache refresh, trash the cache { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{newTime}}, Err: error(nil), @@ -390,7 +390,7 @@ func TestPGXInserterCacheReset(t *testing.T) { // repopulate the cache { - Sql: "SELECT current_epoch FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), Results: model.RowResults{{newTime}}, Err: error(nil), @@ -533,7 +533,7 @@ func TestPGXInserterInsertData(t *testing.T) { if err := os.Setenv("IS_TEST", "true"); err != nil { t.Fatal(err) } - testTime := time.Now() + testTime := time.Now().Unix() makeLabel := func() *model.Series { l := &model.Series{} l.SetSeriesID(1, model.NewSeriesEpoch(testTime)) @@ -580,7 +580,7 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), @@ -625,7 +625,7 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), @@ -685,7 +685,7 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the full batch - Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), @@ -715,7 +715,7 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the individual copyRequests - Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), @@ -838,7 +838,7 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), @@ -905,7 +905,7 @@ func TestPGXInserterInsertData(t *testing.T) { }, // epoch check after insert from temp table { - Sql: "SELECT CASE $1::TIMESTAMPTZ <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.global_epoch LIMIT 1", + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index 7f0e595684..d17cb38676 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -137,7 +137,7 @@ func TestInitializeMetricBatcher(t *testing.T) { func TestSendBatches(t *testing.T) { makeSeries := func(seriesID int) *model.Series { l := &model.Series{} - l.SetSeriesID(pgmodel.SeriesID(seriesID), model.NewSeriesEpoch(time.Now())) + l.SetSeriesID(pgmodel.SeriesID(seriesID), model.NewSeriesEpoch(time.Now().Unix())) return l } var workFinished sync.WaitGroup diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index a8448dd4c9..c826881392 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -7,8 +7,6 @@ package ingestor import ( "context" "fmt" - "time" - "github.com/jackc/pgtype" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -258,7 +256,7 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe if _, err := br.Exec(); err != nil { return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err) } - var epochTime time.Time + var epochTime int64 err = br.QueryRow().Scan(&epochTime) if err != nil { return dbEpoch, fmt.Errorf("error filling labels: %w", err) diff --git a/pkg/pgmodel/model/batch_visitor.go b/pkg/pgmodel/model/batch_visitor.go index f3fb84cb85..db02b00085 100644 --- a/pkg/pgmodel/model/batch_visitor.go +++ b/pkg/pgmodel/model/batch_visitor.go @@ -15,7 +15,7 @@ type batchVisitor struct { minTime int64 } -var MaxDate = time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC) +var MaxDate int64 = math.MaxInt64 func getBatchVisitor(batch *Batch) *batchVisitor { return &batchVisitor{batch, NewSeriesEpoch(MaxDate), math.MaxInt64} diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go index 6b3c18a44f..7147f071b5 100644 --- a/pkg/pgmodel/model/series.go +++ b/pkg/pgmodel/model/series.go @@ -9,7 +9,6 @@ import ( "strconv" "strings" "sync" - "time" "unsafe" "github.com/timescale/promscale/pkg/prompb" @@ -29,20 +28,20 @@ func (s SeriesID) String() string { // SeriesEpoch represents the series epoch type SeriesEpoch struct { - time time.Time + time int64 } -func NewSeriesEpoch(epochTime time.Time) *SeriesEpoch { +func NewSeriesEpoch(epochTime int64) *SeriesEpoch { return &SeriesEpoch{ time: epochTime, } } func (s *SeriesEpoch) After(o *SeriesEpoch) bool { - return s.time.After(o.time) + return s.time > o.time } -func (s *SeriesEpoch) Time() time.Time { +func (s *SeriesEpoch) Time() int64 { return s.time } diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 2ef3a51eb6..f4dbc45c32 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -89,7 +89,7 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error } // Bump the current epoch if it was still set to the initial value. - _, err = conn.Exec(context.Background(), "SELECT _prom_catalog.initialize_current_epoch()") + _, err = conn.Exec(context.Background(), "SELECT _prom_catalog.initialize_current_epoch(now())") if err != nil { return nil, err } diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 7d61e1f023..850140e590 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -274,7 +274,7 @@ WITH (timescaledb.continuous) AS t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error("error fetching series marked for deletion count", err) } @@ -314,7 +314,7 @@ WITH (timescaledb.continuous) AS t.Errorf("unexpected series count: got %v, wanted %v", afterCount, seriesCount) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error("error fetching series marked for deletion count", err) } diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go index 1eaf1b56a5..bd9fdf8e3e 100644 --- a/pkg/tests/end_to_end_tests/drop_test.go +++ b/pkg/tests/end_to_end_tests/drop_test.go @@ -497,7 +497,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Errorf("unexpected series count: %v @ %v", count, loc) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(loc, err) } @@ -545,7 +545,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Errorf("unexpected series count: %v @ %v", count, loc) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(loc, err) } @@ -608,7 +608,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Errorf("unexpected series count: %v @ %v", count, loc) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(loc, err) } @@ -753,7 +753,7 @@ func TestSQLDropAllMetricData(t *testing.T) { t.Errorf("unexpected series count: %v", count) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(err) } @@ -817,7 +817,7 @@ func TestSQLDropAllMetricData(t *testing.T) { t.Errorf("unexpected series count: %v", count) } - err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE mark_for_deletion_epoch IS NOT NULL`).Scan(&count) + err = db.QueryRow(context.Background(), `SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL`).Scan(&count) if err != nil { t.Error(err) } From c498b091e248590c1cf009fea8d235f24f61eda1 Mon Sep 17 00:00:00 2001 From: James Guthrie Date: Mon, 3 Oct 2022 11:32:29 +0200 Subject: [PATCH 3/3] Add logging for cache reset --- pkg/pgmodel/ingestor/dispatcher.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index 9784cbd54c..601497ce46 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -154,7 +154,9 @@ func (p *pgxDispatcher) runSeriesEpochSync() { func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch *model.SeriesEpoch) (*model.SeriesEpoch, error) { dbEpoch, err := p.getServerEpoch() + log.Info("msg", "Refreshing series cache epoch") if err != nil { + log.Info("msg", "An error occurred refreshing, will reset series and inverted labels caches") // Trash the cache just in case an epoch change occurred, seems safer p.scache.Reset() // Also trash the inverted labels cache, which can also be invalidated when the series cache is @@ -162,6 +164,7 @@ func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch *model.SeriesEpoch) (*m return nil, err } if existingEpoch == nil || *dbEpoch != *existingEpoch { + log.Info("msg", "The local epoch is no longer up-to-date, will reset series and inverted labels caches") p.scache.Reset() // If the series cache needs to be invalidated, so does the inverted labels cache p.invertedLabelsCache.Reset()