diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml index d4e9964c76..948da83a56 100644 --- a/docker-compose/docker-compose.yaml +++ b/docker-compose/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -42,6 +43,7 @@ services: PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317" PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1" PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true otel-collector: platform: linux/amd64 diff --git a/docker-compose/high-availability/docker-compose.yaml b/docker-compose/high-availability/docker-compose.yaml index 60baa70d25..bed5b4d9b6 100644 --- a/docker-compose/high-availability/docker-compose.yaml +++ b/docker-compose/high-availability/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -44,6 +45,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true promscale-connector2: image: timescale/promscale:latest @@ -61,6 +63,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true node_exporter: image: quay.io/prometheus/node-exporter diff --git a/pkg/clockcache/cache.go b/pkg/clockcache/cache.go index ea67802744..5b26cdd433 100644 --- a/pkg/clockcache/cache.go +++ b/pkg/clockcache/cache.go @@ -415,3 +415,47 @@ func packUsedAndTimestamp(used bool, ts int32) uint32 { } return uint32(usedBit<<31) | uint32(ts) } + +// RemoveMatchingItems removes items whose value passes the check applied by +// the passed `matcher` function. It returns the number of entries which were +// removed from the cache. +// Note: Due to the locks that this function takes, it is best used when very +// few items stored in the cache are to be removed. +func (self *Cache) RemoveMatchingItems(matcher func(value interface{}) bool) int { + // Note: we take an RLock to find matching items, then release it and take + // a full Lock to remove them. This has two assumptions: + // 1. len(matchingItems) << len(storage): taking a big lock should be fine. + // 2. Missing the addition of an item to the cache is not problematic. + // Locking like this doesn't provide a consistent view of the cache + // contents. Either an entry is removed or added under our feet. An + // entry being removed is not a problem, we will just skip it. An entry + // being added is fine for the one consumer of this method (cache + // invalidation): if an item is added to the cache, then it was just + // created/resurrected in the DB, so it wouldn't have been a candidate + // for removal. 
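+	//
+	// A hedged usage sketch (the matcher sees stored values, not keys;
+	// `isStale` is a hypothetical caller-side predicate, not part of this
+	// package):
+	//
+	//	removed := cache.RemoveMatchingItems(func(v interface{}) bool {
+	//		series, ok := v.(*model.Series)
+	//		return ok && isStale(series)
+	//	})
+	//	log.Info("msg", "removed matching entries", "count", removed)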
+
+	self.elementsLock.RLock()
+	matchingItems := make([]interface{}, 0)
+	for k, v := range self.elements {
+		if matcher(v.value) {
+			matchingItems = append(matchingItems, k)
+		}
+	}
+	self.elementsLock.RUnlock()
+	self.elementsLock.Lock()
+	for _, k := range matchingItems {
+		self.remove(k)
+	}
+	self.elementsLock.Unlock()
+	return len(matchingItems)
+}
+
+func (self *Cache) remove(key interface{}) {
+	value, present := self.elements[key]
+	if !present {
+		// do nothing
+		return
+	}
+	delete(self.elements, key)
+	*value = element{}
+}
diff --git a/pkg/pgmodel/cache/series_cache.go b/pkg/pgmodel/cache/series_cache.go
index a3b505a8d6..e502754199 100644
--- a/pkg/pgmodel/cache/series_cache.go
+++ b/pkg/pgmodel/cache/series_cache.go
@@ -30,22 +30,27 @@ var evictionMaxAge = time.Minute * 2 // grow cache if we are evicting eleme

 // SeriesCache is a cache of model.Series entries.
 type SeriesCache interface {
-	Reset()
+	Reset(epoch model.SeriesEpoch)
 	GetSeriesFromProtos(labelPairs []prompb.Label) (series *model.Series, metricName string, err error)
 	Len() int
 	Cap() int
 	Evictions() uint64
+	EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int
+	CacheEpoch() model.SeriesEpoch
+	SetCacheEpoch(epoch model.SeriesEpoch)
 }

 type SeriesCacheImpl struct {
 	cache        *clockcache.Cache
 	maxSizeBytes uint64
+	cacheEpoch   model.SeriesEpoch
 }

 func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
 	cache := &SeriesCacheImpl{
 		clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize),
 		config.SeriesCacheMemoryMaxBytes,
+		model.SeriesEpoch(0),
 	}

 	if sigClose != nil {
@@ -54,6 +59,14 @@ func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
 	return cache
 }

+func (t *SeriesCacheImpl) CacheEpoch() model.SeriesEpoch {
+	return t.cacheEpoch
+}
+
+func (t *SeriesCacheImpl) SetCacheEpoch(epoch model.SeriesEpoch) {
+	t.cacheEpoch = epoch
+}
+
 func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) {
 	ticker := time.NewTicker(growCheckDuration)
 	for {
@@ -115,8 +128,11 @@ func (t *SeriesCacheImpl) Evictions() uint64 {
 }

 // Reset should be concurrency-safe
-func (t *SeriesCacheImpl) Reset() {
+func (t *SeriesCacheImpl) Reset(epoch model.SeriesEpoch) {
 	t.cache.Reset()
+	// When we reset the cache, we need to reset the cache epoch as well,
+	// otherwise it may be out of date as soon as data is loaded into it.
+	t.SetCacheEpoch(epoch)
 }

 // Get the canonical version of a series if one exists.
@@ -277,3 +293,28 @@ func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model
 	return series, metricName, nil
 }
+
+func (t *SeriesCacheImpl) EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int {
+	if len(seriesIds) == 0 {
+		return 0
+	}
+	seriesIdMap := make(map[model.SeriesID]struct{}, len(seriesIds))
+	for _, v := range seriesIds {
+		seriesIdMap[v] = struct{}{}
+	}
+	// RemoveMatchingItems' documentation warns that it is expensive and best
+	// used when few items need to be removed. In normal operation (with ~30
+	// day retention), we don't expect our cache to contain many of the series
+	// that we want to remove, which matches RemoveMatchingItems' locking
+	// strategy well. See the implementation.
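+	//
+	// A hedged calling sketch (ids and epoch values are illustrative only):
+	//
+	//	stale := []model.SeriesID{17, 42}
+	//	n := scache.EvictSeriesById(stale, model.SeriesEpoch(time.Now().Unix()))
+	//	// n counts the entries that were actually found and removed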
+	count := t.cache.RemoveMatchingItems(func(v interface{}) bool {
+		value := v.(*model.Series)
+		id, err := value.GetSeriesID()
+		if err != nil {
+			// The series id is not set, so it cannot match a stale id.
+			return false
+		}
+		_, present := seriesIdMap[id]
+		return present
+	})
+	t.SetCacheEpoch(epoch)
+	return count
+}
diff --git a/pkg/pgmodel/ingestor/buffer.go b/pkg/pgmodel/ingestor/buffer.go
index 2a4e6659e4..3049d71b5c 100644
--- a/pkg/pgmodel/ingestor/buffer.go
+++ b/pkg/pgmodel/ingestor/buffer.go
@@ -79,4 +79,5 @@ func (p *pendingBuffer) release() {
 func (p *pendingBuffer) addReq(req *insertDataRequest) {
 	p.needsResponse = append(p.needsResponse, insertDataTask{finished: req.finished, errChan: req.errChan})
 	p.batch.AppendSlice(req.data)
+	p.batch.UpdateSeriesCacheEpoch(req.seriesCacheEpoch)
 }
diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go
index 84ff81e5a8..cd4a8eab3d 100644
--- a/pkg/pgmodel/ingestor/copier.go
+++ b/pkg/pgmodel/ingestor/copier.go
@@ -144,6 +144,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e
 	defer span.End()
 	batch := copyBatch(insertBatch)
 	err := sw.PopulateOrCreateSeries(ctx, batch)
+
 	if err != nil {
 		return fmt.Errorf("copier: writing series: %w", err)
 	}
@@ -348,8 +349,8 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
 	var sampleRows [][]interface{}
 	var exemplarRows [][]interface{}
 	insertStart := time.Now()
-	lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
 	lowestMinTime := int64(math.MaxInt64)
+	minCacheEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
 	tx, err := conn.BeginTx(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to start transaction for inserting metrics: %v", err), lowestMinTime
@@ -364,6 +365,10 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re

 	for r := range reqs {
 		req := &reqs[r]
+		epoch := req.data.batch.SeriesCacheEpoch()
+		if minCacheEpoch.After(epoch) {
+			minCacheEpoch = epoch
+		}
 		// Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. Prometheus retry)
 		// Samples inside series should be sorted by Prometheus
 		// We sort after PopulateOrCreateSeries call because we now have guarantees that all seriesIDs have been populated
@@ -409,10 +414,6 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
 		if err != nil {
 			return err, lowestMinTime
 		}
-		epoch := visitor.LowestEpoch()
-		if epoch < lowestEpoch {
-			lowestEpoch = epoch
-		}
 		minTime := visitor.MinTime()
 		if minTime < lowestMinTime {
 			lowestMinTime = minTime
@@ -471,11 +472,12 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
 		}
 	}

-	//note the epoch increment takes an access exclusive on the table before incrementing.
-	//thus we don't need row locking here. Note by doing this check at the end we can
-	//have some wasted work for the inserts before this fails but this is rare.
-	//avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it.
-	row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
+	// Note: The epoch increment takes an access exclusive lock on the table
+	// before incrementing, thus we don't need row locking here. By doing this
+	// check at the end we can have some wasted work for the inserts before it
+	// fails, but this is rare. Avoiding an additional loop or memoization to
+	// find the lowest epoch ahead of time seems worth it.
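+	// Illustrative semantics (values hypothetical): with delete_epoch = 100,
+	// a batch whose minimum cache epoch is 100 or lower must abort, since ids
+	// resolved from that cache snapshot may already have been deleted:
+	//
+	//	SELECT CASE 100 <= 100 WHEN true
+	//	            THEN _prom_catalog.epoch_abort(100) END ...
+	//
+	// epoch_abort raises an error, failing the insert so it can be retried
+	// against a refreshed cache.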
+	row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", minCacheEpoch)
 	var val []byte
 	if err = row.Scan(&val); err != nil {
 		return err, lowestMinTime
diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go
index f0541db6d7..77820bc3ef 100644
--- a/pkg/pgmodel/ingestor/dispatcher.go
+++ b/pkg/pgmodel/ingestor/dispatcher.go
@@ -26,8 +26,9 @@ import (
 )

 const (
-	finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()"
-	getEpochSQL            = "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1"
+	finalizeMetricCreation    = "CALL _prom_catalog.finalize_metric_creation()"
+	getEpochSQL               = "SELECT current_epoch, COALESCE(delete_epoch, 0) FROM _prom_catalog.ids_epoch LIMIT 1"
+	getStaleSeriesIDsArraySQL = "SELECT ARRAY_AGG(id) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL"
 )

 var ErrDispatcherClosed = fmt.Errorf("dispatcher is closed")

@@ -59,6 +60,27 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
 		numCopiers = 1
 	}

+	var dbCurrentEpoch int64
+	// Bump the current epoch if it was still set to the initial value, and
+	// initialize the cache's epoch. Initializing the cache's epoch is crucial
+	// to ensure that ingestion will abort if a stale cache entry is present.
+	row := conn.QueryRow(context.Background(), "SELECT _prom_catalog.initialize_current_epoch(now())")
+	err := row.Scan(&dbCurrentEpoch)
+	if err != nil {
+		return nil, err
+	}
+	log.Info("msg", "Initializing epoch", "epoch", dbCurrentEpoch)
+	scache.SetCacheEpoch(model.SeriesEpoch(dbCurrentEpoch))
+
+	var epochDuration time.Duration
+	row = conn.QueryRow(context.Background(), "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL")
+	err = row.Scan(&epochDuration)
+	if err != nil {
+		return nil, err
+	}
+	var seriesEpochRefresh = epochDuration / 3 // This seems like a good enough rule of thumb
+	log.Info("msg", "Setting series epoch refresh", "series_epoch_refresh", seriesEpochRefresh)
+
 	// the copier read request channel retains the queue order between metrics
 	maxMetrics := 10000
 	copierReadRequestCh := make(chan readRequest, maxMetrics)
@@ -80,7 +102,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
 	if err != nil {
 		return nil, err
 	}
-	sw := NewSeriesWriter(conn, labelArrayOID, labelsCache)
+	sw := NewSeriesWriter(conn, labelArrayOID, labelsCache, scache)
 	elf := NewExamplarLabelFormatter(conn, eCache)

 	for i := 0; i < numCopiers; i++ {
@@ -97,7 +119,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
 		asyncAcks:           cfg.MetricsAsyncAcks,
 		copierReadRequestCh: copierReadRequestCh,
-		// set to run at half our deletion interval
-		seriesEpochRefresh:  time.NewTicker(30 * time.Minute),
+		// set to run at a third of the configured epoch duration
+		seriesEpochRefresh:  time.NewTicker(seriesEpochRefresh),
 		doneChannel:         make(chan struct{}),
 		closed:              uber_atomic.NewBool(false),
 	}
@@ -132,52 +154,82 @@ func (p *pgxDispatcher) runCompleteMetricCreationWorker() {
 }

 func (p *pgxDispatcher) runSeriesEpochSync() {
-	epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch)
-	// we don't have any great place to report errors, and if the
-	// connection recovers we can still make progress, so we'll just log it
-	// and continue execution
-	if err != nil {
-		log.Error("msg", "error refreshing the series cache", "err", err)
-	}
+	p.refreshSeriesEpoch()
 	for {
 		select {
 		case <-p.seriesEpochRefresh.C:
-			epoch, err = p.refreshSeriesEpoch(epoch)
-			if err != nil {
-				log.Error("msg", "error refreshing the series cache", "err", err)
-			}
+			p.refreshSeriesEpoch()
 		case <-p.doneChannel:
 			return
 		}
 	}
 }

-func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) {
-	dbEpoch, err := p.getServerEpoch()
+func (p *pgxDispatcher) getDatabaseEpochs() (model.SeriesEpoch, model.SeriesEpoch, error) {
+	var dbCurrentEpoch, dbDeleteEpoch int64
+	row := p.conn.QueryRow(context.Background(), getEpochSQL)
+	err := row.Scan(&dbCurrentEpoch, &dbDeleteEpoch)
 	if err != nil {
+		return 0, 0, err
+	}
+	return model.SeriesEpoch(dbCurrentEpoch), model.SeriesEpoch(dbDeleteEpoch), nil
+}
+
+func (p *pgxDispatcher) refreshSeriesEpoch() {
+	log.Info("msg", "Refreshing series cache epoch")
+	cacheCurrentEpoch := p.scache.CacheEpoch()
+	dbCurrentEpoch, dbDeleteEpoch, err := p.getDatabaseEpochs()
+	if err != nil {
+		log.Warn("msg", "Unable to get database epoch data, will reset series and inverted labels caches", "err", err.Error())
 		// Trash the cache just in case an epoch change occurred, seems safer
-		p.scache.Reset()
+		p.scache.Reset(dbCurrentEpoch)
 		// Also trash the inverted labels cache, which can also be invalidated when the series cache is
 		p.invertedLabelsCache.Reset()
-		return model.InvalidSeriesEpoch, err
+		return
 	}
-	if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch {
-		p.scache.Reset()
-		// If the series cache needs to be invalidated, so does the inverted labels cache
+	if cacheCurrentEpoch.After(dbCurrentEpoch) {
+		log.Warn("msg", "The connector's cache epoch is greater than the database's. This is unexpected", "connector_current_epoch", cacheCurrentEpoch, "database_current_epoch", dbCurrentEpoch)
+		return
+	}
+	if dbDeleteEpoch.AfterEq(cacheCurrentEpoch) {
+		// The current cache epoch has been overtaken by the database's
+		// delete_epoch.
+		// The only way to recover from this situation is to reset our caches
+		// and let them repopulate.
+		log.Warn("msg", "Cache epoch was overtaken by the database's delete epoch", "cache_epoch", cacheCurrentEpoch, "delete_epoch", dbDeleteEpoch)
+		p.scache.Reset(dbCurrentEpoch)
+		p.invertedLabelsCache.Reset()
+		return
+	} else {
+		start := time.Now()
+		staleSeriesIds, err := GetStaleSeriesIDs(p.conn)
+		if err != nil {
+			log.Warn("msg", "Error getting series ids, unable to update series cache", "err", err.Error())
+			return
+		}
+		log.Info("msg", "Fetched stale series from db", "count", len(staleSeriesIds), "duration", time.Since(start))
+		start = time.Now()
+		evictCount := p.scache.EvictSeriesById(staleSeriesIds, dbCurrentEpoch)
+		log.Info("msg", "Removed stale series", "count", evictCount, "duration", time.Since(start)) // Before merging in master, change the level to Debug.
+		// Trash the inverted labels cache. Technically, we could determine
+		// which label entries need to be removed from the cache, as we have
+		// done for the series cache, but that doesn't seem to really be worth
+		// it.
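+		//
+		// In summary, a refresh has one of three outcomes (a recap of the
+		// branches above, no additional behaviour implied):
+		//   epochs unreadable           -> reset series + inverted labels caches
+		//   delete_epoch >= cache epoch -> reset series + inverted labels caches
+		//   otherwise                   -> evict stale series ids, reset labels cache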
p.invertedLabelsCache.Reset() } - return dbEpoch, nil } -func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) { - var newEpoch int64 - row := p.conn.QueryRow(context.Background(), getEpochSQL) - err := row.Scan(&newEpoch) +func GetStaleSeriesIDs(conn pgxconn.PgxConn) ([]model.SeriesID, error) { + staleSeriesIDs := make([]int64, 0) + err := conn.QueryRow(context.Background(), getStaleSeriesIDsArraySQL).Scan(&staleSeriesIDs) if err != nil { - return -1, err + return nil, fmt.Errorf("error getting stale series ids from db: %w", err) } - - return model.SeriesEpoch(newEpoch), nil + staleSeriesIdsAsSeriesId := make([]model.SeriesID, len(staleSeriesIDs)) + for i := range staleSeriesIDs { + staleSeriesIdsAsSeriesId[i] = model.SeriesID(staleSeriesIDs[i]) + } + return staleSeriesIdsAsSeriesId, nil } func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { @@ -225,6 +277,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 numRows uint64 maxt int64 rows = dataTS.Rows + seriesEpoch = dataTS.SeriesCacheEpoch workFinished = new(sync.WaitGroup) ) workFinished.Add(len(rows)) @@ -240,7 +293,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt = ts } } - p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} + p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, seriesCacheEpoch: seriesEpoch, data: data, finished: workFinished, errChan: errChan} } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) @@ -330,11 +383,12 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques } type insertDataRequest struct { - spanCtx trace.SpanContext - metric string - finished *sync.WaitGroup - data []model.Insertable - errChan chan error + spanCtx trace.SpanContext + metric string + seriesCacheEpoch model.SeriesEpoch + finished *sync.WaitGroup + data []model.Insertable + errChan chan error } func (idr *insertDataRequest) reportResult(err error) { diff --git a/pkg/pgmodel/ingestor/handler_test.go b/pkg/pgmodel/ingestor/handler_test.go index 07b08b3b24..2fb0175569 100644 --- a/pkg/pgmodel/ingestor/handler_test.go +++ b/pkg/pgmodel/ingestor/handler_test.go @@ -5,12 +5,11 @@ package ingestor import ( - "testing" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/model" + "testing" ) func getSeries(t *testing.T, scache *cache.SeriesCacheImpl, labels labels.Labels) *model.Series { @@ -74,7 +73,7 @@ func TestLabelArrayCreator(t *testing.T) { /* test one series already set */ setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}) - setSeries.SetSeriesID(5, 4) + setSeries.SetSeriesID(5) seriesSet = []*model.Series{ getSeries(t, scache, labels.Labels{metricNameLabel, valOne}), setSeries, diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 262fd3f59f..dcfbad9bcd 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -193,6 +193,14 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p insertables = make(map[string][]model.Insertable) ) + // Determine the value of the series cache epoch _before_ getting any items + // from the cache. 
In order to properly abort the insert transaction if the + // cache is out of date (epoch abort), we want to determine the minimum + // cache epoch for all samples in an ingest batch. We know that everything + // currently in the cache has this epoch, and everything that we load from + // the database later will have _at least_ that epoch. + epoch := ingestor.sCache.CacheEpoch() + for i := range timeseries { var ( err error @@ -206,6 +214,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p } // Normalize and canonicalize ts.Labels. // After this point ts.Labels should never be used again. + series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels) if err != nil { return 0, err @@ -237,7 +246,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p } releaseMem() - numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now()}) + numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now(), SeriesCacheEpoch: epoch}) if errSamples == nil && numInsertablesIngested != totalRowsExpected { return numInsertablesIngested, fmt.Errorf("failed to insert all the data! Expected: %d, Got: %d", totalRowsExpected, numInsertablesIngested) } diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..6a7c9571c9 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -62,6 +62,7 @@ func (c sVisitor) VisitSeries(cb func(info *pgmodel.MetricInfo, s *pgmodel.Serie func TestPGXInserterInsertSeries(t *testing.T) { // Set test env so that cache metrics uses a new registry and avoid panic on duplicate register. 
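+	// The mocked rows below reflect the time-based epoch scheme: current_epoch
+	// is now a timestamp (initialTime) rather than the old counter value
+	// int64(1).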
require.NoError(t, os.Setenv("IS_TEST", "true")) + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) testCases := []struct { name string series []labels.Labels @@ -84,7 +85,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -134,7 +135,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -186,7 +187,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -235,7 +236,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -277,7 +278,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { scache := cache.NewSeriesCache(cache.DefaultConfig, nil) scache.Reset() lCache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lCache) + sw := NewSeriesWriter(mock, 0, lCache, scache) lsi := make([]model.Insertable, 0) for _, ser := range c.series { @@ -288,7 +289,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { lsi = append(lsi, model.NewPromExemplars(ls, nil)) } - err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(lsi)) + _, err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(lsi)) if err != nil { foundErr := false for _, q := range c.sqlQueries { @@ -306,10 +307,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { if err == nil { for _, si := range lsi { - si, se, err := si.Series().GetSeriesID() + si, err := si.Series().GetSeriesID() require.NoError(t, err) require.True(t, si > 0, "series id not set") - require.True(t, se > 0, "epoch not set") } } }) @@ -328,6 +328,9 @@ func TestPGXInserterCacheReset(t *testing.T) { }, } + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + newTime := time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC).Unix() + sqlQueries := []model.SqlQuery{ // first series cache fetch @@ -335,7 +338,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -371,7 +374,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, @@ -379,7 +382,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "BEGIN;"}, @@ -388,7 +391,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: 
[]interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -435,7 +438,7 @@ func TestPGXInserterCacheReset(t *testing.T) { mock := model.NewSqlRecorder(sqlQueries, t) scache := cache.NewSeriesCache(cache.DefaultConfig, nil) lcache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lcache) + sw := NewSeriesWriter(mock, 0, lcache, scache) inserter := pgxDispatcher{ conn: mock, scache: scache, @@ -455,7 +458,7 @@ func TestPGXInserterCacheReset(t *testing.T) { } samples := makeSamples(series) - err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) + _, err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) if err != nil { t.Fatal(err) } @@ -469,7 +472,7 @@ func TestPGXInserterCacheReset(t *testing.T) { _, _, ok := si.Series().NameValues() require.False(t, ok) expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() + gotId, err := si.Series().GetSeriesID() require.NoError(t, err) if gotId != expectedId { t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) @@ -477,11 +480,11 @@ func TestPGXInserterCacheReset(t *testing.T) { } // refreshing during the same epoch gives the same IDs without checking the DB - _, err = inserter.refreshSeriesEpoch(1) + inserter.refreshSeriesEpoch() require.NoError(t, err) samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) + _, err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) if err != nil { t.Fatal(err) } @@ -490,7 +493,7 @@ func TestPGXInserterCacheReset(t *testing.T) { _, _, ok := si.Series().NameValues() require.False(t, ok) expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() + gotId, err := si.Series().GetSeriesID() require.NoError(t, err) if gotId != expectedId { t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) @@ -498,12 +501,12 @@ func TestPGXInserterCacheReset(t *testing.T) { } // trash the cache - _, err = inserter.refreshSeriesEpoch(1) + inserter.refreshSeriesEpoch() require.NoError(t, err) // retrying rechecks the DB and uses the new IDs samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) + _, err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) if err != nil { t.Fatal(err) } @@ -517,7 +520,7 @@ func TestPGXInserterCacheReset(t *testing.T) { _, _, ok := si.Series().NameValues() require.False(t, ok) expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() + gotId, err := si.Series().GetSeriesID() require.NoError(t, err) if gotId != expectedId { t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) @@ -529,9 +532,10 @@ func TestPGXInserterInsertData(t *testing.T) { if err := os.Setenv("IS_TEST", "true"); err != nil { t.Fatal(err) } + testTime := time.Now().Unix() makeLabel := func() *model.Series { l := &model.Series{} - l.SetSeriesID(1, 1) + l.SetSeriesID(1) return l } @@ -575,8 +579,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -620,8 
+624,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -680,8 +684,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the full batch - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -710,8 +714,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the individual copyRequests - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -833,8 +837,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -900,8 +904,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, // epoch check after insert from temp table { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index 8431165c39..6bfec941b2 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..407c639333 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -136,7 +136,7 @@ func TestInitializeMetricBatcher(t *testing.T) { func TestSendBatches(t *testing.T) { makeSeries := func(seriesID int) *model.Series { l := &model.Series{} - l.SetSeriesID(pgmodel.SeriesID(seriesID), 1) + l.SetSeriesID(pgmodel.SeriesID(seriesID)) return l } var workFinished sync.WaitGroup @@ -154,7 +154,7 @@ func TestSendBatches(t *testing.T) { // we make sure that we receive batch data for i := 0; i < 3; i++ { - id, _, err := 
batch.data.batch.Data()[i].Series().GetSeriesID() + id, err := batch.data.batch.Data()[i].Series().GetSeriesID() if err != nil { t.Fatal(err) } diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index 44ae573f5b..8b1cb63ae7 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "github.com/jackc/pgtype" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -27,6 +26,7 @@ type seriesWriter struct { conn pgxconn.PgxConn labelArrayOID uint32 labelsCache *cache.InvertedLabelsCache + seriesCache cache.SeriesCache } type SeriesVisitor interface { @@ -35,8 +35,8 @@ type SeriesVisitor interface { func labelArrayTranscoder() pgtype.ValueTranscoder { return &pgtype.Int4Array{} } -func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache) *seriesWriter { - return &seriesWriter{conn, labelArrayOID, labelsCache} +func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache, seriesCache cache.SeriesCache) *seriesWriter { + return &seriesWriter{conn, labelArrayOID, labelsCache, seriesCache} } type perMetricInfo struct { @@ -59,6 +59,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi defer span.End() infos := make(map[string]*perMetricInfo) seriesCount := 0 + err := sv.VisitSeries(func(metricInfo *pgmodel.MetricInfo, series *model.Series) error { if !series.IsSeriesIDSet() { metricName := series.MetricName() @@ -124,7 +125,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi //the labels for multiple series in same txn as we are creating the series, //the ordering of label creation can only be canonical within a series and //not across series. - dbEpoch, err := h.fillLabelIDs(ctx, infos, labelMap) + err = h.fillLabelIDs(ctx, infos, labelMap) if err != nil { return fmt.Errorf("error setting series ids: %w", err) } @@ -142,7 +143,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi continue } - //transaction per metric to avoid cross-metric locks + // transaction per metric to avoid cross-metric locks batch.Queue("BEGIN;") batch.Queue(seriesInsertSQL, info.metricInfo.MetricID, info.metricInfo.TableName, info.labelArraySet) batch.Queue("COMMIT;") @@ -176,7 +177,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi if err != nil { return fmt.Errorf("error setting series_id: cannot scan series_id: %w", err) } - info.series[int(ordinality)-1].SetSeriesID(id, dbEpoch) + info.series[int(ordinality)-1].SetSeriesID(id) count++ } if err := res.Err(); err != nil { @@ -196,7 +197,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi return nil } -func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (model.SeriesEpoch, error) { +func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { _, span := tracer.Default().Start(ctx, "fill-label-ids") defer span.End() //we cannot use the label cache here because that maps label ids => name, value. @@ -204,13 +205,6 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe //we may want a new cache for that, at a later time. 
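+	// Note: this batch previously opened with a BEGIN; SELECT current_epoch;
+	// COMMIT preamble to snapshot the epoch, as the removed lines below show.
+	// Under the time-based epoch scheme the connector reads the epoch from the
+	// series cache at ingest time instead, so the preamble (and the
+	// SeriesEpoch return value) is gone.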
batch := h.conn.NewBatch() - var dbEpoch model.SeriesEpoch - - // The epoch will never decrease, so we can check it once at the beginning, - // at worst we'll store too small an epoch, which is always safe - batch.Queue("BEGIN;") - batch.Queue(getEpochSQL) - batch.Queue("COMMIT;") infoBatches := make([]*perMetricInfo, 0) items := 0 @@ -226,11 +220,11 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe } namesSlice, err := names.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing names: %w", err) + return fmt.Errorf("error filling labels: slicing names: %w", err) } valuesSlice, err := values.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing values: %w", err) + return fmt.Errorf("error filling labels: slicing values: %w", err) } batch.Queue("BEGIN;") batch.Queue("SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", metricName, info.metricInfo.TableName, namesSlice, valuesSlice) @@ -250,25 +244,14 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe br, err := h.conn.SendBatch(context.Background(), batch) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) + return fmt.Errorf("error filling labels: %w", err) } defer br.Close() - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err) - } - err = br.QueryRow().Scan(&dbEpoch) - if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) - } - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit: %w", err) - } - var count int for _, info := range infoBatches { if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on begin label batch: %w", err) + return fmt.Errorf("error filling labels on begin label batch: %w", err) } err := func() error { @@ -306,16 +289,16 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe return nil }() if err != nil { - return dbEpoch, err + return err } if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit label batch: %w", err) + return fmt.Errorf("error filling labels on commit label batch: %w", err) } } if count != items { - return dbEpoch, fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) + return fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) } - return dbEpoch, nil + return nil } func (h *seriesWriter) buildLabelArrays(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { diff --git a/pkg/pgmodel/model/batch.go b/pkg/pgmodel/model/batch.go index 57e5c7370d..97cc1fca31 100644 --- a/pkg/pgmodel/model/batch.go +++ b/pkg/pgmodel/model/batch.go @@ -6,6 +6,7 @@ package model import ( "fmt" + "math" "time" "github.com/timescale/promscale/pkg/log" @@ -14,21 +15,26 @@ import ( // Data wraps incoming data with its in-timestamp. It is used to warn if the rate // of incoming samples vs outgoing samples is too low, based on time. type Data struct { - Rows map[string][]Insertable - ReceivedTime time.Time + Rows map[string][]Insertable + ReceivedTime time.Time + SeriesCacheEpoch SeriesEpoch } // Batch is an iterator over a collection of Insertables that returns // data in the format expected for the data table row. 
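+// The batch also tracks the minimum series-cache epoch across everything
+// added to it; a fresh batch starts at math.MaxInt64 so that the first
+// UpdateSeriesCacheEpoch call always lowers it. A hedged usage sketch
+// (epoch value illustrative):
+//
+//	b := NewBatch()
+//	b.UpdateSeriesCacheEpoch(SeriesEpoch(1650000000))
+//	_ = b.SeriesCacheEpoch() // -> 1650000000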
type Batch struct { - data []Insertable - numSamples int - numExemplars int + data []Insertable + seriesCacheEpoch SeriesEpoch + numSamples int + numExemplars int } // NewBatch returns a new batch that can hold samples and exemplars. func NewBatch() Batch { - si := Batch{data: make([]Insertable, 0)} + si := Batch{ + data: make([]Insertable, 0), + seriesCacheEpoch: SeriesEpoch(math.MaxInt64), + } return si } @@ -37,7 +43,7 @@ func (t *Batch) Reset() { // nil all pointers to prevent memory leaks t.data[i] = nil } - *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0} + *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0, seriesCacheEpoch: SeriesEpoch(math.MaxInt64)} } func (t *Batch) CountSeries() int { @@ -73,6 +79,16 @@ func (t *Batch) Absorb(other Batch) { t.AppendSlice(other.data) } +func (t *Batch) SeriesCacheEpoch() SeriesEpoch { + return t.seriesCacheEpoch +} + +func (t *Batch) UpdateSeriesCacheEpoch(epoch SeriesEpoch) { + if t.seriesCacheEpoch.After(epoch) { + t.seriesCacheEpoch = epoch + } +} + func (t *Batch) Len() int { return t.CountSeries() } @@ -82,11 +98,11 @@ func (t *Batch) Swap(i, j int) { } func (t *Batch) Less(i, j int) bool { - s1, _, err := t.data[i].Series().GetSeriesID() + s1, err := t.data[i].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } - s2, _, err := t.data[j].Series().GetSeriesID() + s2, err := t.data[j].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } diff --git a/pkg/pgmodel/model/batch_visitor.go b/pkg/pgmodel/model/batch_visitor.go index bb8461a4f4..4b77eaa5ff 100644 --- a/pkg/pgmodel/model/batch_visitor.go +++ b/pkg/pgmodel/model/batch_visitor.go @@ -10,19 +10,14 @@ import ( ) type batchVisitor struct { - batch *Batch - lowestEpoch SeriesEpoch - minTime int64 + batch *Batch + minTime int64 } -func getBatchVisitor(batch *Batch) *batchVisitor { - return &batchVisitor{batch, SeriesEpoch(math.MaxInt64), math.MaxInt64} -} +var MaxDate int64 = math.MaxInt64 -// LowestEpoch returns the lowest epoch value encountered while visiting insertables. -// It must be called after Visit() has completed. 
-func (vtr *batchVisitor) LowestEpoch() SeriesEpoch {
-	return vtr.lowestEpoch
+func getBatchVisitor(batch *Batch) *batchVisitor {
+	return &batchVisitor{batch, math.MaxInt64}
 }

 func (vtr *batchVisitor) MinTime() int64 {
@@ -34,28 +29,20 @@ func (vtr *batchVisitor) Visit(
 	visitExemplars func(t time.Time, v float64, seriesId int64, lvalues []string),
 ) error {
 	var (
-		seriesId    SeriesID
-		seriesEpoch SeriesEpoch
-		err         error
+		seriesId SeriesID
+		err      error
 	)
-	updateEpoch := func(epoch SeriesEpoch) {
-		if epoch < vtr.lowestEpoch {
-			vtr.lowestEpoch = epoch
-		}
-	}
 	for _, insertable := range vtr.batch.data {
 		updateMinTs := func(t int64) {
 			if t < vtr.minTime {
 				vtr.minTime = t
 			}
 		}
-		seriesId, seriesEpoch, err = insertable.Series().GetSeriesID()
+		seriesId, err = insertable.Series().GetSeriesID()
 		if err != nil {
 			return fmt.Errorf("get series-id: %w", err)
 		}
-		updateEpoch(seriesEpoch)
-
 		switch insertable.Type() {
 		case Sample:
 			itr := insertable.Iterator().(SamplesIterator)
diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go
index 1bc3702013..4650d8c7fd 100644
--- a/pkg/pgmodel/model/series.go
+++ b/pkg/pgmodel/model/series.go
@@ -19,8 +19,7 @@ import (
 type SeriesID int64

 const (
-	invalidSeriesID    = -1
-	InvalidSeriesEpoch = -1
+	invalidSeriesID = -1
 )

 func (s SeriesID) String() string {
@@ -30,6 +29,14 @@ func (s SeriesID) String() string {
 // SeriesEpoch represents the series epoch
 type SeriesEpoch int64

+func (s SeriesEpoch) After(o SeriesEpoch) bool {
+	return s > o
+}
+
+func (s SeriesEpoch) AfterEq(o SeriesEpoch) bool {
+	return s >= o
+}
+
 // Series stores a Prometheus labels.Labels in its canonical string representation
 type Series struct {
-	//protects names, values, seriesID, epoch
+	//protects names, values, seriesID
 	lock sync.RWMutex

 	names      []string
 	values     []string
 	seriesID   SeriesID
-	epoch      SeriesEpoch
 	metricName string
 	str        string
@@ -50,7 +56,6 @@ func NewSeries(key string, labelPairs []prompb.Label) *Series {
 		values:   make([]string, len(labelPairs)),
 		str:      key,
 		seriesID: invalidSeriesID,
-		epoch:    InvalidSeriesEpoch,
 	}
 	for i, l := range labelPairs {
 		series.names[i] = l.Name
@@ -108,26 +113,25 @@ func (l *Series) FinalSizeBytes() uint64 {
 	return uint64(unsafe.Sizeof(*l)) + uint64(len(l.str)+len(l.metricName)) // #nosec
 }

-func (l *Series) GetSeriesID() (SeriesID, SeriesEpoch, error) {
+func (l *Series) GetSeriesID() (SeriesID, error) {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
 	switch l.seriesID {
 	case invalidSeriesID:
-		return 0, 0, fmt.Errorf("Series id not set")
+		return 0, fmt.Errorf("Series id not set")
 	case 0:
-		return 0, 0, fmt.Errorf("Series id invalid")
+		return 0, fmt.Errorf("Series id invalid")
 	default:
-		return l.seriesID, l.epoch, nil
+		return l.seriesID, nil
 	}
 }

-//note this has to be idempotent
-func (l *Series) SetSeriesID(sid SeriesID, eid SeriesEpoch) {
+// Note: This has to be idempotent
+func (l *Series) SetSeriesID(sid SeriesID) {
 	l.lock.Lock()
 	defer l.lock.Unlock()
 	l.seriesID = sid
-	l.epoch = eid
 	l.names = nil
 	l.values = nil
 }
diff --git a/pkg/rules/adapters/ingest.go b/pkg/rules/adapters/ingest.go
index 1b7206f796..df6c40f2c0 100644
--- a/pkg/rules/adapters/ingest.go
+++ b/pkg/rules/adapters/ingest.go
@@ -3,6 +3,7 @@ package adapters
 import (
 	"context"
 	"fmt"
+	"math"
 	"time"

 	"github.com/pkg/errors"
@@ -30,9 +31,10 @@ func NewIngestAdapter(inserter ingestor.DBInserter) *ingestAdapter {
 }

 type appenderAdapter struct {
-	data     map[string][]model.Insertable
-	inserter ingestor.DBInserter
-	closed   bool
+	data             map[string][]model.Insertable
+	inserter         ingestor.DBInserter
+	closed           bool
+	seriesCacheEpoch model.SeriesEpoch
 }

 // Appender creates a new appender for Prometheus rules manager.
@@ -47,8 +49,9 @@ type appenderAdapter struct {
 // Note: The rule manager does not call Rollback() yet.
 func (a ingestAdapter) Appender(_ context.Context) storage.Appender {
 	return &appenderAdapter{
-		data:     make(map[string][]model.Insertable),
-		inserter: a.inserter,
+		data:             make(map[string][]model.Insertable),
+		inserter:         a.inserter,
+		seriesCacheEpoch: model.SeriesEpoch(math.MaxInt64),
 	}
 }

@@ -60,6 +63,10 @@ func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64
 	if err != nil {
 		return 0, fmt.Errorf("get ingestor: %w", err)
 	}
+	cacheEpoch := dbIngestor.SeriesCache().CacheEpoch()
+	if app.seriesCacheEpoch.After(cacheEpoch) {
+		app.seriesCacheEpoch = cacheEpoch
+	}
 	series, metricName, err := dbIngestor.SeriesCache().GetSeriesFromProtos(util.LabelToPrompbLabels(l))
 	if err != nil {
 		return 0, fmt.Errorf("get series from protos: %w", err)
@@ -96,7 +103,7 @@ func (app *appenderAdapter) Commit() error {
 	if err != nil {
 		return fmt.Errorf("get ingestor: %w", err)
 	}
-	numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()})
+	numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now(), SeriesCacheEpoch: app.seriesCacheEpoch})
 	if err == nil {
 		samplesIngested.Add(float64(numInsertablesIngested))
 	}
diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go
index bd9fdf8e3e..d0187c7ddb 100644
--- a/pkg/tests/end_to_end_tests/drop_test.go
+++ b/pkg/tests/end_to_end_tests/drop_test.go
@@ -659,7 +659,7 @@ func TestSQLDropMetricChunk(t *testing.T) {
 		t.Error("expected ingest to fail due to old epoch")
 	}

-	scache.Reset()
+	scache.Reset(pgmodel.SeriesEpoch(time.Now().Unix()))

 	ingestor.Close()
 	ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil)
diff --git a/scripts/end_to_end_tests.sh b/scripts/end_to_end_tests.sh
index 80531b6e93..8ac5c94352 100755
--- a/scripts/end_to_end_tests.sh
+++ b/scripts/end_to_end_tests.sh
@@ -88,6 +88,7 @@ PROMSCALE_DB_PASSWORD=postgres \
 PROMSCALE_DB_NAME=postgres \
 PROMSCALE_DB_SSL_MODE=disable \
 PROMSCALE_WEB_TELEMETRY_PATH=/metrics \
+PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS=true \
 ./promscale -startup.only

 docker exec e2e-tsdb psql -U postgres -d postgres \