From 7e8395fd4b42bab685f1c771031de35ee2ee28a0 Mon Sep 17 00:00:00 2001 From: James Guthrie Date: Mon, 19 Sep 2022 14:25:05 +0200 Subject: [PATCH] Implement series cache invalidation This change implements invalidation of the series cache, and mechanisms to prevent the ingestion of data based on stale cache information. In principle, the cache invalidation mechanism works as follows: In the database, we track two values: `current_epoch`, and `delete_epoch`. These are unix timestamps (which, for reasons of backwards-compatibility, are stored in a BIGINT field), updated every time that rows in the series table are deleted. `current_epoch` is set from `now()`, and `delete_epoch` is set from `now() - epoch_duration`. `epoch_duration` is a configurable parameter. When a series row is to be deleted, instead of immediately deleting it, we set the `delete_epoch` column of the series row to the `current_epoch` timestamp (the time at which we decided that it will be deleted). After `epoch_duration` time elapses, the row is removed. When the connector starts, it reads `current_epoch` from the database and stores this value with the series cache as `cache_current_epoch`. The connector periodically fetches the ids of series rows where `delete_epoch` is not null, together with `current_epoch`. It removes these entries from the cache, and updates `cache_current_epoch` to the value of `current_epoch` which was fetched from the database. As the connector receives samples to be inserted, it tracks the smallest value of `cache_current_epoch` that it saw for that batch. When it comes to inserting the samples in a batch into the database, it asserts (in the database) that the cache which was read from was not stale. This is expressed as: `cache_current_epoch > delete_epoch`. If this is not the case, the insert is aborted. This is a companion change to [1] which implements the database-side logic required for cache invalidation. 
[1]: https://github.com/timescale/promscale_extension/pull/529 --- docker-compose/docker-compose.yaml | 4 +- .../high-availability/docker-compose.yaml | 5 +- pkg/clockcache/cache.go | 44 ++++++ pkg/pgmodel/cache/series_cache.go | 45 ++++++- pkg/pgmodel/ingestor/buffer.go | 1 + pkg/pgmodel/ingestor/copier.go | 22 +-- pkg/pgmodel/ingestor/dispatcher.go | 126 +++++++++++++----- pkg/pgmodel/ingestor/handler_test.go | 5 +- pkg/pgmodel/ingestor/ingestor.go | 11 +- pkg/pgmodel/ingestor/ingestor_sql_test.go | 72 +++++----- pkg/pgmodel/ingestor/metric_batcher.go | 1 - pkg/pgmodel/ingestor/metric_batcher_test.go | 4 +- pkg/pgmodel/ingestor/series_writer.go | 49 +++---- pkg/pgmodel/model/batch.go | 34 +++-- pkg/pgmodel/model/batch_visitor.go | 29 ++-- pkg/pgmodel/model/series.go | 26 ++-- pkg/rules/adapters/ingest.go | 19 ++- pkg/tests/end_to_end_tests/drop_test.go | 2 +- scripts/end_to_end_tests.sh | 1 + 19 files changed, 328 insertions(+), 172 deletions(-) diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml index d4e9964c76..948da83a56 100644 --- a/docker-compose/docker-compose.yaml +++ b/docker-compose/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -42,6 +43,7 @@ services: PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317" PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1" PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true otel-collector: platform: linux/amd64 diff --git a/docker-compose/high-availability/docker-compose.yaml b/docker-compose/high-availability/docker-compose.yaml index 60baa70d25..bed5b4d9b6 100644 --- a/docker-compose/high-availability/docker-compose.yaml +++ 
b/docker-compose/high-availability/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when Promscale extension 0.7.0 published + image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -44,6 +45,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true promscale-connector2: image: timescale/promscale:latest @@ -61,6 +63,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true node_exporter: image: quay.io/prometheus/node-exporter diff --git a/pkg/clockcache/cache.go b/pkg/clockcache/cache.go index ea67802744..5b26cdd433 100644 --- a/pkg/clockcache/cache.go +++ b/pkg/clockcache/cache.go @@ -415,3 +415,47 @@ func packUsedAndTimestamp(used bool, ts int32) uint32 { } return uint32(usedBit<<31) | uint32(ts) } + +// RemoveMatchingItems removes items whose value passes the check applied by +// the passed `matcher` function. It returns the number of entries which were +// removed from the cache. +// Note: Due to the locks that this function takes, it is best used when very +// few items stored in the cache are to be removed. +func (self *Cache) RemoveMatchingItems(matcher func(value interface{}) bool) int { + // Note: we take an RLock to find matching items, then release it and take + // a full Lock to remove them. This has two assumptions: + // 1. len(matchingItems) << len(storage): taking a big lock should be fine. + // 2. Missing the addition of an item to the cache is not problematic. 
+	// Locking like this doesn't provide a consistent view of the cache +	// contents. Either an entry is removed or added under our feet. An +	// entry being removed is not a problem, we will just skip it. An entry +	// being added is fine for the one consumer of this method (cache +	// invalidation): if an item is added to the cache, then it was just +	// created/resurrected in the DB, so it wouldn't have been a candidate +	// for removal. + +	self.elementsLock.RLock() +	matchingItems := make([]interface{}, 0) +	for k, v := range self.elements { +		if matcher(v.value) { +			matchingItems = append(matchingItems, k) +		} +	} +	self.elementsLock.RUnlock() +	self.elementsLock.Lock() +	for i := range matchingItems { +		self.remove(matchingItems[i]) +	} +	self.elementsLock.Unlock() +	return len(matchingItems) +} + +func (self *Cache) remove(key interface{}) { +	value, present := self.elements[key] +	if !present { +		// do nothing +		return +	} +	delete(self.elements, key) +	*value = element{} +} diff --git a/pkg/pgmodel/cache/series_cache.go b/pkg/pgmodel/cache/series_cache.go index a3b505a8d6..e502754199 100644 --- a/pkg/pgmodel/cache/series_cache.go +++ b/pkg/pgmodel/cache/series_cache.go @@ -30,22 +30,27 @@ var evictionMaxAge = time.Minute * 2 // grow cache if we are evicting eleme // SeriesCache is a cache of model.Series entries. 
type SeriesCache interface { - Reset() + Reset(epoch model.SeriesEpoch) GetSeriesFromProtos(labelPairs []prompb.Label) (series *model.Series, metricName string, err error) Len() int Cap() int Evictions() uint64 + EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int + CacheEpoch() model.SeriesEpoch + SetCacheEpoch(epoch model.SeriesEpoch) } type SeriesCacheImpl struct { cache *clockcache.Cache maxSizeBytes uint64 + cacheEpoch model.SeriesEpoch } func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl { cache := &SeriesCacheImpl{ clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize), config.SeriesCacheMemoryMaxBytes, + model.SeriesEpoch(0), } if sigClose != nil { @@ -54,6 +59,14 @@ func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl { return cache } +func (t *SeriesCacheImpl) CacheEpoch() model.SeriesEpoch { + return t.cacheEpoch +} + +func (t *SeriesCacheImpl) SetCacheEpoch(epoch model.SeriesEpoch) { + t.cacheEpoch = epoch +} + func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) { ticker := time.NewTicker(growCheckDuration) for { @@ -115,8 +128,11 @@ func (t *SeriesCacheImpl) Evictions() uint64 { } // Reset should be concurrency-safe -func (t *SeriesCacheImpl) Reset() { +func (t *SeriesCacheImpl) Reset(epoch model.SeriesEpoch) { t.cache.Reset() + // When we reset the cache, we need to reset the cache epoch as well, + // otherwise it may be out of date as soon as data is loaded into it. + t.SetCacheEpoch(epoch) } // Get the canonical version of a series if one exists. 
@@ -277,3 +293,28 @@ func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model return series, metricName, nil } + +func (t *SeriesCacheImpl) EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int { +	if len(seriesIds) == 0 { +		return 0 +	} +	seriesIdMap := make(map[model.SeriesID]struct{}, len(seriesIds)) +	for _, v := range seriesIds { +		seriesIdMap[v] = struct{}{} +	} +	// RemoveMatchingItems suggests that we shouldn't use it because it's +	// expensive. In normal operation (with ~30 day retention), we don't expect +	// our cache to contain a lot of the series that we want to remove. This +	// matches well with RemoveMatchingItems locking mechanism. See the impl. +	count := t.cache.RemoveMatchingItems(func(v interface{}) bool { +		value := v.(*model.Series) +		id, err := value.GetSeriesID() +		if err != nil { +			return false +		} +		_, present := seriesIdMap[id] +		return present +	}) +	t.SetCacheEpoch(epoch) +	return count +} diff --git a/pkg/pgmodel/ingestor/buffer.go b/pkg/pgmodel/ingestor/buffer.go index 2a4e6659e4..3049d71b5c 100644 --- a/pkg/pgmodel/ingestor/buffer.go +++ b/pkg/pgmodel/ingestor/buffer.go @@ -79,4 +79,5 @@ func (p *pendingBuffer) release() { func (p *pendingBuffer) addReq(req *insertDataRequest) { p.needsResponse = append(p.needsResponse, insertDataTask{finished: req.finished, errChan: req.errChan}) p.batch.AppendSlice(req.data) +	p.batch.UpdateSeriesCacheEpoch(req.seriesCacheEpoch) } diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 84ff81e5a8..cd4a8eab3d 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -144,6 +144,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e defer span.End() batch := copyBatch(insertBatch) err := sw.PopulateOrCreateSeries(ctx, batch) + if err != nil { return fmt.Errorf("copier: writing series: %w", err) } @@ -348,8 +349,8 @@ func insertSeries(ctx context.Context, conn 
pgxconn.PgxConn, onConflict bool, re var sampleRows [][]interface{} var exemplarRows [][]interface{} insertStart := time.Now() - lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64) lowestMinTime := int64(math.MaxInt64) + minCacheEpoch := pgmodel.SeriesEpoch(math.MaxInt64) tx, err := conn.BeginTx(ctx) if err != nil { return fmt.Errorf("failed to start transaction for inserting metrics: %v", err), lowestMinTime @@ -364,6 +365,10 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re for r := range reqs { req := &reqs[r] + epoch := req.data.batch.SeriesCacheEpoch() + if minCacheEpoch.After(epoch) { + minCacheEpoch = epoch + } // Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. Prometheus retry) // Samples inside series should be sorted by Prometheus // We sort after PopulateOrCreateSeries call because we now have guarantees that all seriesIDs have been populated @@ -409,10 +414,6 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re if err != nil { return err, lowestMinTime } - epoch := visitor.LowestEpoch() - if epoch < lowestEpoch { - lowestEpoch = epoch - } minTime := visitor.MinTime() if minTime < lowestMinTime { lowestMinTime = minTime @@ -471,11 +472,12 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re } } - //note the epoch increment takes an access exclusive on the table before incrementing. - //thus we don't need row locking here. Note by doing this check at the end we can - //have some wasted work for the inserts before this fails but this is rare. - //avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it. 
- row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch)) + // Note: The epoch increment takes an access exclusive on the table before + // incrementing, thus we don't need row locking here. By doing this check + // at the end we can have some wasted work for the inserts before this + // fails but this is rare. Avoiding an additional loop or memoization to + // find the lowest epoch ahead of time seems worth it. + row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", minCacheEpoch) var val []byte if err = row.Scan(&val); err != nil { return err, lowestMinTime diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index f0541db6d7..77820bc3ef 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -26,8 +26,9 @@ import ( ) const ( - finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()" - getEpochSQL = "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1" + finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()" + getEpochSQL = "SELECT current_epoch, COALESCE(delete_epoch, 0) FROM _prom_catalog.ids_epoch LIMIT 1" + getStaleSeriesIDsArraySQL = "SELECT ARRAY_AGG(id) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL" ) var ErrDispatcherClosed = fmt.Errorf("dispatcher is closed") @@ -59,6 +60,27 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac numCopiers = 1 } + var dbCurrentEpoch int64 + // Bump the current epoch if it was still set to the initial value, and + // initialize the cache's epoch. Initializing the cache's epoch is crucial + // to ensure that ingestion will abort if a stale cache entry is present. 
+	row := conn.QueryRow(context.Background(), "SELECT _prom_catalog.initialize_current_epoch(now())") +	err := row.Scan(&dbCurrentEpoch) +	if err != nil { +		return nil, err +	} +	log.Info("msg", "Initializing epoch", "epoch", dbCurrentEpoch) +	scache.SetCacheEpoch(model.SeriesEpoch(dbCurrentEpoch)) + +	var epochDuration time.Duration +	row = conn.QueryRow(context.Background(), "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL") +	err = row.Scan(&epochDuration) +	if err != nil { +		return nil, err +	} +	var seriesEpochRefresh = epochDuration / 3 // This seems like a good enough rule of thumb +	log.Info("msg", "Setting series epoch refresh", "series_epoch_refresh", seriesEpochRefresh) + // the copier read request channel retains the queue order between metrics maxMetrics := 10000 copierReadRequestCh := make(chan readRequest, maxMetrics) @@ -80,7 +102,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac if err != nil { return nil, err } - sw := NewSeriesWriter(conn, labelArrayOID, labelsCache) + sw := NewSeriesWriter(conn, labelArrayOID, labelsCache, scache) elf := NewExamplarLabelFormatter(conn, eCache) for i := 0; i < numCopiers; i++ { @@ -97,7 +119,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac asyncAcks: cfg.MetricsAsyncAcks, copierReadRequestCh: copierReadRequestCh, // set to run at half our deletion interval - seriesEpochRefresh: time.NewTicker(30 * time.Minute), + seriesEpochRefresh: time.NewTicker(seriesEpochRefresh), doneChannel: make(chan struct{}), closed: uber_atomic.NewBool(false), } @@ -132,52 +154,82 @@ func (p *pgxDispatcher) runCompleteMetricCreationWorker() { } func (p *pgxDispatcher) runSeriesEpochSync() { - epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch) - // we don't have any great place to report errors, and if the - // connection recovers we can still make progress, so we'll just log it - // and continue execution - if err != nil { - 
log.Error("msg", "error refreshing the series cache", "err", err) - } + p.refreshSeriesEpoch() for { select { case <-p.seriesEpochRefresh.C: - epoch, err = p.refreshSeriesEpoch(epoch) - if err != nil { - log.Error("msg", "error refreshing the series cache", "err", err) - } + p.refreshSeriesEpoch() case <-p.doneChannel: return } } } -func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) { - dbEpoch, err := p.getServerEpoch() +func (p *pgxDispatcher) getDatabaseEpochs() (model.SeriesEpoch, model.SeriesEpoch, error) { + var dbCurrentEpoch, dbDeleteEpoch int64 + row := p.conn.QueryRow(context.Background(), getEpochSQL) + err := row.Scan(&dbCurrentEpoch, &dbDeleteEpoch) if err != nil { + return 0, 0, err + } + return model.SeriesEpoch(dbCurrentEpoch), model.SeriesEpoch(dbDeleteEpoch), nil +} + +func (p *pgxDispatcher) refreshSeriesEpoch() { + log.Info("msg", "Refreshing series cache epoch") + cacheCurrentEpoch := p.scache.CacheEpoch() + dbCurrentEpoch, dbDeleteEpoch, err := p.getDatabaseEpochs() + if err != nil { + log.Warn("msg", "Unable to get database epoch data, will reset series and inverted labels caches", "err", err.Error()) // Trash the cache just in case an epoch change occurred, seems safer - p.scache.Reset() + p.scache.Reset(dbCurrentEpoch) // Also trash the inverted labels cache, which can also be invalidated when the series cache is p.invertedLabelsCache.Reset() - return model.InvalidSeriesEpoch, err + return } - if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch { - p.scache.Reset() - // If the series cache needs to be invalidated, so does the inverted labels cache + if cacheCurrentEpoch.After(dbCurrentEpoch) { + log.Warn("msg", "The connector's cache epoch is greater than the databases. 
This is unexpected", "connector_current_epoch", cacheCurrentEpoch, "database_current_epoch", dbCurrentEpoch) + return + } + if dbDeleteEpoch.AfterEq(cacheCurrentEpoch) { + // The current cache epoch has been overtaken by the database' + // delete_epoch. + // The only way to recover from this situation is to reset our caches + // and let them repopulate. + log.Warn("msg", "Cache epoch was overtaken by the database's delete epoch", "cache_epoch", cacheCurrentEpoch, "delete_epoch", dbDeleteEpoch) + p.scache.Reset(dbCurrentEpoch) + p.invertedLabelsCache.Reset() + return + } else { + start := time.Now() + staleSeriesIds, err := GetStaleSeriesIDs(p.conn) + if err != nil { + log.Warn("msg", "Error getting series ids, unable to update series cache", "err", err.Error()) + return + } + log.Info("msg", "Epoch change noticed, fetched stale series from db", "count", len(staleSeriesIds), "duration", time.Since(start)) + start = time.Now() + evictCount := p.scache.EvictSeriesById(staleSeriesIds, dbCurrentEpoch) + log.Info("msg", "Removed stale series", "count", evictCount, "duration", time.Since(start)) // Before merging in master, change the level to Debug. + // Trash the inverted labels cache. Technically, we could determine + // which label entries need to be removed from the cache, as we have + // done for the series cache, but that doesn't seem to really be worth + // it. 
p.invertedLabelsCache.Reset() } - return dbEpoch, nil } -func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) { - var newEpoch int64 - row := p.conn.QueryRow(context.Background(), getEpochSQL) - err := row.Scan(&newEpoch) +func GetStaleSeriesIDs(conn pgxconn.PgxConn) ([]model.SeriesID, error) { + staleSeriesIDs := make([]int64, 0) + err := conn.QueryRow(context.Background(), getStaleSeriesIDsArraySQL).Scan(&staleSeriesIDs) if err != nil { - return -1, err + return nil, fmt.Errorf("error getting stale series ids from db: %w", err) } - - return model.SeriesEpoch(newEpoch), nil + staleSeriesIdsAsSeriesId := make([]model.SeriesID, len(staleSeriesIDs)) + for i := range staleSeriesIDs { + staleSeriesIdsAsSeriesId[i] = model.SeriesID(staleSeriesIDs[i]) + } + return staleSeriesIdsAsSeriesId, nil } func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { @@ -225,6 +277,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 numRows uint64 maxt int64 rows = dataTS.Rows + seriesEpoch = dataTS.SeriesCacheEpoch workFinished = new(sync.WaitGroup) ) workFinished.Add(len(rows)) @@ -240,7 +293,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt = ts } } - p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} + p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, seriesCacheEpoch: seriesEpoch, data: data, finished: workFinished, errChan: errChan} } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) @@ -330,11 +383,12 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques } type insertDataRequest struct { - spanCtx trace.SpanContext - metric string - finished *sync.WaitGroup - data []model.Insertable - errChan chan error + spanCtx 
trace.SpanContext + metric string + seriesCacheEpoch model.SeriesEpoch + finished *sync.WaitGroup + data []model.Insertable + errChan chan error } func (idr *insertDataRequest) reportResult(err error) { diff --git a/pkg/pgmodel/ingestor/handler_test.go b/pkg/pgmodel/ingestor/handler_test.go index 07b08b3b24..2fb0175569 100644 --- a/pkg/pgmodel/ingestor/handler_test.go +++ b/pkg/pgmodel/ingestor/handler_test.go @@ -5,12 +5,11 @@ package ingestor import ( - "testing" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/model" + "testing" ) func getSeries(t *testing.T, scache *cache.SeriesCacheImpl, labels labels.Labels) *model.Series { @@ -74,7 +73,7 @@ func TestLabelArrayCreator(t *testing.T) { /* test one series already set */ setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}) - setSeries.SetSeriesID(5, 4) + setSeries.SetSeriesID(5) seriesSet = []*model.Series{ getSeries(t, scache, labels.Labels{metricNameLabel, valOne}), setSeries, diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 262fd3f59f..dcfbad9bcd 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -193,6 +193,14 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p insertables = make(map[string][]model.Insertable) ) + // Determine the value of the series cache epoch _before_ getting any items + // from the cache. In order to properly abort the insert transaction if the + // cache is out of date (epoch abort), we want to determine the minimum + // cache epoch for all samples in an ingest batch. We know that everything + // currently in the cache has this epoch, and everything that we load from + // the database later will have _at least_ that epoch. 
+ epoch := ingestor.sCache.CacheEpoch() + for i := range timeseries { var ( err error @@ -206,6 +214,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p } // Normalize and canonicalize ts.Labels. // After this point ts.Labels should never be used again. + series, metricName, err = ingestor.sCache.GetSeriesFromProtos(ts.Labels) if err != nil { return 0, err @@ -237,7 +246,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p } releaseMem() - numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now()}) + numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now(), SeriesCacheEpoch: epoch}) if errSamples == nil && numInsertablesIngested != totalRowsExpected { return numInsertablesIngested, fmt.Errorf("failed to insert all the data! Expected: %d, Got: %d", totalRowsExpected, numInsertablesIngested) } diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..6a7c9571c9 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -62,6 +62,7 @@ func (c sVisitor) VisitSeries(cb func(info *pgmodel.MetricInfo, s *pgmodel.Serie func TestPGXInserterInsertSeries(t *testing.T) { // Set test env so that cache metrics uses a new registry and avoid panic on duplicate register. 
require.NoError(t, os.Setenv("IS_TEST", "true")) + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) testCases := []struct { name string series []labels.Labels @@ -84,7 +85,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -134,7 +135,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -186,7 +187,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -235,7 +236,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -277,7 +278,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { scache := cache.NewSeriesCache(cache.DefaultConfig, nil) scache.Reset() lCache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lCache) + sw := NewSeriesWriter(mock, 0, lCache, scache) lsi := make([]model.Insertable, 0) for _, ser := range c.series { @@ -288,7 +289,7 @@ func TestPGXInserterInsertSeries(t *testing.T) { lsi = append(lsi, model.NewPromExemplars(ls, nil)) } - err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(lsi)) + _, err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(lsi)) if err != nil { foundErr := false for _, q := range c.sqlQueries { @@ 
-306,10 +307,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { if err == nil { for _, si := range lsi { - si, se, err := si.Series().GetSeriesID() + si, err := si.Series().GetSeriesID() require.NoError(t, err) require.True(t, si > 0, "series id not set") - require.True(t, se > 0, "epoch not set") } } }) @@ -328,6 +328,9 @@ func TestPGXInserterCacheReset(t *testing.T) { }, } + initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix() + newTime := time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC).Unix() + sqlQueries := []model.SqlQuery{ // first series cache fetch @@ -335,7 +338,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -371,7 +374,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, + Results: model.RowResults{{initialTime}}, Err: error(nil), }, @@ -379,7 +382,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "BEGIN;"}, @@ -388,7 +391,7 @@ func TestPGXInserterCacheReset(t *testing.T) { { Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, + Results: model.RowResults{{newTime}}, Err: error(nil), }, {Sql: "COMMIT;"}, @@ -435,7 +438,7 @@ func TestPGXInserterCacheReset(t *testing.T) { mock := model.NewSqlRecorder(sqlQueries, t) scache := cache.NewSeriesCache(cache.DefaultConfig, nil) lcache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lcache) + sw := NewSeriesWriter(mock, 0, lcache, scache) inserter := 
pgxDispatcher{ conn: mock, scache: scache, @@ -455,7 +458,7 @@ func TestPGXInserterCacheReset(t *testing.T) { } samples := makeSamples(series) - err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) + _, err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) if err != nil { t.Fatal(err) } @@ -469,7 +472,7 @@ func TestPGXInserterCacheReset(t *testing.T) { _, _, ok := si.Series().NameValues() require.False(t, ok) expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() + gotId, err := si.Series().GetSeriesID() require.NoError(t, err) if gotId != expectedId { t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) @@ -477,11 +480,11 @@ func TestPGXInserterCacheReset(t *testing.T) { } // refreshing during the same epoch gives the same IDs without checking the DB - _, err = inserter.refreshSeriesEpoch(1) + inserter.refreshSeriesEpoch() require.NoError(t, err) samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) + _, err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) if err != nil { t.Fatal(err) } @@ -490,7 +493,7 @@ func TestPGXInserterCacheReset(t *testing.T) { _, _, ok := si.Series().NameValues() require.False(t, ok) expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() + gotId, err := si.Series().GetSeriesID() require.NoError(t, err) if gotId != expectedId { t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) @@ -498,12 +501,12 @@ func TestPGXInserterCacheReset(t *testing.T) { } // trash the cache - _, err = inserter.refreshSeriesEpoch(1) + inserter.refreshSeriesEpoch() require.NoError(t, err) // retrying rechecks the DB and uses the new IDs samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) + _, err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) if err != nil { t.Fatal(err) } @@ -517,7 +520,7 @@ func 
TestPGXInserterCacheReset(t *testing.T) { _, _, ok := si.Series().NameValues() require.False(t, ok) expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() + gotId, err := si.Series().GetSeriesID() require.NoError(t, err) if gotId != expectedId { t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) @@ -529,9 +532,10 @@ func TestPGXInserterInsertData(t *testing.T) { if err := os.Setenv("IS_TEST", "true"); err != nil { t.Fatal(err) } + testTime := time.Now().Unix() makeLabel := func() *model.Series { l := &model.Series{} - l.SetSeriesID(1, 1) + l.SetSeriesID(1) return l } @@ -575,8 +579,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -620,8 +624,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -680,8 +684,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the full batch - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: 
[]interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -710,8 +714,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the individual copyRequests - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -833,8 +837,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -900,8 +904,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, // epoch check after insert from temp table { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{testTime}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index 8431165c39..6bfec941b2 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" diff --git 
a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..407c639333 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -136,7 +136,7 @@ func TestInitializeMetricBatcher(t *testing.T) { func TestSendBatches(t *testing.T) { makeSeries := func(seriesID int) *model.Series { l := &model.Series{} - l.SetSeriesID(pgmodel.SeriesID(seriesID), 1) + l.SetSeriesID(pgmodel.SeriesID(seriesID)) return l } var workFinished sync.WaitGroup @@ -154,7 +154,7 @@ func TestSendBatches(t *testing.T) { // we make sure that we receive batch data for i := 0; i < 3; i++ { - id, _, err := batch.data.batch.Data()[i].Series().GetSeriesID() + id, err := batch.data.batch.Data()[i].Series().GetSeriesID() if err != nil { t.Fatal(err) } diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index 44ae573f5b..8b1cb63ae7 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "github.com/jackc/pgtype" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -27,6 +26,7 @@ type seriesWriter struct { conn pgxconn.PgxConn labelArrayOID uint32 labelsCache *cache.InvertedLabelsCache + seriesCache cache.SeriesCache } type SeriesVisitor interface { @@ -35,8 +35,8 @@ type SeriesVisitor interface { func labelArrayTranscoder() pgtype.ValueTranscoder { return &pgtype.Int4Array{} } -func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache) *seriesWriter { - return &seriesWriter{conn, labelArrayOID, labelsCache} +func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache, seriesCache cache.SeriesCache) *seriesWriter { + return &seriesWriter{conn, labelArrayOID, labelsCache, seriesCache} } type perMetricInfo struct { @@ -59,6 +59,7 @@ func (h 
*seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi defer span.End() infos := make(map[string]*perMetricInfo) seriesCount := 0 + err := sv.VisitSeries(func(metricInfo *pgmodel.MetricInfo, series *model.Series) error { if !series.IsSeriesIDSet() { metricName := series.MetricName() @@ -124,7 +125,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi //the labels for multiple series in same txn as we are creating the series, //the ordering of label creation can only be canonical within a series and //not across series. - dbEpoch, err := h.fillLabelIDs(ctx, infos, labelMap) + err = h.fillLabelIDs(ctx, infos, labelMap) if err != nil { return fmt.Errorf("error setting series ids: %w", err) } @@ -142,7 +143,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi continue } - //transaction per metric to avoid cross-metric locks + // transaction per metric to avoid cross-metric locks batch.Queue("BEGIN;") batch.Queue(seriesInsertSQL, info.metricInfo.MetricID, info.metricInfo.TableName, info.labelArraySet) batch.Queue("COMMIT;") @@ -176,7 +177,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi if err != nil { return fmt.Errorf("error setting series_id: cannot scan series_id: %w", err) } - info.series[int(ordinality)-1].SetSeriesID(id, dbEpoch) + info.series[int(ordinality)-1].SetSeriesID(id) count++ } if err := res.Err(); err != nil { @@ -196,7 +197,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi return nil } -func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (model.SeriesEpoch, error) { +func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { _, span := tracer.Default().Start(ctx, "fill-label-ids") defer span.End() //we cannot use the label cache here 
because that maps label ids => name, value. @@ -204,13 +205,6 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe //we may want a new cache for that, at a later time. batch := h.conn.NewBatch() - var dbEpoch model.SeriesEpoch - - // The epoch will never decrease, so we can check it once at the beginning, - // at worst we'll store too small an epoch, which is always safe - batch.Queue("BEGIN;") - batch.Queue(getEpochSQL) - batch.Queue("COMMIT;") infoBatches := make([]*perMetricInfo, 0) items := 0 @@ -226,11 +220,11 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe } namesSlice, err := names.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing names: %w", err) + return fmt.Errorf("error filling labels: slicing names: %w", err) } valuesSlice, err := values.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing values: %w", err) + return fmt.Errorf("error filling labels: slicing values: %w", err) } batch.Queue("BEGIN;") batch.Queue("SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", metricName, info.metricInfo.TableName, namesSlice, valuesSlice) @@ -250,25 +244,14 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe br, err := h.conn.SendBatch(context.Background(), batch) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) + return fmt.Errorf("error filling labels: %w", err) } defer br.Close() - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err) - } - err = br.QueryRow().Scan(&dbEpoch) - if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) - } - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit: %w", err) - } - var count int for _, info := range infoBatches { if _, err := br.Exec(); err != nil { - return dbEpoch, 
fmt.Errorf("error filling labels on begin label batch: %w", err) + return fmt.Errorf("error filling labels on begin label batch: %w", err) } err := func() error { @@ -306,16 +289,16 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe return nil }() if err != nil { - return dbEpoch, err + return err } if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit label batch: %w", err) + return fmt.Errorf("error filling labels on commit label batch: %w", err) } } if count != items { - return dbEpoch, fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) + return fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) } - return dbEpoch, nil + return nil } func (h *seriesWriter) buildLabelArrays(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { diff --git a/pkg/pgmodel/model/batch.go b/pkg/pgmodel/model/batch.go index 57e5c7370d..97cc1fca31 100644 --- a/pkg/pgmodel/model/batch.go +++ b/pkg/pgmodel/model/batch.go @@ -6,6 +6,7 @@ package model import ( "fmt" + "math" "time" "github.com/timescale/promscale/pkg/log" @@ -14,21 +15,26 @@ import ( // Data wraps incoming data with its in-timestamp. It is used to warn if the rate // of incoming samples vs outgoing samples is too low, based on time. type Data struct { - Rows map[string][]Insertable - ReceivedTime time.Time + Rows map[string][]Insertable + ReceivedTime time.Time + SeriesCacheEpoch SeriesEpoch } // Batch is an iterator over a collection of Insertables that returns // data in the format expected for the data table row. type Batch struct { - data []Insertable - numSamples int - numExemplars int + data []Insertable + seriesCacheEpoch SeriesEpoch + numSamples int + numExemplars int } // NewBatch returns a new batch that can hold samples and exemplars. 
func NewBatch() Batch { - si := Batch{data: make([]Insertable, 0)} + si := Batch{ + data: make([]Insertable, 0), + seriesCacheEpoch: SeriesEpoch(math.MaxInt64), + } return si } @@ -37,7 +43,7 @@ func (t *Batch) Reset() { // nil all pointers to prevent memory leaks t.data[i] = nil } - *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0} + *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0, seriesCacheEpoch: SeriesEpoch(math.MaxInt64)} } func (t *Batch) CountSeries() int { @@ -73,6 +79,16 @@ func (t *Batch) Absorb(other Batch) { t.AppendSlice(other.data) } +func (t *Batch) SeriesCacheEpoch() SeriesEpoch { + return t.seriesCacheEpoch +} + +func (t *Batch) UpdateSeriesCacheEpoch(epoch SeriesEpoch) { + if t.seriesCacheEpoch.After(epoch) { + t.seriesCacheEpoch = epoch + } +} + func (t *Batch) Len() int { return t.CountSeries() } @@ -82,11 +98,11 @@ func (t *Batch) Swap(i, j int) { } func (t *Batch) Less(i, j int) bool { - s1, _, err := t.data[i].Series().GetSeriesID() + s1, err := t.data[i].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } - s2, _, err := t.data[j].Series().GetSeriesID() + s2, err := t.data[j].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } diff --git a/pkg/pgmodel/model/batch_visitor.go b/pkg/pgmodel/model/batch_visitor.go index bb8461a4f4..4b77eaa5ff 100644 --- a/pkg/pgmodel/model/batch_visitor.go +++ b/pkg/pgmodel/model/batch_visitor.go @@ -10,19 +10,14 @@ import ( ) type batchVisitor struct { - batch *Batch - lowestEpoch SeriesEpoch - minTime int64 + batch *Batch + minTime int64 } -func getBatchVisitor(batch *Batch) *batchVisitor { - return &batchVisitor{batch, SeriesEpoch(math.MaxInt64), math.MaxInt64} -} +var MaxDate int64 = math.MaxInt64 -// LowestEpoch returns the lowest epoch value encountered while visiting insertables. -// It must be called after Visit() has completed. 
-func (vtr *batchVisitor) LowestEpoch() SeriesEpoch { - return vtr.lowestEpoch +func getBatchVisitor(batch *Batch) *batchVisitor { + return &batchVisitor{batch, math.MaxInt64} } func (vtr *batchVisitor) MinTime() int64 { @@ -34,28 +29,20 @@ func (vtr *batchVisitor) Visit( visitExemplars func(t time.Time, v float64, seriesId int64, lvalues []string), ) error { var ( - seriesId SeriesID - seriesEpoch SeriesEpoch - err error + seriesId SeriesID + err error ) - updateEpoch := func(epoch SeriesEpoch) { - if epoch < vtr.lowestEpoch { - vtr.lowestEpoch = epoch - } - } for _, insertable := range vtr.batch.data { updateMinTs := func(t int64) { if t < vtr.minTime { vtr.minTime = t } } - seriesId, seriesEpoch, err = insertable.Series().GetSeriesID() + seriesId, err = insertable.Series().GetSeriesID() if err != nil { return fmt.Errorf("get series-id: %w", err) } - updateEpoch(seriesEpoch) - switch insertable.Type() { case Sample: itr := insertable.Iterator().(SamplesIterator) diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go index 1bc3702013..4650d8c7fd 100644 --- a/pkg/pgmodel/model/series.go +++ b/pkg/pgmodel/model/series.go @@ -19,8 +19,7 @@ import ( type SeriesID int64 const ( - invalidSeriesID = -1 - InvalidSeriesEpoch = -1 + invalidSeriesID = -1 ) func (s SeriesID) String() string { @@ -30,6 +29,14 @@ func (s SeriesID) String() string { // SeriesEpoch represents the series epoch type SeriesEpoch int64 +func (s SeriesEpoch) After(o SeriesEpoch) bool { + return s > o +} + +func (s SeriesEpoch) AfterEq(o SeriesEpoch) bool { + return s > o +} + // Series stores a Prometheus labels.Labels in its canonical string representation type Series struct { //protects names, values, seriesID, epoch @@ -38,7 +45,6 @@ type Series struct { names []string values []string seriesID SeriesID - epoch SeriesEpoch metricName string str string @@ -50,7 +56,6 @@ func NewSeries(key string, labelPairs []prompb.Label) *Series { values: make([]string, len(labelPairs)), str: key, 
seriesID: invalidSeriesID, - epoch: InvalidSeriesEpoch, } for i, l := range labelPairs { series.names[i] = l.Name @@ -108,26 +113,25 @@ func (l *Series) FinalSizeBytes() uint64 { return uint64(unsafe.Sizeof(*l)) + uint64(len(l.str)+len(l.metricName)) // #nosec } -func (l *Series) GetSeriesID() (SeriesID, SeriesEpoch, error) { +func (l *Series) GetSeriesID() (SeriesID, error) { l.lock.RLock() defer l.lock.RUnlock() switch l.seriesID { case invalidSeriesID: - return 0, 0, fmt.Errorf("Series id not set") + return 0, fmt.Errorf("Series id not set") case 0: - return 0, 0, fmt.Errorf("Series id invalid") + return 0, fmt.Errorf("Series id invalid") default: - return l.seriesID, l.epoch, nil + return l.seriesID, nil } } -//note this has to be idempotent -func (l *Series) SetSeriesID(sid SeriesID, eid SeriesEpoch) { +// Note: This has to be idempotent +func (l *Series) SetSeriesID(sid SeriesID) { l.lock.Lock() defer l.lock.Unlock() l.seriesID = sid - l.epoch = eid l.names = nil l.values = nil } diff --git a/pkg/rules/adapters/ingest.go b/pkg/rules/adapters/ingest.go index 1b7206f796..df6c40f2c0 100644 --- a/pkg/rules/adapters/ingest.go +++ b/pkg/rules/adapters/ingest.go @@ -3,6 +3,7 @@ package adapters import ( "context" "fmt" + "math" "time" "github.com/pkg/errors" @@ -30,9 +31,10 @@ func NewIngestAdapter(inserter ingestor.DBInserter) *ingestAdapter { } type appenderAdapter struct { - data map[string][]model.Insertable - inserter ingestor.DBInserter - closed bool + data map[string][]model.Insertable + inserter ingestor.DBInserter + closed bool + seriesCacheEpoch model.SeriesEpoch } // Appender creates a new appender for Prometheus rules manager. @@ -47,8 +49,9 @@ type appenderAdapter struct { // Note: The rule manager does not call Rollback() yet. 
func (a ingestAdapter) Appender(_ context.Context) storage.Appender { return &appenderAdapter{ - data: make(map[string][]model.Insertable), - inserter: a.inserter, + data: make(map[string][]model.Insertable), + inserter: a.inserter, + seriesCacheEpoch: model.SeriesEpoch(math.MaxInt64), } } @@ -60,6 +63,10 @@ func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64 if err != nil { return 0, fmt.Errorf("get ingestor: %w", err) } + cacheEpoch := dbIngestor.SeriesCache().CacheEpoch() + if app.seriesCacheEpoch.After(cacheEpoch) { + app.seriesCacheEpoch = cacheEpoch + } series, metricName, err := dbIngestor.SeriesCache().GetSeriesFromProtos(util.LabelToPrompbLabels(l)) if err != nil { return 0, fmt.Errorf("get series from protos: %w", err) @@ -96,7 +103,7 @@ func (app *appenderAdapter) Commit() error { if err != nil { return fmt.Errorf("get ingestor: %w", err) } - numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()}) + numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now(), SeriesCacheEpoch: app.seriesCacheEpoch}) if err == nil { samplesIngested.Add(float64(numInsertablesIngested)) } diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go index bd9fdf8e3e..d0187c7ddb 100644 --- a/pkg/tests/end_to_end_tests/drop_test.go +++ b/pkg/tests/end_to_end_tests/drop_test.go @@ -659,7 +659,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Error("expected ingest to fail due to old epoch") } - scache.Reset() + scache.Reset(pgmodel.SeriesEpoch(time.Now().Unix())) ingestor.Close() ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) diff --git a/scripts/end_to_end_tests.sh b/scripts/end_to_end_tests.sh index 80531b6e93..8ac5c94352 100755 --- a/scripts/end_to_end_tests.sh +++ b/scripts/end_to_end_tests.sh @@ -88,6 +88,7 @@ 
PROMSCALE_DB_PASSWORD=postgres \ PROMSCALE_DB_NAME=postgres \ PROMSCALE_DB_SSL_MODE=disable \ PROMSCALE_WEB_TELEMETRY_PATH=/metrics \ +PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS=true \ ./promscale -startup.only docker exec e2e-tsdb psql -U postgres -d postgres \