diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml index 38726b28d7..d4c4a41895 100644 --- a/docker-compose/docker-compose.yaml +++ b/docker-compose/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -42,6 +43,7 @@ services: PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317" PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1" PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true otel-collector: platform: linux/amd64 diff --git a/docker-compose/high-availability/docker-compose.yaml b/docker-compose/high-availability/docker-compose.yaml index 60baa70d25..bed5b4d9b6 100644 --- a/docker-compose/high-availability/docker-compose.yaml +++ b/docker-compose/high-availability/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -44,6 +45,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true promscale-connector2: image: timescale/promscale:latest @@ -61,6 +63,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true node_exporter: image: quay.io/prometheus/node-exporter diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 84ff81e5a8..4b6d7d4ac4 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -348,7 +348,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re var sampleRows [][]interface{} var exemplarRows [][]interface{} insertStart := time.Now() - lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64) + lowestEpoch := pgmodel.NewSeriesEpoch(pgmodel.MaxDate) lowestMinTime := int64(math.MaxInt64) tx, err := conn.BeginTx(ctx) if err != nil { @@ -410,7 +410,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re return err, lowestMinTime } epoch := visitor.LowestEpoch() - if epoch < lowestEpoch { + if lowestEpoch.After(epoch) { lowestEpoch = epoch } minTime := visitor.MinTime() @@ -475,7 +475,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re //thus we don't need row locking here. Note by doing this check at the end we can //have some wasted work for the inserts before this fails but this is rare. //avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it. - row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch)) + row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", lowestEpoch.Time()) var val []byte if err = row.Scan(&val); err != nil { return err, lowestMinTime diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index f0541db6d7..601497ce46 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -132,7 +132,7 @@ func (p *pgxDispatcher) runCompleteMetricCreationWorker() { } func (p *pgxDispatcher) runSeriesEpochSync() { - epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch) + epoch, err := p.refreshSeriesEpoch(nil) // we don't have any great place to report errors, and if the // connection recovers we can still make progress, so we'll just log it // and continue execution @@ -152,16 +152,19 @@ func (p *pgxDispatcher) runSeriesEpochSync() { } } -func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) { +func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch *model.SeriesEpoch) (*model.SeriesEpoch, error) { dbEpoch, err := p.getServerEpoch() + log.Info("msg", "Refreshing series cache epoch") if err != nil { + log.Info("msg", "An error occurred refreshing, will reset series and inverted labels caches") // Trash the cache just in case an epoch change occurred, seems safer p.scache.Reset() // Also trash the inverted labels cache, which can also be invalidated when the series cache is p.invertedLabelsCache.Reset() - return model.InvalidSeriesEpoch, err + return nil, err } - if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch { + if existingEpoch == nil || *dbEpoch != *existingEpoch { + log.Info("msg", "The local epoch is no longer up-to-date, will reset series and inverted labels caches") p.scache.Reset() // If the series cache needs to be invalidated, so does the inverted labels cache p.invertedLabelsCache.Reset() @@ -169,15 +172,15 @@ func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (mod return dbEpoch, nil } -func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) { +func (p *pgxDispatcher) getServerEpoch() (*model.SeriesEpoch, error) { var newEpoch int64 row := p.conn.QueryRow(context.Background(), getEpochSQL) err := row.Scan(&newEpoch) if err != nil { - return -1, err + return nil, err } - return model.SeriesEpoch(newEpoch), nil + return model.NewSeriesEpoch(newEpoch), nil } func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { diff --git a/pkg/pgmodel/ingestor/handler_test.go b/pkg/pgmodel/ingestor/handler_test.go index 07b08b3b24..32036f733f 100644 --- a/pkg/pgmodel/ingestor/handler_test.go +++ b/pkg/pgmodel/ingestor/handler_test.go @@ -6,6 +6,7 @@ package ingestor import ( "testing" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -74,7 +75,7 @@ func TestLabelArrayCreator(t *testing.T) { /* test one series already set */ setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}) - setSeries.SetSeriesID(5, 4) + setSeries.SetSeriesID(5, model.NewSeriesEpoch(time.Now().Unix())) seriesSet = []*model.Series{ getSeries(t, scache, labels.Labels{metricNameLabel, valOne}), setSeries, diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..c5eabb25a8 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -62,6 +62,7 @@ func (c sVisitor) VisitSeries(cb func(info *pgmodel.MetricInfo, s *pgmodel.Serie func TestPGXInserterInsertSeries(t *testing.T) { // Set test env so that cache metrics uses a new registry and avoid panic on duplicate register. require.NoError(t, os.Setenv("IS_TEST", "true")) + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) testCases := []struct { name string series []labels.Labels @@ -84,7 +85,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -134,7 +135,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -186,7 +187,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -235,7 +236,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -309,7 +310,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { si, se, err := si.Series().GetSeriesID() require.NoError(t, err) require.True(t, si > 0, "series id not set") - require.True(t, se > 0, "epoch not set") + require.True(t, se.Time() == time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), "epoch not set") } } }) @@ -328,6 +329,9 @@ func TestPGXInserterCacheReset(t *testing.T) { }, } + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + newTime := time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC).Unix() + sqlQueries := []model.SqlQuery{ // first series cache fetch @@ -335,7 +339,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -371,7 +375,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, @@ -379,7 +383,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "BEGIN;"}, @@ -388,7 +392,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -477,7 +481,7 @@ func TestPGXInserterCacheReset(t *testing.T) { } // refreshing during the same epoch gives the same IDs without checking the DB - _, err = inserter.refreshSeriesEpoch(1) + _, err = inserter.refreshSeriesEpoch(model.NewSeriesEpoch(initialTime)) require.NoError(t, err) samples = makeSamples(series) @@ -498,7 +502,7 @@ func TestPGXInserterCacheReset(t *testing.T) { } // trash the cache - _, err = inserter.refreshSeriesEpoch(1) + _, err = inserter.refreshSeriesEpoch(model.NewSeriesEpoch(initialTime)) require.NoError(t, err) // retrying rechecks the DB and uses the new IDs @@ -529,9 +533,10 @@ func TestPGXInserterInsertData(t *testing.T) { if err := os.Setenv("IS_TEST", "true"); err != nil { t.Fatal(err) } + testTime := time.Now().Unix() makeLabel := func() *model.Series { l := &model.Series{} - l.SetSeriesID(1, 1) + l.SetSeriesID(1, model.NewSeriesEpoch(testTime)) return l } @@ -575,8 +580,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -620,8 +625,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -680,8 +685,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the full batch - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -710,8 +715,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the individual copyRequests - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -833,8 +838,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -900,8 +905,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, // epoch check after insert from temp table { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..d17cb38676 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -8,6 +8,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -136,7 +137,7 @@ func TestInitializeMetricBatcher(t *testing.T) { func TestSendBatches(t *testing.T) { makeSeries := func(seriesID int) *model.Series { l := &model.Series{} - l.SetSeriesID(pgmodel.SeriesID(seriesID), 1) + l.SetSeriesID(pgmodel.SeriesID(seriesID), model.NewSeriesEpoch(time.Now().Unix())) return l } var workFinished sync.WaitGroup diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index 44ae573f5b..c826881392 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "github.com/jackc/pgtype" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -142,7 +141,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi continue } - //transaction per metric to avoid cross-metric locks + // transaction per metric to avoid cross-metric locks batch.Queue("BEGIN;") batch.Queue(seriesInsertSQL, info.metricInfo.MetricID, info.metricInfo.TableName, info.labelArraySet) batch.Queue("COMMIT;") @@ -196,7 +195,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi return nil } -func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (model.SeriesEpoch, error) { +func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (*model.SeriesEpoch, error) { _, span := tracer.Default().Start(ctx, "fill-label-ids") defer span.End() //we cannot use the label cache here because that maps label ids => name, value. @@ -204,7 +203,7 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe //we may want a new cache for that, at a later time. batch := h.conn.NewBatch() - var dbEpoch model.SeriesEpoch + var dbEpoch *model.SeriesEpoch // The epoch will never decrease, so we can check it once at the beginning, // at worst we'll store too small an epoch, which is always safe @@ -257,10 +256,12 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe if _, err := br.Exec(); err != nil { return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err) } - err = br.QueryRow().Scan(&dbEpoch) + var epochTime int64 + err = br.QueryRow().Scan(&epochTime) if err != nil { return dbEpoch, fmt.Errorf("error filling labels: %w", err) } + dbEpoch = pgmodel.NewSeriesEpoch(epochTime) if _, err := br.Exec(); err != nil { return dbEpoch, fmt.Errorf("error filling labels on commit: %w", err) } diff --git a/pkg/pgmodel/model/batch_visitor.go b/pkg/pgmodel/model/batch_visitor.go index bb8461a4f4..db02b00085 100644 --- a/pkg/pgmodel/model/batch_visitor.go +++ b/pkg/pgmodel/model/batch_visitor.go @@ -11,17 +11,19 @@ import ( type batchVisitor struct { batch *Batch - lowestEpoch SeriesEpoch + lowestEpoch *SeriesEpoch minTime int64 } +var MaxDate int64 = math.MaxInt64 + func getBatchVisitor(batch *Batch) *batchVisitor { - return &batchVisitor{batch, SeriesEpoch(math.MaxInt64), math.MaxInt64} + return &batchVisitor{batch, NewSeriesEpoch(MaxDate), math.MaxInt64} } // LowestEpoch returns the lowest epoch value encountered while visiting insertables. // It must be called after Visit() has completed. -func (vtr *batchVisitor) LowestEpoch() SeriesEpoch { +func (vtr *batchVisitor) LowestEpoch() *SeriesEpoch { return vtr.lowestEpoch } @@ -35,11 +37,11 @@ func (vtr *batchVisitor) Visit( ) error { var ( seriesId SeriesID - seriesEpoch SeriesEpoch + seriesEpoch *SeriesEpoch err error ) - updateEpoch := func(epoch SeriesEpoch) { - if epoch < vtr.lowestEpoch { + updateEpoch := func(epoch *SeriesEpoch) { + if vtr.lowestEpoch.After(epoch) { vtr.lowestEpoch = epoch } } diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go index 1bc3702013..7147f071b5 100644 --- a/pkg/pgmodel/model/series.go +++ b/pkg/pgmodel/model/series.go @@ -19,8 +19,7 @@ import ( type SeriesID int64 const ( - invalidSeriesID = -1 - InvalidSeriesEpoch = -1 + invalidSeriesID = -1 ) func (s SeriesID) String() string { @@ -28,7 +27,23 @@ func (s SeriesID) String() string { } // SeriesEpoch represents the series epoch -type SeriesEpoch int64 +type SeriesEpoch struct { + time int64 +} + +func NewSeriesEpoch(epochTime int64) *SeriesEpoch { + return &SeriesEpoch{ + time: epochTime, + } +} + +func (s *SeriesEpoch) After(o *SeriesEpoch) bool { + return s.time > o.time +} + +func (s *SeriesEpoch) Time() int64 { + return s.time +} // Series stores a Prometheus labels.Labels in its canonical string representation type Series struct { @@ -38,7 +53,7 @@ type Series struct { names []string values []string seriesID SeriesID - epoch SeriesEpoch + epoch *SeriesEpoch metricName string str string @@ -50,7 +65,7 @@ func NewSeries(key string, labelPairs []prompb.Label) *Series { values: make([]string, len(labelPairs)), str: key, seriesID: invalidSeriesID, - epoch: InvalidSeriesEpoch, + epoch: nil, } for i, l := range labelPairs { series.names[i] = l.Name @@ -108,22 +123,22 @@ func (l *Series) FinalSizeBytes() uint64 { return uint64(unsafe.Sizeof(*l)) + uint64(len(l.str)+len(l.metricName)) // #nosec } -func (l *Series) GetSeriesID() (SeriesID, SeriesEpoch, error) { +func (l *Series) GetSeriesID() (SeriesID, *SeriesEpoch, error) { l.lock.RLock() defer l.lock.RUnlock() switch l.seriesID { case invalidSeriesID: - return 0, 0, fmt.Errorf("Series id not set") + return 0, nil, fmt.Errorf("Series id not set") case 0: - return 0, 0, fmt.Errorf("Series id invalid") + return 0, nil, fmt.Errorf("Series id invalid") default: return l.seriesID, l.epoch, nil } } //note this has to be idempotent -func (l *Series) SetSeriesID(sid SeriesID, eid SeriesEpoch) { +func (l *Series) SetSeriesID(sid SeriesID, eid *SeriesEpoch) { l.lock.Lock() defer l.lock.Unlock() l.seriesID = sid diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 63dbd49f4c..f4dbc45c32 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -87,6 +87,12 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error log.Info("msg", "Migration successful, exiting") return nil, nil } + + // Bump the current epoch if it was still set to the initial value. + _, err = conn.Exec(context.Background(), "SELECT _prom_catalog.initialize_current_epoch(now())") + if err != nil { + return nil, err + } } else { log.Info("msg", "Skipping migration") } diff --git a/scripts/end_to_end_tests.sh b/scripts/end_to_end_tests.sh index 80531b6e93..8ac5c94352 100755 --- a/scripts/end_to_end_tests.sh +++ b/scripts/end_to_end_tests.sh @@ -88,6 +88,7 @@ PROMSCALE_DB_PASSWORD=postgres \ PROMSCALE_DB_NAME=postgres \ PROMSCALE_DB_SSL_MODE=disable \ PROMSCALE_WEB_TELEMETRY_PATH=/metrics \ +PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS=true \ ./promscale -startup.only docker exec e2e-tsdb psql -U postgres -d postgres \