Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker-compose/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ version: '3.0'

services:
db:
image: timescale/timescaledb-ha:pg14-latest
# TODO (james): replace this when Promscale extension 0.7.0 is published
image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14
ports:
- 5432:5432/tcp
environment:
Expand Down Expand Up @@ -42,6 +43,7 @@ services:
PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317"
PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1"
PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

otel-collector:
platform: linux/amd64
Expand Down
5 changes: 4 additions & 1 deletion docker-compose/high-availability/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ version: '3.0'

services:
db:
image: timescale/timescaledb-ha:pg14-latest
# TODO (james): replace this when Promscale extension 0.7.0 is published
image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14
ports:
- 5432:5432/tcp
environment:
Expand Down Expand Up @@ -44,6 +45,7 @@ services:
PROMSCALE_METRICS_HIGH_AVAILABILITY: true
PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow
PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

promscale-connector2:
image: timescale/promscale:latest
Expand All @@ -61,6 +63,7 @@ services:
PROMSCALE_METRICS_HIGH_AVAILABILITY: true
PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow
PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

node_exporter:
image: quay.io/prometheus/node-exporter
Expand Down
44 changes: 44 additions & 0 deletions pkg/clockcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,47 @@ func packUsedAndTimestamp(used bool, ts int32) uint32 {
}
return uint32(usedBit<<31) | uint32(ts)
}

// RemoveMatchingItems removes items whose value passes the check applied by
// the passed `matcher` function. It returns the number of entries which were
// removed from the cache.
// Note: Due to the locks that this function takes, it is best used when very
// few items stored in the cache are to be removed.
// RemoveMatchingItems removes items whose value passes the check applied by
// the passed `matcher` function. It returns the number of entries which were
// removed from the cache.
// Note: Due to the locks that this function takes, it is best used when very
// few items stored in the cache are to be removed.
func (self *Cache) RemoveMatchingItems(matcher func(value interface{}) bool) int {
	// Note: we take an RLock to find matching items, then release it and take
	// a full Lock to remove them. This has two assumptions:
	// 1. len(matchingKeys) << len(storage): taking a big lock should be fine.
	// 2. Missing the addition of an item to the cache is not problematic.
	//    Locking like this doesn't provide a consistent view of the cache
	//    contents. Either an entry is removed or added under our feet. An
	//    entry being removed is not a problem, we will just skip it. An entry
	//    being added is fine for the one consumer of this method (cache
	//    invalidation): if an item is added to the cache, then it was just
	//    created/resurrected in the DB, so it wouldn't have been a candidate
	//    for removal.

	self.elementsLock.RLock()
	matchingKeys := make([]interface{}, 0)
	for k, v := range self.elements {
		if matcher(v.value) {
			matchingKeys = append(matchingKeys, k)
		}
	}
	self.elementsLock.RUnlock()

	self.elementsLock.Lock()
	// BUG FIX: the previous loop was `for i := range matchingKeys {
	// self.remove(i) }`, which passed the slice *index* to remove() instead
	// of the collected key, so the matched entries were never deleted.
	for _, k := range matchingKeys {
		self.remove(k)
	}
	self.elementsLock.Unlock()
	return len(matchingKeys)
}

// remove deletes the entry stored under key from the elements map and zeroes
// the backing element so it no longer retains its previous contents. Missing
// keys are ignored. Callers must hold elementsLock for writing.
func (self *Cache) remove(key interface{}) {
	elem, ok := self.elements[key]
	if !ok {
		// Nothing stored under this key; nothing to do.
		return
	}
	delete(self.elements, key)
	*elem = element{}
}
45 changes: 43 additions & 2 deletions pkg/pgmodel/cache/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,27 @@ var evictionMaxAge = time.Minute * 2 // grow cache if we are evicting eleme

// SeriesCache is a cache of model.Series entries.
type SeriesCache interface {
	// Reset clears the cache contents and stamps it with the given epoch.
	Reset(epoch model.SeriesEpoch)
	GetSeriesFromProtos(labelPairs []prompb.Label) (series *model.Series, metricName string, err error)
	Len() int
	Cap() int
	Evictions() uint64
	// EvictSeriesById removes the entries for the given series ids, updates
	// the cache epoch, and returns the number of entries removed.
	EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int
	// CacheEpoch returns the epoch currently associated with the cache.
	CacheEpoch() model.SeriesEpoch
	// SetCacheEpoch associates the given epoch with the cache contents.
	SetCacheEpoch(epoch model.SeriesEpoch)
}

type SeriesCacheImpl struct {
cache *clockcache.Cache
maxSizeBytes uint64
cacheEpoch model.SeriesEpoch
}

func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
cache := &SeriesCacheImpl{
clockcache.WithMetrics("series", "metric", config.SeriesCacheInitialSize),
config.SeriesCacheMemoryMaxBytes,
model.SeriesEpoch(0),
}

