diff --git a/CHANGELOG.md b/CHANGELOG.md
index c92c9b4a70..1a7f41cc9d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ We use the following categories for changes:
   configuration. Supersedes `startup.dataset.config` which accepts a string
   instead of a mapping [#1737]
 - Add alert to notify about duplicate sample/metric ingestion. [#1688]
+- Add proper invalidation of the series cache. [#1752]

### Changed
- Reduced the verbosity of the logs emitted by the vacuum engine [#1715]
diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml
index e6a0dad825..2bc39131f6 100644
--- a/docker-compose/docker-compose.yaml
+++ b/docker-compose/docker-compose.yaml
@@ -2,7 +2,8 @@ version: '3.0'
 services:
   db:
-    image: timescale/timescaledb-ha:pg14-latest
+    # TODO (james): replace this when our changes are published in the docker image
+    image: ghcr.io/timescale/dev_promscale_extension:jg-logical-to-time-epoch-2-ts2-pg14
     ports:
       - 5432:5432/tcp
     environment:
@@ -42,6 +43,7 @@ services:
       PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317"
       PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1"
       PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
+      PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

   otel-collector:
     platform: linux/amd64
diff --git a/docker-compose/high-availability/docker-compose.yaml b/docker-compose/high-availability/docker-compose.yaml
index 60baa70d25..00ed4852a8 100644
--- a/docker-compose/high-availability/docker-compose.yaml
+++ b/docker-compose/high-availability/docker-compose.yaml
@@ -2,7 +2,8 @@ version: '3.0'
 services:
   db:
-    image: timescale/timescaledb-ha:pg14-latest
+    # TODO (james): replace this when our changes are published in the docker image
+    image: ghcr.io/timescale/dev_promscale_extension:jg-logical-to-time-epoch-2-ts2-pg14
     ports:
       - 5432:5432/tcp
     environment:
@@ -44,6 +45,7 @@ services:
       PROMSCALE_METRICS_HIGH_AVAILABILITY: true
       PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow
       PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
+      PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

   promscale-connector2:
     image: timescale/promscale:latest
@@ -61,6 +63,7 @@
       PROMSCALE_METRICS_HIGH_AVAILABILITY: true
       PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow
       PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
+      PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

   node_exporter:
     image: quay.io/prometheus/node-exporter
diff --git a/pkg/clockcache/cache.go b/pkg/clockcache/cache.go
index 71289fd45d..dc8e1c2300 100644
--- a/pkg/clockcache/cache.go
+++ b/pkg/clockcache/cache.go
@@ -329,6 +329,13 @@ func (self *Cache) ExpandTo(newMax int) {
 }

 func (self *Cache) Reset() {
+	self.ResetAndCallWhileLocksHeld(func() {})
+}
+
+// ResetAndCallWhileLocksHeld resets the cache and calls fn while holding the
+// insert and elements locks. This allows a consumer of the cache to coordinate
+// its state with the state of the cache.
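+//
+// A sketch of the intended usage (the names in the callback body are
+// illustrative; see SeriesCacheImpl.Reset for the real consumer):
+//
+//	c.ResetAndCallWhileLocksHeld(func() {
+//		// Runs with both the insert and elements locks held, so no
+//		// Insert or eviction can interleave between the reset and
+//		// this state update.
+//		myEpoch = newEpoch
+//	})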
+func (self *Cache) ResetAndCallWhileLocksHeld(fn func()) {
 	self.insertLock.Lock()
 	defer self.insertLock.Unlock()
 	oldSize := cap(self.storage)
@@ -338,6 +345,10 @@ func (self *Cache) Reset() {
 	self.elementsLock.Lock()
 	defer self.elementsLock.Unlock()
+
+	// We're holding all locks here, so call fn
+	fn()
+
 	self.elements = newElements
 	self.storage = newStorage
 	self.next = 0
@@ -415,3 +426,47 @@ func packUsedAndTimestamp(used bool, ts int32) uint32 {
 	}
 	return uint32(usedBit<<31) | uint32(ts)
 }
+
+// RemoveMatchingItems removes items whose value passes the check applied by
+// the passed `matcher` function. It returns the number of entries which were
+// removed from the cache.
+// Note: Due to the locks that this function takes, it is best used when very
+// few items stored in the cache are to be removed.
+func (self *Cache) RemoveMatchingItems(matcher func(value interface{}) bool) int {
+	// Note: we take an RLock to find matching items, then release it and take
+	// a full Lock to remove them. This relies on two assumptions:
+	// 1. len(matchingItems) << len(storage): taking a big lock should be fine.
+	// 2. Missing the addition of an item to the cache is not problematic.
+	//    Locking like this doesn't provide a consistent view of the cache
+	//    contents: an entry may be removed or added under our feet. An
+	//    entry being removed is not a problem; we will just skip it. An entry
+	//    being added is fine for the one consumer of this method (cache
+	//    invalidation): if an item is added to the cache, then it was just
+	//    created/resurrected in the DB, so it wouldn't have been a candidate
+	//    for removal.
+
+	self.elementsLock.RLock()
+	matchingItems := make([]interface{}, 0)
+	for k, v := range self.elements {
+		if matcher(v.value) {
+			matchingItems = append(matchingItems, k)
+		}
+	}
+	self.elementsLock.RUnlock()
+	self.elementsLock.Lock()
+	for _, v := range matchingItems {
+		self.remove(v)
+	}
+	self.elementsLock.Unlock()
+	return len(matchingItems)
+}
+
+func (self *Cache) remove(key interface{}) {
+	value, present := self.elements[key]
+	if !present {
+		// do nothing
+		return
+	}
+	delete(self.elements, key)
+	*value = element{}
+}
diff --git a/pkg/pgmodel/cache/series_cache.go b/pkg/pgmodel/cache/series_cache.go
index 289cd7019e..dcbabdbcb9 100644
--- a/pkg/pgmodel/cache/series_cache.go
+++ b/pkg/pgmodel/cache/series_cache.go
@@ -30,22 +30,28 @@ var evictionMaxAge = time.Minute * 2 // grow cache if we are evicting eleme

 // SeriesCache is a cache of model.Series entries.
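+// The cache also tracks a series epoch: the database epoch at which the
+// cached contents were last known to be valid. Ingestion uses it to abort
+// inserts whose series data may have been deleted in the meantime (see the
+// epoch check in the copier).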
 type SeriesCache interface {
-	Reset()
+	Reset(epoch model.SeriesEpoch)
 	GetSeriesFromProtos(labelPairs []prompb.Label) (series *model.Series, metricName string, err error)
 	Len() int
 	Cap() int
 	Evictions() uint64
+	EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int
+	CacheEpoch() model.SeriesEpoch
+	SetCacheEpoch(epoch model.SeriesEpoch)
 }

 type SeriesCacheImpl struct {
 	cache        *clockcache.Cache
 	maxSizeBytes uint64
+	cacheEpoch   model.SeriesEpoch
+	epochLock    sync.RWMutex
 }

 func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
 	cache := &SeriesCacheImpl{
-		clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize),
-		config.SeriesCacheMemoryMaxBytes,
+		cache:        clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize),
+		maxSizeBytes: config.SeriesCacheMemoryMaxBytes,
+		cacheEpoch:   model.SeriesEpoch(0),
 	}

 	if sigClose != nil {
@@ -54,6 +60,18 @@ func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
 	return cache
 }

+func (t *SeriesCacheImpl) CacheEpoch() model.SeriesEpoch {
+	t.epochLock.RLock()
+	defer t.epochLock.RUnlock()
+	return t.cacheEpoch
+}
+
+func (t *SeriesCacheImpl) SetCacheEpoch(epoch model.SeriesEpoch) {
+	t.epochLock.Lock()
+	defer t.epochLock.Unlock()
+	t.cacheEpoch = epoch
+}
+
 func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) {
 	ticker := time.NewTicker(growCheckDuration)
 	for {
@@ -115,8 +133,12 @@ func (t *SeriesCacheImpl) Evictions() uint64 {
 }

 // Reset should be concurrency-safe
-func (t *SeriesCacheImpl) Reset() {
-	t.cache.Reset()
+func (t *SeriesCacheImpl) Reset(epoch model.SeriesEpoch) {
+	t.cache.ResetAndCallWhileLocksHeld(func() {
+		// When we reset the cache, we need to reset the cache epoch as well,
+		// otherwise it may be out of date as soon as data is loaded into it.
+		t.SetCacheEpoch(epoch)
+	})
 }

 // Get the canonical version of a series if one exists.
@@ -133,7 +155,7 @@ func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
 // representation, returning the canonical version (which can be different in
 // the event of multiple goroutines setting labels concurrently).
 func (t *SeriesCacheImpl) setSeries(str string, lset *model.Series) *model.Series {
-	//str not counted twice in size since the key and lset.str will point to same thing.
+	// str not counted twice in size since the key and lset.str will point to same thing.
 	val, _ := t.cache.Insert(str, lset, lset.FinalSizeBytes())
 	return val.(*model.Series)
 }
@@ -278,3 +300,28 @@ func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model
 	return series, metricName, nil
 }
+
+func (t *SeriesCacheImpl) EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int {
+	if len(seriesIds) == 0 {
+		return 0
+	}
+	seriesIdMap := make(map[model.SeriesID]struct{}, len(seriesIds))
+	for _, v := range seriesIds {
+		seriesIdMap[v] = struct{}{}
+	}
+	// RemoveMatchingItems' documentation advises against using it when many
+	// items are to be removed, because it is expensive. In normal operation
+	// (with ~30 day retention), we don't expect our cache to contain many of
+	// the series that we want to remove, which matches RemoveMatchingItems'
+	// locking mechanism well. See the impl.
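+	// The matcher below conservatively keeps any series whose ID is not set
+	// yet (GetSeriesID returns an error): such an entry cannot match a stale
+	// series ID.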
+ count := t.cache.RemoveMatchingItems(func(v interface{}) bool { + value := v.(*model.Series) + id, err := value.GetSeriesID() + if err != nil { + return false + } + _, present := seriesIdMap[id] + return present + }) + t.SetCacheEpoch(epoch) + return count +} diff --git a/pkg/pgmodel/ingestor/buffer.go b/pkg/pgmodel/ingestor/buffer.go index 2a4e6659e4..3049d71b5c 100644 --- a/pkg/pgmodel/ingestor/buffer.go +++ b/pkg/pgmodel/ingestor/buffer.go @@ -79,4 +79,5 @@ func (p *pendingBuffer) release() { func (p *pendingBuffer) addReq(req *insertDataRequest) { p.needsResponse = append(p.needsResponse, insertDataTask{finished: req.finished, errChan: req.errChan}) p.batch.AppendSlice(req.data) + p.batch.UpdateSeriesCacheEpoch(req.seriesCacheEpoch) } diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 7337a466d7..1ff5654203 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -145,6 +145,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e defer span.End() batch := copyBatch(insertBatch) err := sw.PopulateOrCreateSeries(ctx, batch) + if err != nil { return fmt.Errorf("copier: writing series: %w", err) } @@ -209,6 +210,15 @@ func doInsertOrFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyR if isPGUniqueViolation(err) { err, _ = insertSeries(ctx, conn, true, reqs...) } + if isEpochAbort(err) { + // Epoch abort is non-recoverable. Return error here instead of retrying. + log.Warn("msg", "Insert failed on epoch abort", "error", err) + for i := range reqs { + reqs[i].data.reportResults(NewInsertAbortError(err)) + reqs[i].data.release() + } + return + } if err != nil { log.Error("msg", err) insertBatchErrorFallback(ctx, conn, reqs...) @@ -234,6 +244,17 @@ func isPGUniqueViolation(err error) bool { return false } +func isEpochAbort(err error) bool { + if err == nil { + return false + } + pgErr, ok := err.(*pgconn.PgError) + if ok && pgErr.Code == "PS001" { + return true + } + return false +} + func insertBatchErrorFallback(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest) { ctx, span := tracer.Default().Start(ctx, "insert-batch-error-fallback") defer span.End() @@ -349,8 +370,8 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re var sampleRows [][]interface{} var exemplarRows [][]interface{} insertStart := time.Now() - lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64) lowestMinTime := int64(math.MaxInt64) + minCacheEpoch := pgmodel.SeriesEpoch(0) tx, err := conn.BeginTx(ctx) if err != nil { return fmt.Errorf("failed to start transaction for inserting metrics: %v", err), lowestMinTime @@ -365,6 +386,10 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re for r := range reqs { req := &reqs[r] + epoch := req.data.batch.SeriesCacheEpoch() + if minCacheEpoch == pgmodel.SeriesEpoch(0) || minCacheEpoch.After(epoch) { + minCacheEpoch = epoch + } // Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. 
Prometheus retry)
 		// Samples inside series should be sorted by Prometheus
 		// We sort after PopulateOrCreateSeries call because we now have guarantees that all seriesIDs have been populated
@@ -410,10 +435,6 @@
 		if err != nil {
 			return err, lowestMinTime
 		}
-		epoch := visitor.LowestEpoch()
-		if epoch < lowestEpoch {
-			lowestEpoch = epoch
-		}
 		minTime := visitor.MinTime()
 		if minTime < lowestMinTime {
 			lowestMinTime = minTime
@@ -472,11 +493,12 @@
 		}
 	}

-	//note the epoch increment takes an access exclusive on the table before incrementing.
-	//thus we don't need row locking here. Note by doing this check at the end we can
-	//have some wasted work for the inserts before this fails but this is rare.
-	//avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it.
-	row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
+	// Note: The epoch increment takes an ACCESS EXCLUSIVE lock on the table
+	// before incrementing, thus we don't need row locking here. By doing this
+	// check at the end we can have some wasted work for the inserts before
+	// this fails, but this is rare. Avoiding an additional loop or memoization
+	// to find the lowest epoch ahead of time seems worth it.
+	row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(minCacheEpoch))
 	var val []byte
 	if err = row.Scan(&val); err != nil {
 		return err, lowestMinTime
diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go
index f0541db6d7..b86898f169 100644
--- a/pkg/pgmodel/ingestor/dispatcher.go
+++ b/pkg/pgmodel/ingestor/dispatcher.go
@@ -7,6 +7,7 @@ package ingestor
 import (
 	"context"
 	"fmt"
+	"github.com/pkg/errors"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -26,15 +27,16 @@ import (
 )

 const (
-	finalizeMetricCreation = "CALL _prom_catalog.finalize_metric_creation()"
-	getEpochSQL            = "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1"
+	finalizeMetricCreation    = "CALL _prom_catalog.finalize_metric_creation()"
+	getEpochSQL               = "SELECT current_epoch, COALESCE(delete_epoch, 0) FROM _prom_catalog.ids_epoch LIMIT 1"
+	getStaleSeriesIDsArraySQL = "SELECT ARRAY_AGG(id) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL"
 )

 var ErrDispatcherClosed = fmt.Errorf("dispatcher is closed")

-// pgxDispatcher redirects incoming samples to the appropriate metricBatcher
+// PgxDispatcher redirects incoming samples to the appropriate metricBatcher
 // corresponding to the metric in the sample.
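+// It also runs the periodic series-epoch refresh which keeps the series
+// cache consistent with the database's delete_epoch (see RefreshSeriesEpoch).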
-type pgxDispatcher struct {
+type PgxDispatcher struct {
 	conn                pgxconn.PgxConn
 	metricTableNames    cache.MetricCache
 	scache              cache.SeriesCache
@@ -48,17 +50,51 @@
 	doneChannel         chan struct{}
 	closed              *uber_atomic.Bool
 	doneWG              sync.WaitGroup
+	seriesRefreshLock   sync.Mutex
 }

-var _ model.Dispatcher = &pgxDispatcher{}
+type InsertAbortError struct {
+	err error
+}
+
+func NewInsertAbortError(err error) InsertAbortError {
+	return InsertAbortError{err}
+}
+
+func (e InsertAbortError) Error() string {
+	return e.err.Error()
+}

-func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*pgxDispatcher, error) {
+var _ model.Dispatcher = &PgxDispatcher{}
+
+func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cache.SeriesCache, eCache cache.PositionCache, cfg *Cfg) (*PgxDispatcher, error) {
 	numCopiers := cfg.NumCopiers
 	if numCopiers < 1 {
 		log.Warn("msg", "num copiers less than 1, setting to 1")
 		numCopiers = 1
 	}

+	var dbCurrentEpoch int64
+	// Bump the current epoch if it was still set to the initial value, and
+	// initialize the cache's epoch. Initializing the cache's epoch is crucial
+	// to ensure that ingestion will abort if a stale cache entry is present.
+	row := conn.QueryRow(context.Background(), "SELECT _prom_catalog.initialize_current_epoch(now())")
+	err := row.Scan(&dbCurrentEpoch)
+	if err != nil {
+		return nil, err
+	}
+	log.Info("msg", "Initializing epoch", "epoch", dbCurrentEpoch)
+	scache.SetCacheEpoch(model.SeriesEpoch(dbCurrentEpoch))
+
+	var epochDuration time.Duration
+	row = conn.QueryRow(context.Background(), "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL")
+	err = row.Scan(&epochDuration)
+	if err != nil {
+		return nil, err
+	}
+	var seriesEpochRefresh = epochDuration / 3 // This seems like a good enough rule of thumb
+	log.Info("msg", "Setting series epoch refresh", "series_epoch_refresh", seriesEpochRefresh)
+
 	// the copier read request channel retains the queue order between metrics
 	maxMetrics := 10000
 	copierReadRequestCh := make(chan readRequest, maxMetrics)
@@ -80,14 +116,14 @@
 	if err != nil {
 		return nil, err
 	}
-	sw := NewSeriesWriter(conn, labelArrayOID, labelsCache)
+	sw := NewSeriesWriter(conn, labelArrayOID, labelsCache, scache)
 	elf := NewExamplarLabelFormatter(conn, eCache)

 	for i := 0; i < numCopiers; i++ {
 		go runCopier(conn, copierReadRequestCh, sw, elf)
 	}

-	inserter := &pgxDispatcher{
+	inserter := &PgxDispatcher{
 		conn:                conn,
 		metricTableNames:    mCache,
 		scache:              scache,
@@ -97,7 +133,7 @@
 		asyncAcks:           cfg.MetricsAsyncAcks,
 		copierReadRequestCh: copierReadRequestCh,
-		// set to run at half our deletion interval
-		seriesEpochRefresh:  time.NewTicker(30 * time.Minute),
+		// set to run at a third of the epoch duration (see above)
+		seriesEpochRefresh:  time.NewTicker(seriesEpochRefresh),
 		doneChannel:         make(chan struct{}),
 		closed:              uber_atomic.NewBool(false),
 	}
@@ -122,7 +158,7 @@
 	return inserter, nil
 }

-func (p *pgxDispatcher) runCompleteMetricCreationWorker() {
+func (p *PgxDispatcher) runCompleteMetricCreationWorker() {
 	for range p.completeMetricCreation {
 		err := p.CompleteMetricCreation(context.Background())
 		if err != nil {
@@ -131,56 +167,100 @@
 		}
 	}
 }

-func (p *pgxDispatcher) runSeriesEpochSync() {
-	epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch)
-	// we don't have any great place to report errors, and if the
-	// connection recovers we can still make progress, so we'll just log it
-	// and continue execution
-	if err != nil {
-		log.Error("msg", "error refreshing the series cache", "err", err)
-	}
+func (p *PgxDispatcher) runSeriesEpochSync() {
+	p.RefreshSeriesEpoch()
 	for {
 		select {
 		case <-p.seriesEpochRefresh.C:
-			epoch, err = p.refreshSeriesEpoch(epoch)
-			if err != nil {
-				log.Error("msg", "error refreshing the series cache", "err", err)
-			}
+			p.RefreshSeriesEpoch()
 		case <-p.doneChannel:
 			return
 		}
 	}
 }

-func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) {
-	dbEpoch, err := p.getServerEpoch()
+func (p *PgxDispatcher) getDatabaseEpochs() (model.SeriesEpoch, model.SeriesEpoch, error) {
+	var dbCurrentEpoch, dbDeleteEpoch int64
+	row := p.conn.QueryRow(context.Background(), getEpochSQL)
+	err := row.Scan(&dbCurrentEpoch, &dbDeleteEpoch)
 	if err != nil {
+		return 0, 0, err
+	}
+	return model.SeriesEpoch(dbCurrentEpoch), model.SeriesEpoch(dbDeleteEpoch), nil
+}
+
+func (p *PgxDispatcher) RefreshSeriesEpoch() {
+	// There can be multiple concurrent callers of `RefreshSeriesEpoch`. We
+	// use TryLock here to ensure that we don't have concurrent executions of
+	// this function, and that we don't have multiple back-to-back executions.
+	locked := p.seriesRefreshLock.TryLock()
+	if !locked {
+		log.Info("msg", "Skipping series cache refresh")
+		return
+	}
+	defer p.seriesRefreshLock.Unlock()
+	log.Info("msg", "Refreshing series cache epoch")
+	cacheCurrentEpoch := p.scache.CacheEpoch()
+	dbCurrentEpoch, dbDeleteEpoch, err := p.getDatabaseEpochs()
+	if err != nil {
+		log.Warn("msg", "Unable to get database epoch data, will reset series and inverted labels caches", "err", err.Error())
 		// Trash the cache just in case an epoch change occurred, seems safer
-		p.scache.Reset()
+		p.scache.Reset(dbCurrentEpoch)
 		// Also trash the inverted labels cache, which can also be invalidated when the series cache is
 		p.invertedLabelsCache.Reset()
-		return model.InvalidSeriesEpoch, err
+		return
 	}
+	log.Info("msg", "RefreshSeriesEpoch", "cacheCurrentEpoch", cacheCurrentEpoch, "dbCurrentEpoch", dbCurrentEpoch, "dbDeleteEpoch", dbDeleteEpoch)
-	if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch {
-		p.scache.Reset()
-		// If the series cache needs to be invalidated, so does the inverted labels cache
+	if cacheCurrentEpoch.After(dbCurrentEpoch) {
+		log.Warn("msg", "The connector's cache epoch is greater than the database's. This is unexpected", "connector_current_epoch", cacheCurrentEpoch, "database_current_epoch", dbCurrentEpoch)
+		// Reset the caches. It's not really clear what would lead to this
+		// situation, but if it does happen then something must have gone wrong...
+		p.scache.Reset(dbCurrentEpoch)
+		p.invertedLabelsCache.Reset()
+		return
+	}
+	if dbDeleteEpoch.AfterEq(cacheCurrentEpoch) {
+		// The current cache epoch has been overtaken by the database's
+		// delete_epoch.
+		// The only way to recover from this situation is to reset our caches
+		// and let them repopulate.
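+		// (Equality counts as overtaken as well: entries cached at that
+		// epoch may already have been marked for deletion.)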
+ log.Warn("msg", "Cache epoch was overtaken by the database's delete epoch, resetting series and labels caches", "cache_epoch", cacheCurrentEpoch, "delete_epoch", dbDeleteEpoch) + p.scache.Reset(dbCurrentEpoch) + p.invertedLabelsCache.Reset() + return + } else { + start := time.Now() + staleSeriesIds, err := GetStaleSeriesIDs(p.conn) + if err != nil { + log.Warn("msg", "Error getting series ids, unable to update series cache", "err", err.Error()) + return + } + log.Info("msg", "Epoch change noticed, fetched stale series from db", "count", len(staleSeriesIds), "duration", time.Since(start)) + start = time.Now() + evictCount := p.scache.EvictSeriesById(staleSeriesIds, dbCurrentEpoch) + log.Info("msg", "Removed stale series", "count", evictCount, "duration", time.Since(start)) // Before merging in master, change the level to Debug. + // Trash the inverted labels cache. Technically, we could determine + // which label entries need to be removed from the cache, as we have + // done for the series cache, but that doesn't seem to really be worth + // it. p.invertedLabelsCache.Reset() } - return dbEpoch, nil } -func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) { - var newEpoch int64 - row := p.conn.QueryRow(context.Background(), getEpochSQL) - err := row.Scan(&newEpoch) +func GetStaleSeriesIDs(conn pgxconn.PgxConn) ([]model.SeriesID, error) { + staleSeriesIDs := make([]int64, 0) + err := conn.QueryRow(context.Background(), getStaleSeriesIDsArraySQL).Scan(&staleSeriesIDs) if err != nil { - return -1, err + return nil, fmt.Errorf("error getting stale series ids from db: %w", err) } - - return model.SeriesEpoch(newEpoch), nil + staleSeriesIdsAsSeriesId := make([]model.SeriesID, len(staleSeriesIDs)) + for i := range staleSeriesIDs { + staleSeriesIdsAsSeriesId[i] = model.SeriesID(staleSeriesIDs[i]) + } + return staleSeriesIdsAsSeriesId, nil } -func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { +func (p *PgxDispatcher) CompleteMetricCreation(ctx context.Context) error { if p.closed.Load() { return ErrDispatcherClosed } @@ -193,7 +273,7 @@ func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error { return err } -func (p *pgxDispatcher) Close() { +func (p *PgxDispatcher) Close() { if p.closed.Load() { return } @@ -215,7 +295,7 @@ func (p *pgxDispatcher) Close() { // actually inserted) and any error. // Though we may insert data to multiple tables concurrently, if asyncAcks is // unset this function will wait until _all_ the insert attempts have completed. 
-func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64, error) { +func (p *PgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64, error) { if p.closed.Load() { return 0, ErrDispatcherClosed } @@ -225,6 +305,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 numRows uint64 maxt int64 rows = dataTS.Rows + seriesEpoch = dataTS.SeriesCacheEpoch workFinished = new(sync.WaitGroup) ) workFinished.Add(len(rows)) @@ -240,7 +321,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt = ts } } - p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} + p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, seriesCacheEpoch: seriesEpoch, data: data, finished: workFinished, errChan: errChan} } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) @@ -258,6 +339,9 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 case err = <-errChan: default: } + if errors.As(err, &InsertAbortError{}) { + p.RefreshSeriesEpoch() + } reportMetricsTelemetry(maxt, numRows, 0) close(errChan) } else { @@ -270,6 +354,9 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 } close(errChan) if err != nil { + if errors.As(err, &InsertAbortError{}) { + p.RefreshSeriesEpoch() + } log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err) } reportMetricsTelemetry(maxt, numRows, 0) @@ -279,7 +366,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 return numRows, err } -func (p *pgxDispatcher) InsertMetadata(ctx context.Context, metadata []model.Metadata) (uint64, error) { +func (p *PgxDispatcher) InsertMetadata(ctx context.Context, metadata []model.Metadata) (uint64, error) { if p.closed.Load() { return 0, ErrDispatcherClosed } @@ -309,7 +396,7 @@ func reportMetricsTelemetry(maxTs int64, numSamples, numMetadata uint64) { } // Get the handler for a given metric name, creating a new one if none exists -func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataRequest { +func (p *PgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataRequest { batcher, ok := p.batchers.Load(metric) if !ok { // The ordering is important here: we need to ensure that every call @@ -330,11 +417,12 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques } type insertDataRequest struct { - spanCtx trace.SpanContext - metric string - finished *sync.WaitGroup - data []model.Insertable - errChan chan error + spanCtx trace.SpanContext + metric string + seriesCacheEpoch model.SeriesEpoch + finished *sync.WaitGroup + data []model.Insertable + errChan chan error } func (idr *insertDataRequest) reportResult(err error) { diff --git a/pkg/pgmodel/ingestor/handler_test.go b/pkg/pgmodel/ingestor/handler_test.go index 07b08b3b24..2fb0175569 100644 --- a/pkg/pgmodel/ingestor/handler_test.go +++ b/pkg/pgmodel/ingestor/handler_test.go @@ -5,12 +5,11 @@ package ingestor import ( - "testing" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/model" + "testing" ) func getSeries(t *testing.T, scache *cache.SeriesCacheImpl, labels 
labels.Labels) *model.Series { @@ -74,7 +73,7 @@ func TestLabelArrayCreator(t *testing.T) { /* test one series already set */ setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo}) - setSeries.SetSeriesID(5, 4) + setSeries.SetSeriesID(5) seriesSet = []*model.Series{ getSeries(t, scache, labels.Labels{metricNameLabel, valOne}), setSeries, diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 815eef2f2d..65dddc3c4f 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -194,6 +194,14 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p insertables = make(map[string][]model.Insertable) ) + // Determine the value of the series cache epoch _before_ getting any items + // from the cache. In order to properly abort the insert transaction if the + // cache is out of date (epoch abort), we want to determine the minimum + // cache epoch for all samples in an ingest batch. We know that everything + // currently in the cache has this epoch, and everything that we load from + // the database later will have _at least_ that epoch. + epoch := ingestor.sCache.CacheEpoch() + for i := range timeseries { var ( err error @@ -238,7 +246,7 @@ func (ingestor *DBIngestor) ingestTimeseries(ctx context.Context, timeseries []p } releaseMem() - numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now()}) + numInsertablesIngested, errSamples := ingestor.dispatcher.InsertTs(ctx, model.Data{Rows: insertables, ReceivedTime: time.Now(), SeriesCacheEpoch: epoch}) if errSamples == nil && numInsertablesIngested != totalRowsExpected { return numInsertablesIngested, fmt.Errorf("failed to insert all the data! 
Expected: %d, Got: %d", totalRowsExpected, numInsertablesIngested) } diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..ddc52c25d5 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -80,14 +80,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -130,14 +122,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -182,14 +166,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -231,14 +207,6 @@ func TestPGXInserterInsertSeries(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, {Sql: "BEGIN;"}, { Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", @@ -275,9 +243,9 @@ func TestPGXInserterInsertSeries(t *testing.T) { } mock := model.NewSqlRecorder(c.sqlQueries, t) scache := cache.NewSeriesCache(cache.DefaultConfig, nil) - scache.Reset() + scache.Reset(pgmodel.SeriesEpoch(time.Now().Unix())) lCache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lCache) + sw := NewSeriesWriter(mock, 0, lCache, scache) lsi := make([]model.Insertable, 0) for _, ser := range c.series { @@ -306,232 +274,25 @@ func TestPGXInserterInsertSeries(t *testing.T) { if err == nil { for _, si := range lsi { - si, se, err := si.Series().GetSeriesID() + si, err := si.Series().GetSeriesID() require.NoError(t, err) require.True(t, si > 0, "series id not set") - require.True(t, se > 0, "epoch not set") } } }) } } -func TestPGXInserterCacheReset(t *testing.T) { - series := []labels.Labels{ - { - {Name: "__name__", Value: "metric_1"}, - {Name: "name_1", Value: "value_1"}, - }, - { - {Name: "name_1", Value: "value_2"}, - {Name: "__name__", Value: "metric_1"}, - }, - } - - sqlQueries := []model.SqlQuery{ - - // first series cache fetch - {Sql: "BEGIN;"}, - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", - Args: []interface{}{ - "metric_1", - tableName, - []string{"__name__", "name_1", "name_1"}, - []string{"metric_1", "value_1", "value_2"}, - }, - Results: model.RowResults{ - {[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", 
"name_1"}, []string{"metric_1", "value_1", "value_2"}}, - }, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: seriesInsertSQL, - Args: []interface{}{ - metricID, - tableName, - getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}), - }, - Results: model.RowResults{{int64(1), int64(1)}, {int64(2), int64(2)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - - // first labels cache refresh, does not trash - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(1)}}, - Err: error(nil), - }, - - // second labels cache refresh, trash the cache - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, - Err: error(nil), - }, - {Sql: "BEGIN;"}, - - // repopulate the cache - { - Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}(nil), - Results: model.RowResults{{int64(2)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: "SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", - Args: []interface{}{ - "metric_1", - tableName, - []string{"__name__", "name_1", "name_1"}, - []string{"metric_1", "value_1", "value_2"}, - }, - Results: model.RowResults{ - {[]int32{1, 2, 2}, []int32{1, 2, 3}, []string{"__name__", "name_1", "name_1"}, []string{"metric_1", "value_1", "value_2"}}, - }, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - {Sql: "BEGIN;"}, - { - Sql: seriesInsertSQL, - Args: []interface{}{ - metricID, - tableName, - getTestLabelArray(t, [][]int32{{1, 2}, {1, 3}}), - }, - Results: model.RowResults{{int64(3), int64(1)}, {int64(4), int64(2)}}, - Err: error(nil), - }, - {Sql: "COMMIT;"}, - } - - for i := range sqlQueries { - for j := range sqlQueries[i].Args { - if _, ok := sqlQueries[i].Args[j].([]string); ok { - tmp := &pgutf8str.TextArray{} - err := tmp.Set(sqlQueries[i].Args[j]) - require.NoError(t, err) - sqlQueries[i].Args[j] = tmp - } - } - } - - mock := model.NewSqlRecorder(sqlQueries, t) - scache := cache.NewSeriesCache(cache.DefaultConfig, nil) - lcache, _ := cache.NewInvertedLabelsCache(10) - sw := NewSeriesWriter(mock, 0, lcache) - inserter := pgxDispatcher{ - conn: mock, - scache: scache, - invertedLabelsCache: lcache, - } - - makeSamples := func(series []labels.Labels) []model.Insertable { - lsi := make([]model.Insertable, 0) - for _, ser := range series { - ls, err := scache.GetSeriesFromLabels(ser) - if err != nil { - t.Errorf("invalid labels %+v, %v", ls, err) - } - lsi = append(lsi, model.NewPromSamples(ls, nil)) - } - return lsi - } - - samples := makeSamples(series) - err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) - if err != nil { - t.Fatal(err) - } - - expectedIds := []model.SeriesID{ - model.SeriesID(1), - model.SeriesID(2), - } - - for index, si := range samples { - _, _, ok := si.Series().NameValues() - require.False(t, ok) - expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() - require.NoError(t, err) - if gotId != expectedId { - t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) - } - } - - // refreshing during the same epoch gives the same IDs without checking the DB - _, err = inserter.refreshSeriesEpoch(1) - require.NoError(t, err) - - samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) - if err != nil { - t.Fatal(err) - } - - for index, si := range samples { - _, _, ok := si.Series().NameValues() - 
require.False(t, ok) - expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() - require.NoError(t, err) - if gotId != expectedId { - t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) - } - } - - // trash the cache - _, err = inserter.refreshSeriesEpoch(1) - require.NoError(t, err) - - // retrying rechecks the DB and uses the new IDs - samples = makeSamples(series) - err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples)) - if err != nil { - t.Fatal(err) - } - - expectedIds = []model.SeriesID{ - model.SeriesID(3), - model.SeriesID(4), - } - - for index, si := range samples { - _, _, ok := si.Series().NameValues() - require.False(t, ok) - expectedId := expectedIds[index] - gotId, _, err := si.Series().GetSeriesID() - require.NoError(t, err) - if gotId != expectedId { - t.Errorf("incorrect ID:\ngot: %v\nexpected: %v", gotId, expectedId) - } - } -} - func TestPGXInserterInsertData(t *testing.T) { if err := os.Setenv("IS_TEST", "true"); err != nil { t.Fatal(err) } + testTime := time.Now().Unix() + testEpochDuration := 1 * time.Hour + seriesCacheEpoch := pgmodel.SeriesEpoch(testTime - 10) makeLabel := func() *model.Series { l := &model.Series{} - l.SetSeriesID(1, 1) + l.SetSeriesID(1) return l } @@ -544,6 +305,8 @@ func TestPGXInserterInsertData(t *testing.T) { { name: "Zero data", sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -555,6 +318,8 @@ func TestPGXInserterInsertData(t *testing.T) { "metric_0": {model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1))}, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -575,8 +340,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -591,6 +356,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: 
model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -620,8 +387,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -639,6 +406,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -658,6 +427,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -680,8 +451,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the full batch - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -710,8 +481,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, { //this is the attempt on the individual copyRequests - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: fmt.Errorf("epoch error"), }, @@ -730,6 +501,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -790,6 +563,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT 
_prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -812,6 +587,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, metricsGetErr: fmt.Errorf("some metrics error"), sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -833,8 +610,8 @@ func TestPGXInserterInsertData(t *testing.T) { Err: error(nil), }, { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -850,6 +627,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, sqlQueries: []model.SqlQuery{ + {Sql: "SELECT _prom_catalog.initialize_current_epoch(now())", Results: model.RowResults{{testTime}}}, + {Sql: "SELECT _prom_catalog.get_default_value('epoch_duration')::INTERVAL", Results: model.RowResults{{testEpochDuration}}}, {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, @@ -900,8 +679,8 @@ func TestPGXInserterInsertData(t *testing.T) { }, // epoch check after insert from temp table { - Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", - Args: []interface{}{int64(1)}, + Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", + Args: []interface{}{int64(seriesCacheEpoch)}, Results: model.RowResults{{[]byte{}}}, Err: error(nil), }, @@ -938,7 +717,7 @@ func TestPGXInserterInsertData(t *testing.T) { } defer inserter.Close() - _, err = inserter.InsertTs(ctx, model.Data{Rows: c.rows}) + _, err = inserter.InsertTs(ctx, model.Data{Rows: c.rows, SeriesCacheEpoch: seriesCacheEpoch}) var expErr error switch { diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index f6153cef12..98b9c0128b 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..407c639333 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ 
b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -136,7 +136,7 @@ func TestInitializeMetricBatcher(t *testing.T) { func TestSendBatches(t *testing.T) { makeSeries := func(seriesID int) *model.Series { l := &model.Series{} - l.SetSeriesID(pgmodel.SeriesID(seriesID), 1) + l.SetSeriesID(pgmodel.SeriesID(seriesID)) return l } var workFinished sync.WaitGroup @@ -154,7 +154,7 @@ func TestSendBatches(t *testing.T) { // we make sure that we receive batch data for i := 0; i < 3; i++ { - id, _, err := batch.data.batch.Data()[i].Series().GetSeriesID() + id, err := batch.data.batch.Data()[i].Series().GetSeriesID() if err != nil { t.Fatal(err) } diff --git a/pkg/pgmodel/ingestor/series_writer.go b/pkg/pgmodel/ingestor/series_writer.go index 44ae573f5b..8b1cb63ae7 100644 --- a/pkg/pgmodel/ingestor/series_writer.go +++ b/pkg/pgmodel/ingestor/series_writer.go @@ -7,7 +7,6 @@ package ingestor import ( "context" "fmt" - "github.com/jackc/pgtype" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -27,6 +26,7 @@ type seriesWriter struct { conn pgxconn.PgxConn labelArrayOID uint32 labelsCache *cache.InvertedLabelsCache + seriesCache cache.SeriesCache } type SeriesVisitor interface { @@ -35,8 +35,8 @@ type SeriesVisitor interface { func labelArrayTranscoder() pgtype.ValueTranscoder { return &pgtype.Int4Array{} } -func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache) *seriesWriter { - return &seriesWriter{conn, labelArrayOID, labelsCache} +func NewSeriesWriter(conn pgxconn.PgxConn, labelArrayOID uint32, labelsCache *cache.InvertedLabelsCache, seriesCache cache.SeriesCache) *seriesWriter { + return &seriesWriter{conn, labelArrayOID, labelsCache, seriesCache} } type perMetricInfo struct { @@ -59,6 +59,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi defer span.End() infos := make(map[string]*perMetricInfo) seriesCount := 0 + err := sv.VisitSeries(func(metricInfo *pgmodel.MetricInfo, series *model.Series) error { if !series.IsSeriesIDSet() { metricName := series.MetricName() @@ -124,7 +125,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi //the labels for multiple series in same txn as we are creating the series, //the ordering of label creation can only be canonical within a series and //not across series. 
- dbEpoch, err := h.fillLabelIDs(ctx, infos, labelMap) + err = h.fillLabelIDs(ctx, infos, labelMap) if err != nil { return fmt.Errorf("error setting series ids: %w", err) } @@ -142,7 +143,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi continue } - //transaction per metric to avoid cross-metric locks + // transaction per metric to avoid cross-metric locks batch.Queue("BEGIN;") batch.Queue(seriesInsertSQL, info.metricInfo.MetricID, info.metricInfo.TableName, info.labelArraySet) batch.Queue("COMMIT;") @@ -176,7 +177,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi if err != nil { return fmt.Errorf("error setting series_id: cannot scan series_id: %w", err) } - info.series[int(ordinality)-1].SetSeriesID(id, dbEpoch) + info.series[int(ordinality)-1].SetSeriesID(id) count++ } if err := res.Err(); err != nil { @@ -196,7 +197,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi return nil } -func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (model.SeriesEpoch, error) { +func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { _, span := tracer.Default().Start(ctx, "fill-label-ids") defer span.End() //we cannot use the label cache here because that maps label ids => name, value. @@ -204,13 +205,6 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe //we may want a new cache for that, at a later time. batch := h.conn.NewBatch() - var dbEpoch model.SeriesEpoch - - // The epoch will never decrease, so we can check it once at the beginning, - // at worst we'll store too small an epoch, which is always safe - batch.Queue("BEGIN;") - batch.Queue(getEpochSQL) - batch.Queue("COMMIT;") infoBatches := make([]*perMetricInfo, 0) items := 0 @@ -226,11 +220,11 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe } namesSlice, err := names.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing names: %w", err) + return fmt.Errorf("error filling labels: slicing names: %w", err) } valuesSlice, err := values.Slice(i, high) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: slicing values: %w", err) + return fmt.Errorf("error filling labels: slicing values: %w", err) } batch.Queue("BEGIN;") batch.Queue("SELECT * FROM _prom_catalog.get_or_create_label_ids($1, $2, $3, $4)", metricName, info.metricInfo.TableName, namesSlice, valuesSlice) @@ -250,25 +244,14 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe br, err := h.conn.SendBatch(context.Background(), batch) if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) + return fmt.Errorf("error filling labels: %w", err) } defer br.Close() - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err) - } - err = br.QueryRow().Scan(&dbEpoch) - if err != nil { - return dbEpoch, fmt.Errorf("error filling labels: %w", err) - } - if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit: %w", err) - } - var count int for _, info := range infoBatches { if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on begin label batch: %w", err) + return fmt.Errorf("error filling labels on begin label batch: %w", err) } err := 
func() error { @@ -306,16 +289,16 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe return nil }() if err != nil { - return dbEpoch, err + return err } if _, err := br.Exec(); err != nil { - return dbEpoch, fmt.Errorf("error filling labels on commit label batch: %w", err) + return fmt.Errorf("error filling labels on commit label batch: %w", err) } } if count != items { - return dbEpoch, fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) + return fmt.Errorf("error filling labels: not filling as many items as expected: %v vs %v", count, items) } - return dbEpoch, nil + return nil } func (h *seriesWriter) buildLabelArrays(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) error { diff --git a/pkg/pgmodel/model/batch.go b/pkg/pgmodel/model/batch.go index 57e5c7370d..97cc1fca31 100644 --- a/pkg/pgmodel/model/batch.go +++ b/pkg/pgmodel/model/batch.go @@ -6,6 +6,7 @@ package model import ( "fmt" + "math" "time" "github.com/timescale/promscale/pkg/log" @@ -14,21 +15,26 @@ import ( // Data wraps incoming data with its in-timestamp. It is used to warn if the rate // of incoming samples vs outgoing samples is too low, based on time. type Data struct { - Rows map[string][]Insertable - ReceivedTime time.Time + Rows map[string][]Insertable + ReceivedTime time.Time + SeriesCacheEpoch SeriesEpoch } // Batch is an iterator over a collection of Insertables that returns // data in the format expected for the data table row. type Batch struct { - data []Insertable - numSamples int - numExemplars int + data []Insertable + seriesCacheEpoch SeriesEpoch + numSamples int + numExemplars int } // NewBatch returns a new batch that can hold samples and exemplars. 
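+// The batch's seriesCacheEpoch starts at math.MaxInt64 and is lowered towards
+// the minimum epoch of the requests it absorbs (see UpdateSeriesCacheEpoch).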
func NewBatch() Batch { - si := Batch{data: make([]Insertable, 0)} + si := Batch{ + data: make([]Insertable, 0), + seriesCacheEpoch: SeriesEpoch(math.MaxInt64), + } return si } @@ -37,7 +43,7 @@ func (t *Batch) Reset() { // nil all pointers to prevent memory leaks t.data[i] = nil } - *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0} + *t = Batch{data: t.data[:0], numSamples: 0, numExemplars: 0, seriesCacheEpoch: SeriesEpoch(math.MaxInt64)} } func (t *Batch) CountSeries() int { @@ -73,6 +79,16 @@ func (t *Batch) Absorb(other Batch) { t.AppendSlice(other.data) } +func (t *Batch) SeriesCacheEpoch() SeriesEpoch { + return t.seriesCacheEpoch +} + +func (t *Batch) UpdateSeriesCacheEpoch(epoch SeriesEpoch) { + if t.seriesCacheEpoch.After(epoch) { + t.seriesCacheEpoch = epoch + } +} + func (t *Batch) Len() int { return t.CountSeries() } @@ -82,11 +98,11 @@ func (t *Batch) Swap(i, j int) { } func (t *Batch) Less(i, j int) bool { - s1, _, err := t.data[i].Series().GetSeriesID() + s1, err := t.data[i].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } - s2, _, err := t.data[j].Series().GetSeriesID() + s2, err := t.data[j].Series().GetSeriesID() if err != nil { log.Warn("seriesID", "not set but being sorted on") } diff --git a/pkg/pgmodel/model/batch_visitor.go b/pkg/pgmodel/model/batch_visitor.go index bb8461a4f4..4b77eaa5ff 100644 --- a/pkg/pgmodel/model/batch_visitor.go +++ b/pkg/pgmodel/model/batch_visitor.go @@ -10,19 +10,14 @@ import ( ) type batchVisitor struct { - batch *Batch - lowestEpoch SeriesEpoch - minTime int64 + batch *Batch + minTime int64 } -func getBatchVisitor(batch *Batch) *batchVisitor { - return &batchVisitor{batch, SeriesEpoch(math.MaxInt64), math.MaxInt64} -} +var MaxDate int64 = math.MaxInt64 -// LowestEpoch returns the lowest epoch value encountered while visiting insertables. -// It must be called after Visit() has completed. 
-func (vtr *batchVisitor) LowestEpoch() SeriesEpoch {
-	return vtr.lowestEpoch
+func getBatchVisitor(batch *Batch) *batchVisitor {
+	return &batchVisitor{batch, math.MaxInt64}
 }

 func (vtr *batchVisitor) MinTime() int64 {
@@ -34,28 +29,20 @@ func (vtr *batchVisitor) Visit(
 	visitExemplars func(t time.Time, v float64, seriesId int64, lvalues []string),
 ) error {
 	var (
-		seriesId    SeriesID
-		seriesEpoch SeriesEpoch
-		err         error
+		seriesId SeriesID
+		err      error
 	)
-	updateEpoch := func(epoch SeriesEpoch) {
-		if epoch < vtr.lowestEpoch {
-			vtr.lowestEpoch = epoch
-		}
-	}
 	for _, insertable := range vtr.batch.data {
 		updateMinTs := func(t int64) {
 			if t < vtr.minTime {
 				vtr.minTime = t
 			}
 		}
-		seriesId, seriesEpoch, err = insertable.Series().GetSeriesID()
+		seriesId, err = insertable.Series().GetSeriesID()
 		if err != nil {
 			return fmt.Errorf("get series-id: %w", err)
 		}
-		updateEpoch(seriesEpoch)
-
 		switch insertable.Type() {
 		case Sample:
 			itr := insertable.Iterator().(SamplesIterator)
diff --git a/pkg/pgmodel/model/series.go b/pkg/pgmodel/model/series.go
index 3b0cead4b3..36eab0cb1e 100644
--- a/pkg/pgmodel/model/series.go
+++ b/pkg/pgmodel/model/series.go
@@ -19,8 +19,7 @@ import (
 type SeriesID int64

 const (
-	invalidSeriesID    = -1
-	InvalidSeriesEpoch = -1
+	invalidSeriesID = -1
 )

 func (s SeriesID) String() string {
@@ -30,6 +29,14 @@ func (s SeriesID) String() string {
 // SeriesEpoch represents the series epoch
 type SeriesEpoch int64

+func (s SeriesEpoch) After(o SeriesEpoch) bool {
+	return s > o
+}
+
+func (s SeriesEpoch) AfterEq(o SeriesEpoch) bool {
+	return s >= o
+}
+
 // Series stores a Prometheus labels.Labels in its canonical string representation
 type Series struct {
-	//protects names, values, seriesID, epoch
+	// protects names, values, seriesID
 	lock sync.RWMutex

 	names      []string
 	values     []string
 	seriesID   SeriesID
-	epoch      SeriesEpoch
 	metricName string
 	str        string
@@ -50,7 +56,6 @@ func NewSeries(key string, labelPairs []prompb.Label) *Series {
 		values:   make([]string, len(labelPairs)),
 		str:      key,
 		seriesID: invalidSeriesID,
-		epoch:    InvalidSeriesEpoch,
 	}
 	for i, l := range labelPairs {
 		series.names[i] = l.Name
@@ -108,26 +113,25 @@ func (l *Series) FinalSizeBytes() uint64 {
 	return uint64(unsafe.Sizeof(*l)) + uint64(len(l.str)+len(l.metricName)) // #nosec
 }

-func (l *Series) GetSeriesID() (SeriesID, SeriesEpoch, error) {
+func (l *Series) GetSeriesID() (SeriesID, error) {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
 	switch l.seriesID {
 	case invalidSeriesID:
-		return 0, 0, fmt.Errorf("Series id not set")
+		return 0, fmt.Errorf("Series id not set")
 	case 0:
-		return 0, 0, fmt.Errorf("Series id invalid")
+		return 0, fmt.Errorf("Series id invalid")
 	default:
-		return l.seriesID, l.epoch, nil
+		return l.seriesID, nil
 	}
 }

-// note this has to be idempotent
-func (l *Series) SetSeriesID(sid SeriesID, eid SeriesEpoch) {
+// Note: This has to be idempotent
+func (l *Series) SetSeriesID(sid SeriesID) {
 	l.lock.Lock()
 	defer l.lock.Unlock()
 	l.seriesID = sid
-	l.epoch = eid
 	l.names = nil
 	l.values = nil
 }
diff --git a/pkg/pgmodel/model/sql_test_utils.go b/pkg/pgmodel/model/sql_test_utils.go
index 124965b1ee..59ad84a37a 100644
--- a/pkg/pgmodel/model/sql_test_utils.go
+++ b/pkg/pgmodel/model/sql_test_utils.go
@@ -413,6 +413,10 @@ func (m *MockRows) Scan(dest ...interface{}) error {
 		if d, ok := dest[i].(*time.Time); ok {
 			*d = s
 		}
+	case time.Duration:
+		if d, ok := dest[i].(*time.Duration); ok {
+			*d = s
+		}
 	case float64:
 		if _, ok := dest[i].(float64); !ok {
 			return fmt.Errorf("wrong value type float64")
diff --git a/pkg/rules/adapters/ingest.go
b/pkg/rules/adapters/ingest.go index 1b7206f796..6bb161508e 100644 --- a/pkg/rules/adapters/ingest.go +++ b/pkg/rules/adapters/ingest.go @@ -30,9 +30,10 @@ func NewIngestAdapter(inserter ingestor.DBInserter) *ingestAdapter { } type appenderAdapter struct { - data map[string][]model.Insertable - inserter ingestor.DBInserter - closed bool + data map[string][]model.Insertable + inserter ingestor.DBInserter + closed bool + seriesCacheEpoch model.SeriesEpoch } // Appender creates a new appender for Prometheus rules manager. @@ -47,8 +48,9 @@ type appenderAdapter struct { // Note: The rule manager does not call Rollback() yet. func (a ingestAdapter) Appender(_ context.Context) storage.Appender { return &appenderAdapter{ - data: make(map[string][]model.Insertable), - inserter: a.inserter, + data: make(map[string][]model.Insertable), + inserter: a.inserter, + seriesCacheEpoch: model.SeriesEpoch(0), } } @@ -60,6 +62,10 @@ func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64 if err != nil { return 0, fmt.Errorf("get ingestor: %w", err) } + // Track the oldest series cache epoch observed by this appender (epoch 0 means none observed yet); Commit() passes it to the dispatcher so stale series ids can be detected. + cacheEpoch := dbIngestor.SeriesCache().CacheEpoch() + if app.seriesCacheEpoch == model.SeriesEpoch(0) || app.seriesCacheEpoch.After(cacheEpoch) { + app.seriesCacheEpoch = cacheEpoch + } series, metricName, err := dbIngestor.SeriesCache().GetSeriesFromProtos(util.LabelToPrompbLabels(l)) if err != nil { return 0, fmt.Errorf("get series from protos: %w", err) @@ -96,7 +102,7 @@ func (app *appenderAdapter) Commit() error { if err != nil { return fmt.Errorf("get ingestor: %w", err) } - numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()}) + numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now(), SeriesCacheEpoch: app.seriesCacheEpoch}) if err == nil { samplesIngested.Add(float64(numInsertablesIngested)) } diff --git a/pkg/tests/end_to_end_tests/create_test.go b/pkg/tests/end_to_end_tests/create_test.go index d1acf0e5ae..e96ee4ccdd 100644 --- a/pkg/tests/end_to_end_tests/create_test.go +++ b/pkg/tests/end_to_end_tests/create_test.go @@ -496,6 +496,163 @@ func TestSQLIngest(t *testing.T) { } } +func TestInsertAbort(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + samplesOne := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 1, Value: 0.1}, + }, + }, + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 2, Value: 0.2}, + }, + }, + } + + samplesTwo := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 3, Value: 0.1}, + }, + }, + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 4, Value: 0.2}, + }, + }, + } + + withDB(t, *testDatabase, func(dbOwner *pgxpool.Pool, t testing.TB) { + db := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_writer") + defer db.Close() + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) + if err != nil { + t.Fatal(err) + } + defer ingestor.Close() + + // Ingest some metrics + _, _, err =
ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesOne))) + require.NoError(t, err) + + err = ingestor.CompleteMetricCreation(context.Background()) + require.NoError(t, err) + + // Advance the database epochs so that the ingestor's cached series ids become stale + _, err = db.Exec(context.Background(), "UPDATE _prom_catalog.ids_epoch SET delete_epoch = current_epoch, current_epoch = current_epoch + 14400") + require.NoError(t, err) + + // Insert more samples + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesTwo))) + + // Assert that the insert fails: the cached series epoch is now behind the database delete_epoch + require.Error(t, err, "expected insert to fail due to stale series cache epoch") + + // Insert the same samples again + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesTwo))) + + // Expect this insertion to succeed: the failed attempt invalidated the stale cache entries + require.NoError(t, err) + }) +} + +func TestCacheEvictionBetweenInserts(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + samplesOne := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 1, Value: 0.1}, + }, + }, + } + + samplesTwo := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabelName, Value: "Test"}, + {Name: "test", Value: "test"}, + }, + Samples: []prompb.Sample{ + {Timestamp: 2, Value: 0.2}, + }, + }, + } + + withDB(t, *testDatabase, func(dbOwner *pgxpool.Pool, t testing.TB) { + db := testhelpers.PgxPoolWithRole(t, *testDatabase, "prom_writer") + defer db.Close() + ingestor, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) + if err != nil { + t.Fatal(err) + } + defer ingestor.Close() + + // Ingest some metrics + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesOne))) + require.NoError(t, err) + + err = ingestor.CompleteMetricCreation(context.Background()) + require.NoError(t, err) + + // Mark all series for deletion; this will cause cache eviction when the epoch refresh runs + _, err = db.Exec(context.Background(), "UPDATE _prom_catalog.series SET delete_epoch = i.current_epoch FROM _prom_catalog.ids_epoch as i") + require.NoError(t, err) + + // Adjust database current_epoch + _, err = db.Exec(context.Background(), "UPDATE _prom_catalog.ids_epoch SET current_epoch = current_epoch + 10") + require.NoError(t, err) + + // Refresh the series epoch; this results in the caches being emptied + ingestor.Dispatcher().(*ingstr.PgxDispatcher).RefreshSeriesEpoch() + + // Here we expect all items to have been removed from the cache. + // We can't dig into the cache directly, but we can observe that items + // were evicted by inserting into the same series again and checking + // that the `delete_epoch` is NULL in the database afterwards.
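+ // A cache hit would reuse the cached series ids without touching the database rows, leaving delete_epoch set; a cache miss forces the series to be re-created in the database, which clears delete_epoch.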
+ + // Insert more samples for the series + _, _, err = ingestor.IngestMetrics(context.Background(), newWriteRequestWithTs(copyMetrics(samplesTwo))) + require.NoError(t, err) + + // Check how many series still have non-null delete_epoch + row := db.QueryRow(context.Background(), "SELECT count(*) FROM _prom_catalog.series WHERE delete_epoch IS NOT NULL") + var count int + err = row.Scan(&count) + require.NoError(t, err) + + // We expect that there are no series left with a non-null delete_epoch + require.Equal(t, 0, count) + }) +} + func TestInsertCompressedDuplicates(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") diff --git a/pkg/tests/end_to_end_tests/drop_test.go b/pkg/tests/end_to_end_tests/drop_test.go index bd9fdf8e3e..d0187c7ddb 100644 --- a/pkg/tests/end_to_end_tests/drop_test.go +++ b/pkg/tests/end_to_end_tests/drop_test.go @@ -659,7 +659,7 @@ func TestSQLDropMetricChunk(t *testing.T) { t.Error("expected ingest to fail due to old epoch") } - scache.Reset() + scache.Reset(pgmodel.SeriesEpoch(time.Now().Unix())) ingestor.Close() ingestor2, err := ingstr.NewPgxIngestorForTests(pgxconn.NewPgxConn(db), nil) diff --git a/pkg/tests/upgrade_tests/shapshot.go b/pkg/tests/upgrade_tests/shapshot.go index 94f93f695b..ec25720aa0 100644 --- a/pkg/tests/upgrade_tests/shapshot.go +++ b/pkg/tests/upgrade_tests/shapshot.go @@ -347,7 +347,12 @@ func getTableInfosForSchema(t *testing.T, db *pgxpool.Pool, schema string) (out `SELECT array_agg(relname::TEXT order by relname::TEXT) FROM pg_class WHERE relnamespace=$1::TEXT::regnamespace AND relkind='r' - AND (relnamespace, relname) != ('_ps_catalog'::regnamespace, 'migration') + AND ( + (relnamespace, relname) != ('_ps_catalog'::regnamespace, 'migration') + AND + -- skip ids_epoch because the current_epoch can change when promscale starts + (relnamespace, relname) != ('_prom_catalog'::regnamespace, 'ids_epoch') + ) `, schema, ) @@ -359,7 +364,7 @@ } numTables := len(tables) - if schema == "_ps_catalog" { + if schema == "_ps_catalog" || schema == "_prom_catalog" { numTables = numTables + 1 } out = make([]tableInfo, numTables) @@ -390,5 +395,33 @@ return } } + if schema == "_prom_catalog" { + // dynamically build a query to select all columns except for `current_epoch` + row = db.QueryRow(context.Background(), + ` + WITH cols AS ( + SELECT array_agg(format('%I', a.attname)) as cols + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + JOIN pg_attribute a ON a.attrelid = c.oid + WHERE n.nspname = '_prom_catalog' + AND relname = 'ids_epoch' + AND a.attnum > 0 + AND a.attname <> 'current_epoch' + ) + SELECT 'SELECT array_agg((' || array_to_string((SELECT cols FROM cols), ', ') || ')::TEXT) FROM _prom_catalog.ids_epoch'; + `) + var query string + if err = row.Scan(&query); err != nil { + t.Errorf("error building query: %v", err) + return + } + row = db.QueryRow(context.Background(), query) + out[numTables-1].name = "ids_epoch" + if err = row.Scan(&out[numTables-1].values); err != nil { + t.Errorf("error querying values from table _prom_catalog.ids_epoch: %v", err) + return + } + } return } diff --git a/scripts/end_to_end_tests.sh b/scripts/end_to_end_tests.sh index 80531b6e93..8ac5c94352 100755 --- a/scripts/end_to_end_tests.sh +++ b/scripts/end_to_end_tests.sh @@ -88,6 +88,7 @@ PROMSCALE_DB_PASSWORD=postgres \ PROMSCALE_DB_NAME=postgres \ PROMSCALE_DB_SSL_MODE=disable
\ PROMSCALE_WEB_TELEMETRY_PATH=/metrics \ +PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS=true \ ./promscale -startup.only docker exec e2e-tsdb psql -U postgres -d postgres \