Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker-compose/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ version: '3.0'

services:
db:
image: timescale/timescaledb-ha:pg14-latest
# TODO (james): replace this when Promscale extension 0.7.0 is published
image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14
ports:
- 5432:5432/tcp
environment:
Expand Down Expand Up @@ -42,6 +43,7 @@ services:
PROMSCALE_TELEMETRY_TRACE_OTEL_ENDPOINT: "otel-collector:4317"
PROMSCALE_TELEMETRY_TRACE_SAMPLING_RATIO: "0.1"
PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

otel-collector:
platform: linux/amd64
Expand Down
5 changes: 4 additions & 1 deletion docker-compose/high-availability/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ version: '3.0'

services:
db:
image: timescale/timescaledb-ha:pg14-latest
# TODO (james): replace this when Promscale extension 0.7.0 is published
image: ghcr.io/timescale/dev_promscale_extension:jg-time-based-epoch-ts2-pg14
ports:
- 5432:5432/tcp
environment:
Expand Down Expand Up @@ -44,6 +45,7 @@ services:
PROMSCALE_METRICS_HIGH_AVAILABILITY: true
PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow
PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

promscale-connector2:
image: timescale/promscale:latest
Expand All @@ -61,6 +63,7 @@ services:
PROMSCALE_METRICS_HIGH_AVAILABILITY: true
PROMSCALE_DB_URI: postgres://postgres:password@db:5432/postgres?sslmode=allow
PROMSCALE_METRICS_RULES_CONFIG_FILE: /prometheus.yml
PROMSCALE_STARTUP_UPGRADE_PRERELEASE_EXTENSIONS: true

node_exporter:
image: quay.io/prometheus/node-exporter
Expand Down
6 changes: 3 additions & 3 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
var sampleRows [][]interface{}
var exemplarRows [][]interface{}
insertStart := time.Now()
lowestEpoch := pgmodel.SeriesEpoch(math.MaxInt64)
lowestEpoch := pgmodel.NewSeriesEpoch(pgmodel.MaxDate)
lowestMinTime := int64(math.MaxInt64)
tx, err := conn.BeginTx(ctx)
if err != nil {
Expand Down Expand Up @@ -410,7 +410,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
return err, lowestMinTime
}
epoch := visitor.LowestEpoch()
if epoch < lowestEpoch {
if lowestEpoch.After(epoch) {
lowestEpoch = epoch
}
minTime := visitor.MinTime()
Expand Down Expand Up @@ -475,7 +475,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
//thus we don't need row locking here. Note by doing this check at the end we can
//have some wasted work for the inserts before this fails but this is rare.
//avoiding an additional loop or memoization to find the lowest epoch ahead of time seems worth it.
row := tx.QueryRow(ctx, "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", int64(lowestEpoch))
row := tx.QueryRow(ctx, "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1", lowestEpoch.Time())
var val []byte
if err = row.Scan(&val); err != nil {
return err, lowestMinTime
Expand Down
17 changes: 10 additions & 7 deletions pkg/pgmodel/ingestor/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *pgxDispatcher) runCompleteMetricCreationWorker() {
}

func (p *pgxDispatcher) runSeriesEpochSync() {
epoch, err := p.refreshSeriesEpoch(model.InvalidSeriesEpoch)
epoch, err := p.refreshSeriesEpoch(nil)
// we don't have any great place to report errors, and if the
// connection recovers we can still make progress, so we'll just log it
// and continue execution
Expand All @@ -152,32 +152,35 @@ func (p *pgxDispatcher) runSeriesEpochSync() {
}
}

func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch model.SeriesEpoch) (model.SeriesEpoch, error) {
func (p *pgxDispatcher) refreshSeriesEpoch(existingEpoch *model.SeriesEpoch) (*model.SeriesEpoch, error) {
dbEpoch, err := p.getServerEpoch()
log.Info("msg", "Refreshing series cache epoch")
if err != nil {
log.Info("msg", "An error occurred refreshing, will reset series and inverted labels caches")
// Trash the cache just in case an epoch change occurred, seems safer
p.scache.Reset()
// Also trash the inverted labels cache, which can also be invalidated when the series cache is
p.invertedLabelsCache.Reset()
return model.InvalidSeriesEpoch, err
return nil, err
}
if existingEpoch == model.InvalidSeriesEpoch || dbEpoch != existingEpoch {
if existingEpoch == nil || *dbEpoch != *existingEpoch {
log.Info("msg", "The local epoch is no longer up-to-date, will reset series and inverted labels caches")
p.scache.Reset()
// If the series cache needs to be invalidated, so does the inverted labels cache
p.invertedLabelsCache.Reset()
}
return dbEpoch, nil
}

func (p *pgxDispatcher) getServerEpoch() (model.SeriesEpoch, error) {
func (p *pgxDispatcher) getServerEpoch() (*model.SeriesEpoch, error) {
var newEpoch int64
row := p.conn.QueryRow(context.Background(), getEpochSQL)
err := row.Scan(&newEpoch)
if err != nil {
return -1, err
return nil, err
}

return model.SeriesEpoch(newEpoch), nil
return model.NewSeriesEpoch(newEpoch), nil
}

func (p *pgxDispatcher) CompleteMetricCreation(ctx context.Context) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pgmodel/ingestor/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ingestor

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -74,7 +75,7 @@ func TestLabelArrayCreator(t *testing.T) {

/* test one series already set */
setSeries := getSeries(t, scache, labels.Labels{metricNameLabel, valTwo})
setSeries.SetSeriesID(5, 4)
setSeries.SetSeriesID(5, model.NewSeriesEpoch(time.Now().Unix()))
seriesSet = []*model.Series{
getSeries(t, scache, labels.Labels{metricNameLabel, valOne}),
setSeries,
Expand Down
53 changes: 29 additions & 24 deletions pkg/pgmodel/ingestor/ingestor_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (c sVisitor) VisitSeries(cb func(info *pgmodel.MetricInfo, s *pgmodel.Serie
func TestPGXInserterInsertSeries(t *testing.T) {
// Set test env so that cache metrics uses a new registry and avoid panic on duplicate register.
require.NoError(t, os.Setenv("IS_TEST", "true"))
initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
testCases := []struct {
name string
series []labels.Labels
Expand All @@ -84,7 +85,7 @@ func TestPGXInserterInsertSeries(t *testing.T) {
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(1)}},
Results: model.RowResults{{initialTime}},
Err: error(nil),
},
{Sql: "COMMIT;"},
Expand Down Expand Up @@ -134,7 +135,7 @@ func TestPGXInserterInsertSeries(t *testing.T) {
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(1)}},
Results: model.RowResults{{initialTime}},
Err: error(nil),
},
{Sql: "COMMIT;"},
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestPGXInserterInsertSeries(t *testing.T) {
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(1)}},
Results: model.RowResults{{initialTime}},
Err: error(nil),
},
{Sql: "COMMIT;"},
Expand Down Expand Up @@ -235,7 +236,7 @@ func TestPGXInserterInsertSeries(t *testing.T) {
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(1)}},
Results: model.RowResults{{initialTime}},
Err: error(nil),
},
{Sql: "COMMIT;"},
Expand Down Expand Up @@ -309,7 +310,7 @@ func TestPGXInserterInsertSeries(t *testing.T) {
si, se, err := si.Series().GetSeriesID()
require.NoError(t, err)
require.True(t, si > 0, "series id not set")
require.True(t, se > 0, "epoch not set")
require.True(t, se.Time() == time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), "epoch not set")
}
}
})
Expand All @@ -328,14 +329,17 @@ func TestPGXInserterCacheReset(t *testing.T) {
},
}