if sigClose != nil {
Expand All @@ -54,6 +59,14 @@ func NewSeriesCache(config Config, sigClose <-chan struct{}) *SeriesCacheImpl {
return cache
}

// CacheEpoch returns the series epoch currently associated with the
// contents of this cache.
// NOTE(review): cacheEpoch is read here without synchronization while
// SetCacheEpoch writes it — presumably callers serialize access; confirm.
func (t *SeriesCacheImpl) CacheEpoch() model.SeriesEpoch {
	return t.cacheEpoch
}

// SetCacheEpoch associates the given epoch with the current cache contents.
// NOTE(review): this is a plain, unsynchronized field write — if it can race
// with CacheEpoch() readers on other goroutines it needs atomic/lock
// protection; confirm against callers.
func (t *SeriesCacheImpl) SetCacheEpoch(epoch model.SeriesEpoch) {
	t.cacheEpoch = epoch
}

func (t *SeriesCacheImpl) runSizeCheck(sigClose <-chan struct{}) {
ticker := time.NewTicker(growCheckDuration)
for {
Expand Down Expand Up @@ -115,8 +128,11 @@ func (t *SeriesCacheImpl) Evictions() uint64 {
}

// Reset should be concurrency-safe
func (t *SeriesCacheImpl) Reset() {
func (t *SeriesCacheImpl) Reset(epoch model.SeriesEpoch) {
t.cache.Reset()
// When we reset the cache, we need to reset the cache epoch as well,
// otherwise it may be out of date as soon as data is loaded into it.
t.SetCacheEpoch(epoch)
}

// Get the canonical version of a series if one exists.
Expand Down Expand Up @@ -277,3 +293,28 @@ func (t *SeriesCacheImpl) GetSeriesFromProtos(labelPairs []prompb.Label) (*model

return series, metricName, nil
}

// EvictSeriesById removes from the cache every series whose id appears in
// seriesIds, sets the cache epoch to the given epoch, and returns the number
// of entries that were removed. A nil/empty id list is a no-op returning 0.
func (t *SeriesCacheImpl) EvictSeriesById(seriesIds []model.SeriesID, epoch model.SeriesEpoch) int {
	if len(seriesIds) == 0 {
		return 0
	}
	seriesIdSet := make(map[model.SeriesID]struct{}, len(seriesIds))
	for _, id := range seriesIds {
		seriesIdSet[id] = struct{}{}
	}
	// RemoveMatchingItems suggests that we shouldn't use it because it's
	// expensive. In normal operation (with ~30 day retention), we don't expect
	// our cache to contain a lot of the series that we want to remove. This
	// matches well with RemoveMatchingItems locking mechanism. See the impl.
	count := t.cache.RemoveMatchingItems(func(v interface{}) bool {
		series := v.(*model.Series)
		id, err := series.GetSeriesID()
		// BUG FIX: the original checked `if err == nil { return false }`,
		// which kept every series with a *resolvable* id and then looked up
		// the zero-value id for unresolvable ones. Keep (don't evict) only
		// the entries whose id we cannot determine.
		if err != nil {
			return false
		}
		_, present := seriesIdSet[id]
		return present
	})
	t.SetCacheEpoch(epoch)
	return count
}
1 change: 1 addition & 0 deletions pkg/pgmodel/ingestor/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ func (p *pendingBuffer) release() {
func (p *pendingBuffer) addReq(req *insertDataRequest) {
p.needsResponse = append(p.needsResponse, insertDataTask{finished: req.finished, errChan: req.errChan})
p.batch.AppendSlice(req.data)
p.batch.UpdateSeriesCacheEpoch(req.seriesCacheEpoch)
}
22 changes: 12 additions & 10 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e
defer span.End()
batch := copyBatch(insertBatch)
err := sw.PopulateOrCreateSeries(ctx, batch)

if err != nil {
return fmt.Errorf("copier: writing series: %w", err)
}
Expand Down Expand Up @@ -348,8 +349,8 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
var sampleRows [][]interface{}
var exemplarRows [][]interface{}
insertStart := time.Now()
lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
lowestMinTime := int64(math.MaxInt64)
minCacheEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
tx, err := conn.BeginTx(ctx)
if err != nil {
return fmt.Errorf("failed to start transaction for inserting metrics: %v", err), lowestMinTime
Expand All @@ -364,6 +365,10 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re

for r := range reqs {
req := &reqs[r]
epoch := req.data.batch.SeriesCacheEpoch()
if minCacheEpoch.After(epoch) {
minCacheEpoch = epoch
}
// Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. Prometheus retry)
// Samples inside series should be sorted by Prometheus
// We sort after PopulateOrCreateSeries call because we now have guarantees that all seriesIDs have been populated
Expand Down Expand Up @@ -409,10 +414,6 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
if err != nil {
return err, lowestMinTime
}
epoch := visitor.LowestEpoch()
if epoch < lowestEpoch {
lowestEpoch = epoch
}
minTime := visitor.MinTime()
if minTime < lowestMinTime {
lowestMinTime = minTime
Expand Down Expand Up @@ -471,11 +472,12 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
}
}

//note the epoch increment takes an access exclusive on the table before incrementing.
//thus we don't need row locking here. Note by doing this check at the end we can
//have some wasted work for the inserts before this fails but this is rare.
//avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it.
row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
// Note: The epoch increment takes an access exclusive on the table before
// incrementing, thus we don't need row locking here. By doing this check
// at the end we can have some wasted work for the inserts before this
// fails but this is rare. Avoiding an additional loop or memoization to
// find the lowest epoch ahead of time seems worth it.
row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", minCacheEpoch)
var val []byte
if err = row.Scan(&val); err != nil {
return err, lowestMinTime
Expand Down
Loading