From df28b69735866228c75c7a2f3e5ebc170fbd1ce8 Mon Sep 17 00:00:00 2001 From: James Guthrie Date: Mon, 19 Sep 2022 14:25:05 +0200 Subject: [PATCH] Implement series cache invalidation This change implements invalidation of the series cache, and mechanisms to prevent the ingestion of data based on stale cache information. In principle, the cache invalidation mechanism works as follows: In the database, we track two values: `current_epoch`, and `delete_epoch`. These are unix timestamps (which, for reasons of backwards-compatibility, are stored in a BIGINT field), updated every time that rows in the series table are deleted. `current_epoch` is set from `now()`, and `delete_epoch` is set from `now() - epoch_duration`. `epoch_duration` is a configurable parameter. When a series row is to be deleted, instead of immediately deleting it, we set the `delete_epoch` column of the series row to the `current_epoch` timestamp (the time at which we decided that it will be deleted). After `epoch_duration` time elapses, the row is removed. When the connector starts, it reads `current_epoch` from the database and stores this value with the series cache as `cache_current_epoch`. The connector periodically fetches the ids of series rows where `delete_epoch` is not null, together with `current_epoch`. It removes these entries from the cache, and updates `cache_current_epoch` to the value of `current_epoch` which was fetched from the database. As the connector receives samples to be inserted, it tracks the smallest value of `cache_current_epoch` that it saw for that batch. When it comes to inserting the samples in a batch into the database, it asserts (in the database) that the cache which was read from was not stale. This is expressed as: `cache_current_epoch > delete_epoch`. If this is not the case, the insert is aborted. This is a companion change to [1] which implements the database-side logic required for cache invalidation. 
[1]: https://github.com/timescale/promscale_extension/pull/529 --- CHANGELOG.md | 1 + docker-compose/docker-compose.yaml | 4 +- .../high-availability/docker-compose.yaml | 5 +- pkg/clockcache/cache.go | 55 ++++ pkg/pgmodel/cache/series_cache.go | 59 +++- pkg/pgmodel/ingestor/buffer.go | 1 + pkg/pgmodel/ingestor/copier.go | 42 ++- pkg/pgmodel/ingestor/dispatcher.go | 184 ++++++++--- pkg/pgmodel/ingestor/handler_test.go | 5 +- pkg/pgmodel/ingestor/ingestor.go | 10 +- pkg/pgmodel/ingestor/ingestor_sql_test.go | 297 +++--------------- pkg/pgmodel/ingestor/metric_batcher.go | 1 - pkg/pgmodel/ingestor/metric_batcher_test.go | 4 +- pkg/pgmodel/ingestor/series_writer.go | 49 +-- pkg/pgmodel/model/batch.go | 34 +- pkg/pgmodel/model/batch_visitor.go | 29 +- pkg/pgmodel/model/series.go | 26 +- pkg/pgmodel/model/sql_test_utils.go | 4 + pkg/rules/adapters/ingest.go | 18 +- pkg/tests/end_to_end_tests/create_test.go | 157 +++++++++ pkg/tests/end_to_end_tests/drop_test.go | 2 +- pkg/tests/upgrade_tests/shapshot.go | 37 ++- scripts/end_to_end_tests.sh | 1 + 23 files changed, 610 insertions(+), 415 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c92c9b4a70..1a7f41cc9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ We use the following categories for changes: configuration. Supersedes `startup.dataset.config` which accepts a string instead of a mapping [#1737] - Add alert to notify about duplicate sample/metric ingestion. 
[#1688] +- Implemented proper invalidation of the series cache [#1752] ### Changed - Reduced the verbosity of the logs emitted by the vacuum engine [#1715] diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml index e6a0dad825..2bc39131f6 100644 --- a/docker-compose/docker-compose.yaml +++ b/docker-compose/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when our changes are published in the docker image + image: ghcr.io/timescale/dev_promscale_extension:jg-logical-to-time-epoch-2-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -42,6 +43,7 @@ services: PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317" PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1" PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true otel-collector: platform: linux/amd64 diff --git a/docker-compose/high-availability/docker-compose.yaml b/docker-compose/high-availability/docker-compose.yaml index 60baa70d25..00ed4852a8 100644 --- a/docker-compose/high-availability/docker-compose.yaml +++ b/docker-compose/high-availability/docker-compose.yaml @@ -2,7 +2,8 @@ version: '3.0' services: db: - image: timescale/timescaledb-ha:pg14-latest + # TODO (james): replace this when our changes are published in the docker image + image: ghcr.io/timescale/dev_promscale_extension:jg-logical-to-time-epoch-2-ts2-pg14 ports: - 5432:5432/tcp environment: @@ -44,6 +45,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true promscale-connector2: image: timescale/promscale:latest @@ -61,6 +63,7 @@ services: PROMSCALE_METRICS_HIGH_AVAILABILITY: true PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow 
PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml + PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true node_exporter: image: quay.io/prometheus/node-exporter diff --git a/pkg/clockcache/cache.go b/pkg/clockcache/cache.go index 71289fd45d..dc8e1c2300 100644 --- a/pkg/clockcache/cache.go +++ b/pkg/clockcache/cache.go @@ -329,6 +329,13 @@ func (self *Cache) ExpandTo(newMax int) { } func (self *Cache) Reset() { + self.ResetAndCallWhileLocksHeld(func() {}) +} + +// ResetAndCallWhileLocksHeld resets the cache and calls fn while holding the +// insert and elements locks. This allows a consumer of the cache to coordinate +// its state with the state of the cache. +func (self *Cache) ResetAndCallWhileLocksHeld(fn func()) { self.insertLock.Lock() defer self.insertLock.Unlock() oldSize := cap(self.storage) @@ -338,6 +345,10 @@ func (self *Cache) Reset() { self.elementsLock.Lock() defer self.elementsLock.Unlock() + + // We're holding all locks here, so call fn + fn() + self.elements = newElements self.storage = newStorage self.next = 0 @@ -415,3 +426,47 @@ func packUsedAndTimestamp(used bool, ts int32) uint32 { } return uint32(usedBit<<31) | uint32(ts) } + +// RemoveMatchingItems removes items whose value passes the check applied by +// the passed `matcher` function. It returns the number of entries which were +// removed from the cache. +// Note: Due to the locks that this function takes, it is best used when very +// few items stored in the cache are to be removed. +func (self *Cache) RemoveMatchingItems(matcher func(value interface{}) bool) int { + // Note: we take an RLock to find matching items, then release it and take + // a full Lock to remove them. This has two assumptions: + // 1. len(matchingItems) << len(storage): taking a big lock should be fine. + // 2. Missing the addition of an item to the cache is not problematic. + // Locking like this doesn't provide a consistent view of the cache + // contents. Either an entry is removed or added under our feet. 
An + // entry being removed is not a problem, we will just skip it. An entry + // being added is fine for the one consumer of this method (cache + // invalidation): if an item is added to the cache, then it was just + // created/resurrected in the DB, so it wouldn't have been a candidate + // for removal. + + self.elementsLock.RLock() + matchingItems := make([]interface{}, 0) + for k, v := range self.elements { + if matcher(v.value) { + matchingItems = append(matchingItems, k) + } + } + self.elementsLock.RUnlock() + self.elementsLock.Lock() + for _, v := range matchingItems { + self.remove(v) + } + self.elementsLock.Unlock() + return len(matchingItems) +} + +func (self *Cache) remove(key interface{}) { + value, present := self.elements[key] + if !present { + // do nothing + return + } + delete(self.elements, key) + *value = element{} +} diff --git a/pkg/pgmodel/cache/series_cache.go b/pkg/pgmodel/cache/series_cache.go index 289cd7019e..dcbabdbcb9 100644 --- a/pkg/pgmodel/cache/series_cache.go +++ b/pkg/pgmodel/cache/series_cache.go @@ -30,22 +30,28 @@ var evictionMaxAge = time.Minute * 2 // grow cache if we are evicting eleme // SeriesCache is a cache of model.Series entries. 
type SeriesCache interface { - Reset() + Reset(epoch model.SeriesEpoch) GetSeriesFromProtos(labelPairs []prompb.Label) (series *model.Series, metricName string, err error) Len() int Cap() int Evictions() uint64 + EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int + CacheEpoch() model.SeriesEpoch + SetCacheEpoch(epoch model.SeriesEpoch) } type SeriesCacheImpl struct { cache *clockcache.Cache maxSizeBytes uint64 + cacheEpoch model.SeriesEpoch + epochLock sync.RWMutex } func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl { cache := &SeriesCacheImpl{ - clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize), - config.SeriesCacheMemoryMaxBytes, + cache: clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize), + maxSizeBytes: config.SeriesCacheMemoryMaxBytes, + cacheEpoch: model.SeriesEpoch(0), } if sigClose != nil { @@ -54,6 +60,18 @@ func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl { return cache } +func (t *SeriesCacheImpl) CacheEpoch() model.SeriesEpoch { + t.epochLock.RLock() + defer t.epochLock.RUnlock() + return t.cacheEpoch +} + +func (t *SeriesCacheImpl) SetCacheEpoch(epoch model.SeriesEpoch) { + t.epochLock.Lock() + defer t.epochLock.Unlock() + t.cacheEpoch = epoch +} + func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) { ticker := time.NewTicker(growCheckDuration) for { @@ -115,8 +133,12 @@ func (t *SeriesCacheImpl) Evictions() uint64 { } // Reset should be concurrency-safe -func (t *SeriesCacheImpl) Reset() { - t.cache.Reset() +func (t *SeriesCacheImpl) Reset(epoch model.SeriesEpoch) { + t.cache.ResetAndCallWhileLocksHeld(func() { + // When we reset the cache, we need to reset the cache epoch as well, + // otherwise it may be out of date as soon as data is loaded into it. + t.SetCacheEpoch(epoch) + }) } // Get the canonical version of a series if one exists. 
@@ -133,7 +155,7 @@ func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) { // representation, returning the canonical version (which can be different in // the even of multiple goroutines setting labels concurrently). func (t *SeriesCacheImpl) setSeries(str string, lset *model.Series) *model.Series { - //str not counted twice in size since the key and lset.str will point to same thing. + // str not counted twice in size since the key and lset.str will point to same thing. val, _ := t.cache.Insert(str, lset, lset.FinalSizeBytes()) return val.(*model.Series) } @@ -278,3 +300,28 @@ func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model return series, metricName, nil } + +func (t *SeriesCacheImpl) EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int { + if len(seriesIds) == 0 { + return 0 + } + seriesIdMap := make(map[model.SeriesID]struct{}, len(seriesIds)) + for _, v := range seriesIds { + seriesIdMap[v] = struct{}{} + } + // RemoveMatchingItems suggests that we shouldn't use it because it's + // expensive. In normal operation (with ~30 day retention), we don't expect + // our cache to contain a lot of the series that we want to remove. This + // matches well with RemoveMatchingItems locking mechanism. See the impl. 
+ count := t.cache.RemoveMatchingItems(func(v interface{}) bool { + value := v.(*model.Series) + id, err := value.GetSeriesID() + if err != nil { + return false + } + _, present := seriesIdMap[id] + return present + }) + t.SetCacheEpoch(epoch) + return count +} diff --git a/pkg/pgmodel/ingestor/buffer.go b/pkg/pgmodel/ingestor/buffer.go index 2a4e6659e4..3049d71b5c 100644 --- a/pkg/pgmodel/ingestor/buffer.go +++ b/pkg/pgmodel/ingestor/buffer.go @@ -79,4 +79,5 @@ func (p *pendingBuffer) release() { func (p *pendingBuffer) addReq(req *insertDataRequest) { p.needsResponse = append(p.needsResponse, insertDataTask{finished: req.finished, errChan: req.errChan}) p.batch.AppendSlice(req.data) + p.batch.UpdateSeriesCacheEpoch(req.seriesCacheEpoch) } diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 7337a466d7..1ff5654203 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -145,6 +145,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e defer span.End() batch := copyBatch(insertBatch) err := sw.PopulateOrCreateSeries(ctx, batch) + if err != nil { return fmt.Errorf("copier: writing series: %w", err) } @@ -209,6 +210,15 @@ func doInsertOrFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyR if isPGUniqueViolation(err) { err, _ = insertSeries(ctx, conn, true, reqs...) } + if isEpochAbort(err) { + // Epoch abort is non-recoverable. Return error here instead of retrying. + log.Warn("msg", "Insert failed on epoch abort", "error", err) + for i := range reqs { + reqs[i].data.reportResults(NewInsertAbortError(err)) + reqs[i].data.release() + } + return + } if err != nil { log.Error("msg", err) insertBatchErrorFallback(ctx, conn, reqs...) 
@@ -234,6 +244,17 @@ func isPGUniqueViolation(err error) bool { return false } +func isEpochAbort(err error) bool { + if err == nil { + return false + } + pgErr, ok := err.(*pgconn.PgError) + if ok && pgErr.Code == "PS001" { + return true + } + return false +} + func insertBatchErrorFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest) { ctx, span := tracer.Default().Start(ctx, "insert-batch-error-fallback") defer span.End() @@ -349,8 +370,8 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re var sampleRows [][]interface{} var exemplarRows [][]interface{} insertStart := time.Now() - lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64) lowestMinTime := int64(math.MaxInt64) + minCacheEpoch := pgmodel.SeriesEpoch(0) tx, err := conn.BeginTx(ctx) if err != nil { return fmt.Errorf("failed to start transaction for inserting metrics: %v", err), lowestMinTime @@ -365,6 +386,10 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re for r := range reqs { req := &reqs[r] + epoch := req.data.batch.SeriesCacheEpoch() + if minCacheEpoch == pgmodel.SeriesEpoch(0) || minCacheEpoch.After(epoch) { + minCacheEpoch = epoch + } // Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. 
Prometheus retry) // Samples inside series should be sorted by Prometheus // We sort after PopulateOrCreateSeries call because we now have guarantees that all seriesIDs have been populated @@ -410,10 +435,6 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re if err != nil { return err, lowestMinTime } - epoch := visitor.LowestEpoch() - if epoch < lowestEpoch { - lowestEpoch = epoch - } minTime := visitor.MinTime() if minTime < lowestMinTime { lowestMinTime = minTime @@ -472,11 +493,12 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re } } - //note the epoch increment takes an access exclusive on the table before incrementing. - //thus we don't need row locking here. Note by doing this check at the end we can - //have some wasted work for the inserts before this fails but this is rare. - //avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it. - row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch)) + // Note: The epoch increment takes an access exclusive on the table before + // incrementing, thus we don't need row locking here. By doing this check + // at the end we can have some wasted work for the inserts before this + // fails but this is rare. Avoiding an additional loop or memoization to + // find the lowest epoch ahead of time seems worth it. 
+ row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(minCacheEpoch)) var val []byte if err = row.Scan(&val); err != nil { return err, lowestMinTime diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index f0541db6d7..b86898f169 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -7,6 +7,7 @@ package ingestor import ( "context" "fmt" + "github.com/pkg/errors" "sync" "sync/atomic" "time" @@ -26,15 +27,16 @@ import ( ) const ( - finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()" - getEpochSQL = "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1" + finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()" + getEpochSQL = "SELECT current_epoch, COALESCE(delete_epoch, 0) FROM _prom_catalog.ids_epoch LIMIT 1" + getStaleSeriesIDsArraySQL = "SELECT ARRAY_AGG(id) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL" ) var ErrDispatcherClosed = fmt.Errorf("dispatcher is closed") -// pgxDispatcher redirects incoming samples to the appropriate metricBatcher +// PgxDispatcher redirects incoming samples to the appropriate metricBatcher // corresponding to the metric in the sample. 
-type pgxDispatcher struct { +type PgxDispatcher struct { conn pgxconn.PgxConn metricTableNames cache.MetricCache scache cache.SeriesCache @@ -48,17 +50,51 @@ type pgxDispatcher struct { doneChannel chan struct{} closed *uber_atomic.Bool doneWG sync.WaitGroup + seriesRefreshLock sync.Mutex } -var _ model.Dispatcher = &pgxDispatcher{} +type InsertAbortError struct { + err error +} + +func NewInsertAbortError(err error) InsertAbortError { + return InsertAbortError{err} +} + +func (e InsertAbortError) Error() string { + return e.err.Error() +} -func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) { +var _ model.Dispatcher = &PgxDispatcher{} + +func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*PgxDispatcher, error) { numCopiers := cfg.NumCopiers if numCopiers < 1 { log.Warn("msg", "num copiers less than 1, setting to 1") numCopiers = 1 } + var dbCurrentEpoch int64 + // Bump the current epoch if it was still set to the initial value, and + // initialize the cache's epoch. Initializing the cache's epoch is crucial + // to ensure that ingestion will abort if a stale cache entry is present. 
+ row := conn.QueryRow(context.Background(), "SELECT _prom_catalog.initialize_current_epoch(now())") + err := row.Scan(&dbCurrentEpoch) + if err != nil { + return nil, err + } + log.Info("msg", "Initializing epoch", "epoch", dbCurrentEpoch) + scache.SetCacheEpoch(model.SeriesEpoch(dbCurrentEpoch)) + + var epochDuration time.Duration + row = conn.QueryRow(context.Background(), "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL") + err = row.Scan(&epochDuration) + if err != nil { + return nil, err + } + var seriesEpochRefresh = epochDuration / 3 // This seems like a good enough rule of thumb + log.Info("msd", "Setting series epoch refresh", "series_epoch_refresh", seriesEpochRefresh) + // the copier read request channel retains the queue order between metrics maxMetrics := 10000 copierReadRequestCh := make(chan readRequest, maxMetrics) @@ -80,14 +116,14 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac if err != nil { return nil, err } - sw := NewSeriesWriter(conn, labelArrayOID, labelsCache) + sw := NewSeriesWriter(conn, labelArrayOID, labelsCache, scache) elf := NewExamplarLabelFormatter(conn, eCache) for i := 0; i < numCopiers; i++ { go runCopier(conn, copierReadRequestCh, sw, elf) } - inserter := &pgxDispatcher{ + inserter := &PgxDispatcher{ conn: conn, metricTableNames: mCache, scache: scache, @@ -97,7 +133,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac asyncAcks: cfg.MetricsAsyncAcks, copierReadRequestCh: copierReadRequestCh, // set to run at half our deletion interval - seriesEpochRefresh: time.NewTicker(30 * time.Minute), + seriesEpochRefresh: time.NewTicker(seriesEpochRefresh), doneChannel: make(chan struct{}), closed: uber_atomic.NewBool(false), } @@ -122,7 +158,7 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac return inserter, nil } -func (p *pgxDispatcher) runCompleteMetricCreationWorker() { +func (p *PgxDispatcher) 
runCompleteMetricCreationWorker() { for range p.completeMetricCreation { err := p.CompleteMetricCreation(context.Background()) if err != nil { @@ -131,56 +167,100 @@ func (p *pgxDispatcher) runCompleteMetricCreationWorker() { } } -func (p *pgxDispatcher) runSeriesEpochSync() { - epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch) - // we don't have any great place to report errors, and if the - // connection recovers we can still make progress, so we'll just log it - // and continue execution - if err != nil { - log.Error("msg", "error refreshing the series cache", "err", err) - } +func (p *PgxDispatcher) runSeriesEpochSync() { + p.RefreshSeriesEpoch() for { select { case <-p.seriesEpochRefresh.C: - epoch, err = p.refreshSeriesEpoch(epoch) - if err != nil { - log.Error("msg", "error refreshing the series cache", "err", err) - } + p.RefreshSeriesEpoch() case <-p.doneChannel: return } } } -func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) { - dbEpoch, err := p.getServerEpoch() +func (p *PgxDispatcher) getDatabaseEpochs() (model.SeriesEpoch, model.SeriesEpoch, error) { + var dbCurrentEpoch, dbDeleteEpoch int64 + row := p.conn.QueryRow(context.Background(), getEpochSQL) + err := row.Scan(&dbCurrentEpoch, &dbDeleteEpoch) if err != nil { + return 0, 0, err + } + return model.SeriesEpoch(dbCurrentEpoch), model.SeriesEpoch(dbDeleteEpoch), nil +} + +func (p *PgxDispatcher) RefreshSeriesEpoch() { + // There can be multiple concurrent executions of `RefreshSeriesEpoch`. We + // use TryLock here to ensure that we don't have concurrent execution in + // this function, and that we don't have multiple back-to-back executions. 
+ locked := p.seriesRefreshLock.TryLock() + if !locked { + log.Info("msg", "Skipping series cache refresh") + return + } + defer p.seriesRefreshLock.Unlock() + log.Info("msg", "Refreshing series cache epoch") + cacheCurrentEpoch := p.scache.CacheEpoch() + dbCurrentEpoch, dbDeleteEpoch, err := p.getDatabaseEpochs() + log.Info("msg", "RefreshSeriesEpoch", "cacheCurrentEpoch", cacheCurrentEpoch, "dbCurrentEpoch", dbCurrentEpoch, "dbDeleteEpoch", dbDeleteEpoch) + if err != nil { + log.Warn("msg", "Unable to get database epoch data, will reset series and inverted labels caches", "err", err.Error()) // Trash the cache just in case an epoch change occurred, seems safer - p.scache.Reset() + p.scache.Reset(dbCurrentEpoch) // Also trash the inverted labels cache, which can also be invalidated when the series cache is p.invertedLabelsCache.Reset() - return model.InvalidSeriesEpoch, err + return } - if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch { - p.scache.Reset() - // If the series cache needs to be invalidated, so does the inverted labels cache + if cacheCurrentEpoch.After(dbCurrentEpoch) { + log.Warn("msg", "The connector's cache epoch is greater than the databases. This is unexpected", "connector_current_epoch", cacheCurrentEpoch, "database_current_epoch", dbCurrentEpoch) + // Reset the caches. It's not really clear what would lead to this + // situation, but if it does then something must have gone wrong... + p.scache.Reset(dbCurrentEpoch) + p.invertedLabelsCache.Reset() + return + } + if dbDeleteEpoch.AfterEq(cacheCurrentEpoch) { + // The current cache epoch has been overtaken by the database' + // delete_epoch. + // The only way to recover from this situation is to reset our caches + // and let them repopulate. 
+ log.Warn("msg", "Cache epoch was overtaken by the database's delete epoch, resetting series and labels caches", "cache_epoch", cacheCurrentEpoch, "delete_epoch", dbDeleteEpoch) + p.scache.Reset(dbCurrentEpoch) + p.invertedLabelsCache.Reset() + return + } else { + start := time.Now() + staleSeriesIds, err := GetStaleSeriesIDs(p.conn) + if err != nil { + log.Warn("msg", "Error getting series ids, unable to update series cache", "err", err.Error()) + return + } + log.Info("msg", "Epoch change noticed, fetched stale series from db", "count", len(staleSeriesIds), "duration", time.Since(start)) + start = time.Now() + evictCount := p.scache.EvictSeriesById(staleSeriesIds, dbCurrentEpoch) + log.Info("msg", "Removed stale series", "count", evictCount, "duration", time.Since(start)) // Before merging in master, change the level to Debug. + // Trash the inverted labels cache. Technically, we could determine + // which label entries need to be removed from the cache, as we have + // done for the series cache, but that doesn't seem to really be worth + // it. 
p.invertedLabelsCache.Reset() } - return dbEpoch, nil } -func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) { - var newEpoch int64 - row := p.conn.QueryRow(context.Background(), getEpochSQL) - err := row.Scan(&newEpoch) +func GetStaleSeriesIDs(conn pgxconn.PgxConn) ([]model.SeriesID, error) { + staleSeriesIDs := make([]int64, 0) + err := conn.QueryRow(context.Background(), getStaleSeriesIDsArraySQL).Scan(&staleSeriesIDs) if err != nil { - return -1, err + return nil, fmt.Errorf("error getting stale series ids from db: %w", err) } - - return model.SeriesEpoch(newEpoch), nil + staleSeriesIdsAsSeriesId := make([]model.SeriesID, len(staleSeriesIDs)) + for i := range staleSeriesIDs { + staleSeriesIdsAsSeriesId[i] = model.SeriesID(staleSeriesIDs[i]) + } + return staleSeriesIdsAsSeriesId, nil } -func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { +func (p *PgxDispatcher) CompleteMetricCreation(ctx context.Context) error { if p.closed.Load() { return ErrDispatcherClosed } @@ -193,7 +273,7 @@ func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { return err } -func (p *pgxDispatcher) Close() { +func (p *PgxDispatcher) Close() { if p.closed.Load() { return } @@ -215,7 +295,7 @@ func (p *pgxDispatcher) Close() { // actually inserted) and any error. // Though we may insert data to multiple tables concurrently, if asyncAcks is // unset this function will wait until _all_ the insert attempts have completed. 
-func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64, error) { +func (p *PgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64, error) { if p.closed.Load() { return 0, ErrDispatcherClosed } @@ -225,6 +305,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 numRows uint64 maxt int64 rows = dataTS.Rows + seriesEpoch = dataTS.SeriesCacheEpoch workFinished = new(sync.WaitGroup) ) workFinished.Add(len(rows)) @@ -240,7 +321,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt = ts } } - p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} + p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, seriesCacheEpoch: seriesEpoch, data: data, finished: workFinished, errChan: errChan} } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) @@ -258,6 +339,9 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 case err = <-errChan: default: } + if errors.As(err, &InsertAbortError{}) { + p.RefreshSeriesEpoch() + } reportMetricsTelemetry(maxt, numRows, 0) close(errChan) } else { @@ -270,6 +354,9 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 } close(errChan) if err != nil { + if errors.As(err, &InsertAbortError{}) { + p.RefreshSeriesEpoch() + } log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err) } reportMetricsTelemetry(maxt, numRows, 0) @@ -279,7 +366,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 return numRows, err } -func (p *pgxDispatcher) InsertMetadata(ctx context.Context, metadata []model.Metadata) (uint64, error) { +func (p *PgxDispatcher) InsertMetadata(ctx context.Context, metadata 
[]model.Metadata) (uint64, error) { if p.closed.Load() { return 0, ErrDispatcherClosed } @@ -309,7 +396,7 @@ func reportMetricsTelemetry(maxTs int64, numSamples, numMetadata uint64) { } // Get the handler for a given metric name, creating a new one if none exists -func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataRequest { +func (p *PgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataRequest { batcher, ok := p.batchers.Load(metric) if !ok { // The ordering is important here: we need to ensure that every call @@ -330,11 +417,12 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques } type insertDataRequest struct { - spanCtx trace.SpanContext - metric string - finished *sync.WaitGroup - data []model.Insertable - errChan chan error + spanCtx trace.SpanContext + metric string + seriesCacheEpoch model.SeriesEpoch + finished *sync.WaitGroup + data []model.Insertable + errChan chan error } func (idr *insertDataRequest) reportResult(err error) { diff --git a/pkg/pgmodel/ingestor/handler_test.go b/pkg/pgmodel/ingestor/handler_test.go index 07b08b3b24..2fb0175569 100644 --- a/pkg/pgmodel/ingestor/handler_test.go +++ b/pkg/pgmodel/ingestor/handler_test.go @@ -5,12 +5,11 @@ package ingestor import ( - "testing" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/model" + "testing" ) func getSeries(t *testing.T, scache *cache.SeriesCacheImpl, labels labels.Labels) *model.Series { @@ -74,7 +73,7 @@ func TestLabelArrayCreator(t *testing.T) { /* test one series already set */ setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}) - setSeries.SetSeriesID(5, 4) + setSeries.SetSeriesID(5) seriesSet = []*model.Series{ getSeries(t, scache, labels.Labels{metricNameLabel, valOne}), setSeries, diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go 
index 815eef2f2d..65dddc3c4f 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -194,6 +194,14 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p insertables = make(map[string][]model.Insertable) ) + // Determine the value of the series cache epoch _before_ getting any items + // from the cache. In order to properly abort the insert transaction if the + // cache is out of date (epoch abort), we want to determine the minimum + // cache epoch for all samples in an ingest batch. We know that everything + // currently in the cache has this epoch, and everything that we load from + // the database later will have _at least_ that epoch. + epoch := ingestor.sCache.CacheEpoch() + for i := range timeseries { var ( err error @@ -238,7 +246,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p } releaseMem() - numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now()}) + numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now(), SeriesCacheEpoch: epoch}) if errSamples == nil && numInsertablesIngested != totalRowsExpected { return numInsertablesIngested, fmt.Errorf("failed to insert all the data! 
Expected: %d, Got: %d", totalRowsExpected, numInsertablesIngested) } diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..ddc52c25d5 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -80,14 +80,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -130,14 +122,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -182,14 +166,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -231,14 +207,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -275,9 +243,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { } mock := 
model.NewSqlRecorder(c.sqlQueries, t) scache := cache.NewSeriesCache(cache.DefaultConfig, nil) - scache.Reset() + scache.Reset(pgmodel.SeriesEpoch(time.Now().Unix())) lCache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lCache) + sw := NewSeriesWriter(mock, 0, lCache, scache) lsi := make([]model.Insertable, 0) for _, ser := range c.series { @@ -306,232 +274,25 @@ func TestPGXInserterInsertSeries(t *testing.T) { if err == nil { for _, si := range lsi { - si, se, err := si.Series().GetSeriesID() + si, err := si.Series().GetSeriesID() require.NoError(t, err) require.True(t, si > 0, "series id not set") - require.True(t, se > 0, "epoch not set") } } }) } } -func TestPGXInserterCacheReset(t *testing.T) { - series := []labels.Labels{ - { - {Name: "__name__", Value: "metric_1"}, - {Name: "name_1", Value: "value_1"}, - }, - { - {Name: "name_1", Value: "value_2"}, - {Name: "__name__", Value: "metric_1"}, - }, - } - - sqlQueries := []model.SqlQuery{ - - // first series cache fetch - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", - Args: []interface{}{ - "metric_1", - tableName, - []string{"__name__", "name_1", "name_1"}, - []string{"metric_1", "value_1", "value_2"}, - }, - Results: model.RowResults{ - {[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}}, - }, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: seriesInsertSQL, - Args: []interface{}{ - metricID, - tableName, - getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}), - }, - Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - - // first labels cache refresh, does not trash - { - Sql: 
"SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - - // second labels cache refresh, trash the cache - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, - Err: error(nil), - }, - {Sql: "BEGIN;"}, - - // repopulate the cache - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", - Args: []interface{}{ - "metric_1", - tableName, - []string{"__name__", "name_1", "name_1"}, - []string{"metric_1", "value_1", "value_2"}, - }, - Results: model.RowResults{ - {[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}}, - }, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: seriesInsertSQL, - Args: []interface{}{ - metricID, - tableName, - getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}), - }, - Results: model.RowResults{{int64(3), int64(1)}, {int64(4), int64(2)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - } - - for i := range sqlQueries { - for j := range sqlQueries[i].Args { - if _, ok := sqlQueries[i].Args[j].([]string); ok { - tmp := &pgutf8str.TextArray{} - err := tmp.Set(sqlQueries[i].Args[j]) - require.NoError(t, err) - sqlQueries[i].Args[j] = tmp - } - } - } - - mock := model.NewSqlRecorder(sqlQueries, t) - scache := cache.NewSeriesCache(cache.DefaultConfig, nil) - lcache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lcache) - inserter := pgxDispatcher{ - conn: mock, - scache: scache, - invertedLabelsCache: lcache, - } - - makeSamples := func(series []labels.Labels) []model.Insertable { - lsi := make([]model.Insertable, 0) - for _, ser 
:= range series { - ls, err := scache.GetSeriesFromLabels(ser) - if err != nil { - t.Errorf("invalid labels %+v, %v", ls, err) - } - lsi = append(lsi, model.NewPromSamples(ls, nil)) - } - return lsi - } - - samples := makeSamples(series) - err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) - if err != nil { - t.Fatal(err) - } - - expectedIds := []model.SeriesID{ - model.SeriesID(1), - model.SeriesID(2), - } - - for index, si := range samples { - _, _, ok := si.Series().NameValues() - require.False(t, ok) - expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() - require.NoError(t, err) - if gotId != expectedId { - t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) - } - } - - // refreshing during the same epoch gives the same IDs without checking the DB - _, err = inserter.refreshSeriesEpoch(1) - require.NoError(t, err) - - samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) - if err != nil { - t.Fatal(err) - } - - for index, si := range samples { - _, _, ok := si.Series().NameValues() - require.False(t, ok) - expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() - require.NoError(t, err) - if gotId != expectedId { - t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) - } - } - - // trash the cache - _, err = inserter.refreshSeriesEpoch(1) - require.NoError(t, err) - - // retrying rechecks the DB and uses the new IDs - samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) - if err != nil { - t.Fatal(err) - } - - expectedIds = []model.SeriesID{ - model.SeriesID(3), - model.SeriesID(4), - } - - for index, si := range samples { - _, _, ok := si.Series().NameValues() - require.False(t, ok) - expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() - require.NoError(t, err) - if gotId != expectedId { - t.Errorf("incorrect ID:\ngot: %v\nexpected: 
%v", gotId, expectedId) - } - } -} - func TestPGXInserterInsertData(t *testing.T) { if err := os.Setenv("IS_TEST", "true"); err != nil { t.Fatal(err) } + testTime := time.Now().Unix() + testEpochDuration := 1 * time.Hour + seriesCacheEpoch := pgmodel.SeriesEpoch(testTime - 10) makeLabel := func() *model.Series { l := &model.Series{} - l.SetSeriesID(1, 1) + l.SetSeriesID(1) return l } @@ -544,6 +305,8 @@ func TestPGXInserterInsertData(t *testing.T) { { name: "Zero data", sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -555,6 +318,8 @@ func TestPGXInserterInsertData(t *testing.T) { "metric_0": {model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1))}, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -575,8 +340,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END 
FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -591,6 +356,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -620,8 +387,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -639,6 +406,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -658,6 +427,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: 
model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -680,8 +451,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the full batch - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -710,8 +481,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the individual copyRequests - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -730,6 +501,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: 
model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -790,6 +563,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -812,6 +587,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, metricsGetErr: fmt.Errorf("some metrics error"), sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -833,8 +610,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -850,6 +627,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: 
model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -900,8 +679,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, // epoch check after insert from temp table { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -938,7 +717,7 @@ func TestPGXInserterInsertData(t *testing.T) { } defer inserter.Close() - _, err = inserter.InsertTs(ctx, model.Data{Rows: c.rows}) + _, err = inserter.InsertTs(ctx, model.Data{Rows: c.rows, SeriesCacheEpoch: seriesCacheEpoch}) var expErr error switch { diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index f6153cef12..98b9c0128b 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..407c639333 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -136,7 +136,7 @@ func TestInitializeMetricBatcher(t *testing.T) { func TestSendBatches(t *testing.T) { makeSeries := func(seriesID int) *model.Series { l := &model.Series{} - 
l.SetSeriesID(pgmodel.SeriesID(seriesID), 1) + l.SetSeriesID(pgmodel.SeriesID(seriesID)) return l } var workFinished sync.WaitGroup @@ -154,7 +154,7 @@ func TestSendBatches(t *testing.T) { // we make sure that we receive batch data for i := 0; i < 3; i++ { - id, _, err := batch.data.batch.Data()[i].Series().GetSeriesID() + id, err := batch.data.batch.Data()[i].Series().GetSeriesID() if err != nil { t.Fatal(err) } diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index 44ae573f5b..8b1cb63ae7 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "github.com/jackc/pgtype" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -27,6 +26,7 @@ type seriesWriter struct { conn pgxconn.PgxConn labelArrayOID uint32 labelsCache *cache.InvertedLabelsCache + seriesCache cache.SeriesCache } type SeriesVisitor interface { @@ -35,8 +35,8 @@ type SeriesVisitor interface { func labelArrayTranscoder() pgtype.ValueTranscoder { return &pgtype.Int4Array{} } -func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache) *seriesWriter { - return &seriesWriter{conn, labelArrayOID, labelsCache} +func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache, seriesCache cache.SeriesCache) *seriesWriter { + return &seriesWriter{conn, labelArrayOID, labelsCache, seriesCache} } type perMetricInfo struct { @@ -59,6 +59,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi defer span.End() infos := make(map[string]*perMetricInfo) seriesCount := 0 + err := sv.VisitSeries(func(metricInfo *pgmodel.MetricInfo, series *model.Series) error { if !series.IsSeriesIDSet() { metricName := series.MetricName() @@ -124,7 +125,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi //the 
labels for multiple series in same txn as we are creating the series, //the ordering of label creation can only be canonical within a series and //not across series. - dbEpoch, err := h.fillLabelIDs(ctx, infos, labelMap) + err = h.fillLabelIDs(ctx, infos, labelMap) if err != nil { return fmt.Errorf("error setting series ids: %w", err) } @@ -142,7 +143,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi continue } - //transaction per metric to avoid cross-metric locks + // transaction per metric to avoid cross-metric locks batch.Queue("BEGIN;") batch.Queue(seriesInsertSQL, info.metricInfo.MetricID, info.metricInfo.TableName, info.labelArraySet) batch.Queue("COMMIT;") @@ -176,7 +177,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi if err != nil { return fmt.Errorf("error setting series_id: cannot scan series_id: %w", err) } - info.series[int(ordinality)-1].SetSeriesID(id, dbEpoch) + info.series[int(ordinality)-1].SetSeriesID(id) count++ } if err := res.Err(); err != nil { @@ -196,7 +197,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi return nil } -func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (model.SeriesEpoch, error) { +func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { _, span := tracer.Default().Start(ctx, "fill-label-ids") defer span.End() //we cannot use the label cache here because that maps label ids => name, value. @@ -204,13 +205,6 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe //we may want a new cache for that, at a later time. 
batch := h.conn.NewBatch() - var dbEpoch model.SeriesEpoch - - // The epoch will never decrease, so we can check it once at the beginning, - // at worst we'll store too small an epoch, which is always safe - batch.Queue("BEGIN;") - batch.Queue(getEpochSQL) - batch.Queue("COMMIT;") infoBatches := make([]*perMetricInfo, 0) items := 0 @@ -226,11 +220,11 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe } namesSlice, err := names.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing names: %w", err) + return fmt.Errorf("error filling labels: slicing names: %w", err) } valuesSlice, err := values.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing values: %w", err) + return fmt.Errorf("error filling labels: slicing values: %w", err) } batch.Queue("BEGIN;") batch.Queue("SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", metricName, info.metricInfo.TableName, namesSlice, valuesSlice) @@ -250,25 +244,14 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe br, err := h.conn.SendBatch(context.Background(), batch) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) + return fmt.Errorf("error filling labels: %w", err) } defer br.Close() - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err) - } - err = br.QueryRow().Scan(&dbEpoch) - if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) - } - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit: %w", err) - } - var count int for _, info := range infoBatches { if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on begin label batch: %w", err) + return fmt.Errorf("error filling labels on begin label batch: %w", err) } err := func() error { @@ -306,16 +289,16 @@ func (h *seriesWriter) 
fillLabelIDs(ctx context.Context, infos map[string]*perMe return nil }() if err != nil { - return dbEpoch, err + return err } if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit label batch: %w", err) + return fmt.Errorf("error filling labels on commit label batch: %w", err) } } if count != items { - return dbEpoch, fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) + return fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) } - return dbEpoch, nil + return nil } func (h *seriesWriter) buildLabelArrays(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { diff --git a/pkg/pgmodel/model/batch.go b/pkg/pgmodel/model/batch.go index 57e5c7370d..97cc1fca31 100644 --- a/pkg/pgmodel/model/batch.go +++ b/pkg/pgmodel/model/batch.go @@ -6,6 +6,7 @@ package model import ( "fmt" + "math" "time" "github.com/timescale/promscale/pkg/log" @@ -14,21 +15,26 @@ import ( // Data wraps incoming data with its in-timestamp. It is used to warn if the rate // of incoming samples vs outgoing samples is too low, based on time. type Data struct { - Rows map[string][]Insertable - ReceivedTime time.Time + Rows map[string][]Insertable + ReceivedTime time.Time + SeriesCacheEpoch SeriesEpoch } // Batch is an iterator over a collection of Insertables that returns // data in the format expected for the data table row. type Batch struct { - data []Insertable - numSamples int - numExemplars int + data []Insertable + seriesCacheEpoch SeriesEpoch + numSamples int + numExemplars int } // NewBatch returns a new batch that can hold samples and exemplars. 
func NewBatch() Batch { - si := Batch{data: make([]Insertable, 0)} + si := Batch{ + data: make([]Insertable, 0), + seriesCacheEpoch: SeriesEpoch(math.MaxInt64), + } return si } @@ -37,7 +43,7 @@ func (t *Batch) Reset() { // nil all pointers to prevent memory leaks t.data[i] = nil } - *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0} + *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0, seriesCacheEpoch: SeriesEpoch(math.MaxInt64)} } func (t *Batch) CountSeries() int { @@ -73,6 +79,16 @@ func (t *Batch) Absorb(other Batch) { t.AppendSlice(other.data) } +func (t *Batch) SeriesCacheEpoch() SeriesEpoch { + return t.seriesCacheEpoch +} + +func (t *Batch) UpdateSeriesCacheEpoch(epoch SeriesEpoch) { + if t.seriesCacheEpoch.After(epoch) { + t.seriesCacheEpoch = epoch + } +} + func (t *Batch) Len() int { return t.CountSeries() } @@ -82,11 +98,11 @@ func (t *Batch) Swap(i, j int) { } func (t *Batch) Less(i, j int) bool { - s1, _, err := t.data[i].Series().GetSeriesID() + s1, err := t.data[i].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } - s2, _, err := t.data[j].Series().GetSeriesID() + s2, err := t.data[j].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } diff --git a/pkg/pgmodel/model/batch_visitor.go b/pkg/pgmodel/model/batch_visitor.go index bb8461a4f4..4b77eaa5ff 100644 --- a/pkg/pgmodel/model/batch_visitor.go +++ b/pkg/pgmodel/model/batch_visitor.go @@ -10,19 +10,14 @@ import ( ) type batchVisitor struct { - batch *Batch - lowestEpoch SeriesEpoch - minTime int64 + batch *Batch + minTime int64 } -func getBatchVisitor(batch *Batch) *batchVisitor { - return &batchVisitor{batch, SeriesEpoch(math.MaxInt64), math.MaxInt64} -} +var MaxDate int64 = math.MaxInt64 -// LowestEpoch returns the lowest epoch value encountered while visiting insertables. -// It must be called after Visit() has completed. 
-func (vtr *batchVisitor) LowestEpoch() SeriesEpoch { - return vtr.lowestEpoch +func getBatchVisitor(batch *Batch) *batchVisitor { + return &batchVisitor{batch, math.MaxInt64} } func (vtr *batchVisitor) MinTime() int64 { @@ -34,28 +29,20 @@ func (vtr *batchVisitor) Visit( visitExemplars func(t time.Time, v float64, seriesId int64, lvalues []string), ) error { var ( - seriesId SeriesID - seriesEpoch SeriesEpoch - err error + seriesId SeriesID + err error ) - updateEpoch := func(epoch SeriesEpoch) { - if epoch < vtr.lowestEpoch { - vtr.lowestEpoch = epoch - } - } for _, insertable := range vtr.batch.data { updateMinTs := func(t int64) { if t < vtr.minTime { vtr.minTime = t } } - seriesId, seriesEpoch, err = insertable.Series().GetSeriesID() + seriesId, err = insertable.Series().GetSeriesID() if err != nil { return fmt.Errorf("get series-id: %w", err) } - updateEpoch(seriesEpoch) - switch insertable.Type() { case Sample: itr := insertable.Iterator().(SamplesIterator) diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go index 3b0cead4b3..36eab0cb1e 100644 --- a/pkg/pgmodel/model/series.go +++ b/pkg/pgmodel/model/series.go @@ -19,8 +19,7 @@ import ( type SeriesID int64 const ( - invalidSeriesID = -1 - InvalidSeriesEpoch = -1 + invalidSeriesID = -1 ) func (s SeriesID) String() string { @@ -30,6 +29,14 @@ func (s SeriesID) String() string { // SeriesEpoch represents the series epoch type SeriesEpoch int64 +func (s SeriesEpoch) After(o SeriesEpoch) bool { + return s > o +} + +func (s SeriesEpoch) AfterEq(o SeriesEpoch) bool { + return s >= o +} + // Series stores a Prometheus labels.Labels in its canonical string representation type Series struct { //protects names, values, seriesID, epoch @@ -38,7 +45,6 @@ type Series struct { names []string values []string seriesID SeriesID - epoch SeriesEpoch metricName string str string @@ -50,7 +56,6 @@ func NewSeries(key string, labelPairs []prompb.Label) *Series { values: make([]string, len(labelPairs)), str: key, 
seriesID: invalidSeriesID, - epoch: InvalidSeriesEpoch, } for i, l := range labelPairs { series.names[i] = l.Name @@ -108,26 +113,25 @@ func (l *Series) FinalSizeBytes() uint64 { return uint64(unsafe.Sizeof(*l)) + uint64(len(l.str)+len(l.metricName)) // #nosec } -func (l *Series) GetSeriesID() (SeriesID, SeriesEpoch, error) { +func (l *Series) GetSeriesID() (SeriesID, error) { l.lock.RLock() defer l.lock.RUnlock() switch l.seriesID { case invalidSeriesID: - return 0, 0, fmt.Errorf("Series id not set") + return 0, fmt.Errorf("Series id not set") case 0: - return 0, 0, fmt.Errorf("Series id invalid") + return 0, fmt.Errorf("Series id invalid") default: - return l.seriesID, l.epoch, nil + return l.seriesID, nil } } -// note this has to be idempotent -func (l *Series) SetSeriesID(sid SeriesID, eid SeriesEpoch) { +// Note: This has to be idempotent +func (l *Series) SetSeriesID(sid SeriesID) { l.lock.Lock() defer l.lock.Unlock() l.seriesID = sid - l.epoch = eid l.names = nil l.values = nil } diff --git a/pkg/pgmodel/model/sql_test_utils.go b/pkg/pgmodel/model/sql_test_utils.go index 124965b1ee..59ad84a37a 100644 --- a/pkg/pgmodel/model/sql_test_utils.go +++ b/pkg/pgmodel/model/sql_test_utils.go @@ -413,6 +413,10 @@ func (m *MockRows) Scan(dest ...interface{}) error { if d, ok := dest[i].(*time.Time); ok { *d = s } + case time.Duration: + if d, ok := dest[i].(*time.Duration); ok { + *d = s + } case float64: if _, ok := dest[i].(float64); !ok { return fmt.Errorf("wrong value type float64") diff --git a/pkg/rules/adapters/ingest.go b/pkg/rules/adapters/ingest.go index 1b7206f796..6bb161508e 100644 --- a/pkg/rules/adapters/ingest.go +++ b/pkg/rules/adapters/ingest.go @@ -30,9 +30,10 @@ func NewIngestAdapter(inserter ingestor.DBInserter) *ingestAdapter { } type appenderAdapter struct { - data map[string][]model.Insertable - inserter ingestor.DBInserter - closed bool + data map[string][]model.Insertable + inserter ingestor.DBInserter + closed bool + seriesCacheEpoch 
model.SeriesEpoch } // Appender creates a new appender for Prometheus rules manager. @@ -47,8 +48,9 @@ type appenderAdapter struct { // Note: The rule manager does not call Rollback() yet. func (a ingestAdapter) Appender(_ context.Context) storage.Appender { return &appenderAdapter{ - data: make(map[string][]model.Insertable), - inserter: a.inserter, + data: make(map[string][]model.Insertable), + inserter: a.inserter, + seriesCacheEpoch: model.SeriesEpoch(0), } } @@ -60,6 +62,10 @@ func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64 if err != nil { return 0, fmt.Errorf("get ingestor: %w", err) } + cacheEpoch := dbIngestor.SeriesCache().CacheEpoch() + if app.seriesCacheEpoch == model.SeriesEpoch(0) || app.seriesCacheEpoch.After(cacheEpoch) { + app.seriesCacheEpoch = cacheEpoch + } series, metricName, err := dbIngestor.SeriesCache().GetSeriesFromProtos(util.LabelToPrompbLabels(l)) if err != nil { return 0, fmt.Errorf("get series from protos: %w", err) @@ -96,7 +102,7 @@ func (app *appenderAdapter) Commit() error { if err != nil { return fmt.Errorf("get ingestor: %w", err) } - numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()}) + numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now(), SeriesCacheEpoch: app.seriesCacheEpoch}) if err == nil { samplesIngested.Add(float64(numInsertablesIngested)) } diff --git a/pkg/tests/end_to_end_tests/create_test.go b/pkg/tests/end_to_end_tests/create_test.go index d1acf0e5ae..e96ee4ccdd 100644 --- a/pkg/tests/end_to_end_tests/create_test.go +++ b/pkg/tests/end_to_end_tests/create_test.go @@ -496,6 +496,163 @@ func TestSQLIngest(t *testing.T) { } } +func TestInsertAbort(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + samplesOne := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: 
model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 1, Value: 0.1}, + }, + }, + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 2, Value: 0.2}, + }, + }, + } + + samplesTwo := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 3, Value: 0.1}, + }, + }, + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 4, Value: 0.2}, + }, + }, + } + + withDB(t, *testDatabase, func(dbOwner *pgxpool.Pool, t testing.TB) { + db := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_writer") + defer db.Close() + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) + if err != nil { + t.Fatal(err) + } + defer ingestor.Close() + + // Ingest some metrics + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesOne))) + require.NoError(t, err) + + err = ingestor.CompleteMetricCreation(context.Background()) + require.NoError(t, err) + + // Adjust database delete_epoch + _, err = db.Exec(context.Background(), "UPDATE _prom_catalog.ids_epoch SET delete_epoch = current_epoch, current_epoch = current_epoch + 14400") + require.NoError(t, err) + + // Insert more samples + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesTwo))) + + // Assert that insert fails + require.Error(t, err, "") + + // Insert more samples + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesTwo))) + + // Expect that insertion is successful + require.NoError(t, err) + }) +} + +func TestCacheEvictionBetweenInserts(t *testing.T) { + if 
testing.Short() { + t.Skip("skipping integration test") + } + + samplesOne := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 1, Value: 0.1}, + }, + }, + } + + samplesTwo := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 2, Value: 0.2}, + }, + }, + } + + withDB(t, *testDatabase, func(dbOwner *pgxpool.Pool, t testing.TB) { + db := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_writer") + defer db.Close() + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) + if err != nil { + t.Fatal(err) + } + defer ingestor.Close() + + // Ingest some metrics + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesOne))) + require.NoError(t, err) + + err = ingestor.CompleteMetricCreation(context.Background()) + require.NoError(t, err) + + // Mark all series for deletion, this will cause cache eviction if the epoch refresh runs + _, err = db.Exec(context.Background(), "UPDATE _prom_catalog.series SET delete_epoch = i.current_epoch FROM _prom_catalog.ids_epoch as i") + require.NoError(t, err) + + // Adjust database current_epoch + _, err = db.Exec(context.Background(), "UPDATE _prom_catalog.ids_epoch SET current_epoch = current_epoch + 10") + require.NoError(t, err) + + // Refresh series epoch, this will result in caches being emptied + ingestor.Dispatcher().(*ingstr.PgxDispatcher).RefreshSeriesEpoch() + + // Here we expect all items to have been removed from the cache + // We can't dig into the cache directly, but we can observe that items + // were evicted by inserting into the same series again, and seeing + // that the `delete_epoch` is NULL in the database afterwards. 
+ + // Insert more samples for the series + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesTwo))) + require.NoError(t, err, "") + + // Check how many series still have non-null delete_epoch + row := db.QueryRow(context.Background(), "SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL") + var count int + err = row.Scan(&count) + require.NoError(t, err) + + // We expect that there are no series with non-null delete epoch + require.Equal(t, 0, count) + }) +} + func TestInsertCompressedDuplicates(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go index bd9fdf8e3e..d0187c7ddb 100644 --- a/pkg/tests/end_to_end_tests/drop_test.go +++ b/pkg/tests/end_to_end_tests/drop_test.go @@ -659,7 +659,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Error("expected ingest to fail due to old epoch") } - scache.Reset() + scache.Reset(pgmodel.SeriesEpoch(time.Now().Unix())) ingestor.Close() ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) diff --git a/pkg/tests/upgrade_tests/shapshot.go b/pkg/tests/upgrade_tests/shapshot.go index 94f93f695b..ec25720aa0 100644 --- a/pkg/tests/upgrade_tests/shapshot.go +++ b/pkg/tests/upgrade_tests/shapshot.go @@ -347,7 +347,12 @@ func getTableInfosForSchema(t *testing.T, db *pgxpool.Pool, schema string) (out `SELECT array_agg(relname::TEXT order by relname::TEXT) FROM pg_class WHERE relnamespace=$1::TEXT::regnamespace AND relkind='r' - AND (relnamespace, relname) != ('_ps_catalog'::regnamespace, 'migration') + AND ( + (relnamespace, relname) != ('_ps_catalog'::regnamespace, 'migration') + AND + -- skip ids_epoch because the current_epoch can change when promscale starts + (relnamespace, relname) != ('_prom_catalog'::regnamespace, 'ids_epoch') + ) `, schema, ) @@ -359,7 +364,7 @@ func getTableInfosForSchema(t *testing.T, db *pgxpool.Pool, 
schema string) (out } numTables := len(tables) - if schema == "_ps_catalog" { + if schema == "_ps_catalog" || schema == "_prom_catalog" { numTables = numTables + 1 } out = make([]tableInfo, numTables) @@ -390,5 +395,33 @@ func getTableInfosForSchema(t *testing.T, db *pgxpool.Pool, schema string) (out return } } + if schema == "_prom_catalog" { + // dynamically build a query to select all columns except for `current_epoch` + row = db.QueryRow(context.Background(), + ` + WITH cols AS ( + SELECT array_agg(format('%I', a.attname)) as cols + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + JOIN pg_attribute a ON a.attrelid = c.oid + WHERE n.nspname = '_prom_catalog' + AND relname = 'ids_epoch' + AND a.attnum > 0 + AND a.attname <> 'current_epoch' + ) + SELECT 'SELECT array_agg((' || array_to_string((SELECT cols FROM cols), ', ') || ')::TEXT) FROM _prom_catalog.ids_epoch'; + `) + var query string + if err = row.Scan(&query); err != nil { + t.Errorf("error building query: %v", err) + return + } + row = db.QueryRow(context.Background(), query) + out[numTables-1].name = "ids_epoch" + if err = row.Scan(&out[numTables-1].values); err != nil { + t.Errorf("error querying values from table _prom_catalog.ids_epoch: %v", err) + return + } + } return } diff --git a/scripts/end_to_end_tests.sh b/scripts/end_to_end_tests.sh index 80531b6e93..8ac5c94352 100755 --- a/scripts/end_to_end_tests.sh +++ b/scripts/end_to_end_tests.sh @@ -88,6 +88,7 @@ PROMSCALE_DB_PASSWORD=postgres \ PROMSCALE_DB_NAME=postgres \ PROMSCALE_DB_SSL_MODE=disable \ PROMSCALE_WEB_TELEMETRY_PATH=/metrics \ +PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS=true \ ./promscale -startup.only docker exec e2e-tsdb psql -U postgres -d postgres \