initialTime := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
newTime := time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC).Unix()

sqlQueries := []model.SqlQuery{

// first series cache fetch
{Sql: "BEGIN;"},
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(1)}},
Results: model.RowResults{{initialTime}},
Err: error(nil),
},
{Sql: "COMMIT;"},
Expand Down Expand Up @@ -371,15 +375,15 @@ func TestPGXInserterCacheReset(t *testing.T) {
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(1)}},
Results: model.RowResults{{initialTime}},
Err: error(nil),
},

// second labels cache refresh, trash the cache
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(2)}},
Results: model.RowResults{{newTime}},
Err: error(nil),
},
{Sql: "BEGIN;"},
Expand All @@ -388,7 +392,7 @@ func TestPGXInserterCacheReset(t *testing.T) {
{
Sql: "SELECT current_epoch FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}(nil),
Results: model.RowResults{{int64(2)}},
Results: model.RowResults{{newTime}},
Err: error(nil),
},
{Sql: "COMMIT;"},
Expand Down Expand Up @@ -477,7 +481,7 @@ func TestPGXInserterCacheReset(t *testing.T) {
}

// refreshing during the same epoch gives the same IDs without checking the DB
_, err = inserter.refreshSeriesEpoch(1)
_, err = inserter.refreshSeriesEpoch(model.NewSeriesEpoch(initialTime))
require.NoError(t, err)

samples = makeSamples(series)
Expand All @@ -498,7 +502,7 @@ func TestPGXInserterCacheReset(t *testing.T) {
}

// trash the cache
_, err = inserter.refreshSeriesEpoch(1)
_, err = inserter.refreshSeriesEpoch(model.NewSeriesEpoch(initialTime))
require.NoError(t, err)

// retrying rechecks the DB and uses the new IDs
Expand Down Expand Up @@ -529,9 +533,10 @@ func TestPGXInserterInsertData(t *testing.T) {
if err := os.Setenv("IS_TEST", "true"); err != nil {
t.Fatal(err)
}
testTime := time.Now().Unix()
makeLabel := func() *model.Series {
l := &model.Series{}
l.SetSeriesID(1, 1)
l.SetSeriesID(1, model.NewSeriesEpoch(testTime))
return l
}

Expand Down Expand Up @@ -575,8 +580,8 @@ func TestPGXInserterInsertData(t *testing.T) {
Err: error(nil),
},
{
Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{int64(1)},
Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{testTime},
Results: model.RowResults{{[]byte{}}},
Err: error(nil),
},
Expand Down Expand Up @@ -620,8 +625,8 @@ func TestPGXInserterInsertData(t *testing.T) {
Err: error(nil),
},
{
Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{int64(1)},
Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{testTime},
Results: model.RowResults{{[]byte{}}},
Err: error(nil),
},
Expand Down Expand Up @@ -680,8 +685,8 @@ func TestPGXInserterInsertData(t *testing.T) {
},
{
//this is the attempt on the full batch
Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{int64(1)},
Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{testTime},
Results: model.RowResults{{[]byte{}}},
Err: fmt.Errorf("epoch error"),
},
Expand Down Expand Up @@ -710,8 +715,8 @@ func TestPGXInserterInsertData(t *testing.T) {
},
{
//this is the attempt on the individual copyRequests
Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{int64(1)},
Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{testTime},
Results: model.RowResults{{[]byte{}}},
Err: fmt.Errorf("epoch error"),
},
Expand Down Expand Up @@ -833,8 +838,8 @@ func TestPGXInserterInsertData(t *testing.T) {
Err: error(nil),
},
{
Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{int64(1)},
Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{testTime},
Results: model.RowResults{{[]byte{}}},
Err: error(nil),
},
Expand Down Expand Up @@ -900,8 +905,8 @@ func TestPGXInserterInsertData(t *testing.T) {
},
// epoch check after insert from temp table
{
Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{int64(1)},
Sql: "SELECT CASE $1 <= delete_epoch WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{testTime},
Results: model.RowResults{{[]byte{}}},
Err: error(nil),
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/pgmodel/ingestor/metric_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/pgmodel/cache"
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestInitializeMetricBatcher(t *testing.T) {
func TestSendBatches(t *testing.T) {
makeSeries := func(seriesID int) *model.Series {
l := &model.Series{}
l.SetSeriesID(pgmodel.SeriesID(seriesID), 1)
l.SetSeriesID(pgmodel.SeriesID(seriesID), model.NewSeriesEpoch(time.Now().Unix()))
return l
}
var workFinished sync.WaitGroup
Expand Down
11 changes: 6 additions & 5 deletions pkg/pgmodel/ingestor/series_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package ingestor
import (
"context"
"fmt"

"github.com/jackc/pgtype"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
Expand Down Expand Up @@ -142,7 +141,7 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi
continue
}

//transaction per metric to avoid cross-metric locks
// transaction per metric to avoid cross-metric locks
batch.Queue("BEGIN;")
batch.Queue(seriesInsertSQL, info.metricInfo.MetricID, info.metricInfo.TableName, info.labelArraySet)
batch.Queue("COMMIT;")
Expand Down Expand Up @@ -196,15 +195,15 @@ func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisi
return nil
}

func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (model.SeriesEpoch, error) {
func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMetricInfo, labelMap map[cache.LabelKey]cache.LabelInfo) (*model.SeriesEpoch, error) {
_, span := tracer.Default().Start(ctx, "fill-label-ids")
defer span.End()
//we cannot use the label cache here because that maps label ids => name, value.
//what we need here is name, value => id.
//we may want a new cache for that, at a later time.

batch := h.conn.NewBatch()
var dbEpoch model.SeriesEpoch
var dbEpoch *model.SeriesEpoch

// The epoch will never decrease, so we can check it once at the beginning,
// at worst we'll store too small an epoch, which is always safe
Expand Down Expand Up @@ -257,10 +256,12 @@ func (h *seriesWriter) fillLabelIDs(ctx context.Context, infos map[string]*perMe
if _, err := br.Exec(); err != nil {
return dbEpoch, fmt.Errorf("error filling labels on begin: %w", err)
}
err = br.QueryRow().Scan(&dbEpoch)
var epochTime int64
err = br.QueryRow().Scan(&epochTime)
if err != nil {
return dbEpoch, fmt.Errorf("error filling labels: %w", err)
}
dbEpoch = pgmodel.NewSeriesEpoch(epochTime)
if _, err := br.Exec(); err != nil {
return dbEpoch, fmt.Errorf("error filling labels on commit: %w", err)
}
Expand Down
Loading