diff --git a/pkg/api/router.go b/pkg/api/router.go index 42dba3f007..5725befa11 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -24,6 +24,7 @@ import ( "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgclient" pgMetrics "github.com/timescale/promscale/pkg/pgmodel/metrics" + "github.com/timescale/promscale/pkg/psctx" "github.com/timescale/promscale/pkg/query" "github.com/timescale/promscale/pkg/telemetry" ) @@ -167,6 +168,8 @@ func withWarnLog(msg string, handler http.Handler) http.HandlerFunc { func timeHandler(histogramVec prometheus.ObserverVec, path string, handler http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { start := time.Now() + ctx := psctx.WithStartTime(r.Context(), start) + r = r.WithContext(ctx) handler.ServeHTTP(w, r) elapsedMs := time.Since(start).Milliseconds() histogramVec.WithLabelValues(path).Observe(float64(elapsedMs)) diff --git a/pkg/log/log.go b/pkg/log/log.go index cb2c1421d7..5466ccd998 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -95,6 +95,7 @@ func Init(cfg Config) error { // NOTE: we add a level of indirection with our logging functions, // so we need additional caller depth logger = log.With(l, "ts", timestampFormat, "caller", log.Caller(4)) + traceRequestEnabled = isTraceRequestEnabled() return nil } @@ -205,3 +206,28 @@ func WarnRateLimited(keyvals ...interface{}) { func DebugRateLimited(keyvals ...interface{}) { rateLimit(debug, keyvals...) } + +var traceRequestEnabled bool + +func isTraceRequestEnabled() bool { + value := os.Getenv("PROMSCALE_TRACE_REQUEST") + if value == "" { + return false + } + enabled, err := strconv.ParseBool(value) + if err != nil || !enabled { + //assume off + return false + } + return true +} + +func TraceRequestEnabled() bool { + return traceRequestEnabled +} + +func TraceRequest(keyvals ...interface{}) { + if TraceRequestEnabled() { + Debug(keyvals...) + } +} diff --git a/pkg/pgmodel/ingestor/buffer.go b/pkg/pgmodel/ingestor/buffer.go index 2a4e6659e4..9f0ae3ba30 100644 --- a/pkg/pgmodel/ingestor/buffer.go +++ b/pkg/pgmodel/ingestor/buffer.go @@ -7,6 +7,7 @@ package ingestor import ( "context" "sync" + "time" "github.com/timescale/promscale/pkg/pgmodel/metrics" "github.com/timescale/promscale/pkg/pgmodel/model" @@ -34,6 +35,7 @@ type pendingBuffer struct { spanCtx context.Context needsResponse []insertDataTask batch model.Batch + Start time.Time } var pendingBuffers = sync.Pool{ @@ -46,7 +48,9 @@ var pendingBuffers = sync.Pool{ } func NewPendingBuffer() *pendingBuffer { - return pendingBuffers.Get().(*pendingBuffer) + pb := pendingBuffers.Get().(*pendingBuffer) + pb.Start = time.Now() + return pb } func (p *pendingBuffer) IsFull() bool { diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 7337a466d7..5911ad9545 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -40,6 +40,7 @@ type copyRequest struct { var ( getBatchMutex = &sync.Mutex{} + lastGetBatch = time.Time{} handleDecompression = retryAfterDecompression ) @@ -72,9 +73,13 @@ func (reqs copyBatch) VisitExemplar(callBack func(info *pgmodel.MetricInfo, s *p return nil } +type readRequest struct { + copySender <-chan copyRequest +} + // Handles actual insertion into the DB. // We have one of these per connection reserved for insertion. -func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf *ExemplarLabelFormatter) { +func runCopier(conn pgxconn.PgxConn, sw *seriesWriter, elf *ExemplarLabelFormatter, reservationQ *ReservationQueue) { requestBatch := make([]readRequest, 0, metrics.MaxInsertStmtPerTxn) insertBatch := make([]copyRequest, 0, cap(requestBatch)) for { @@ -90,7 +95,7 @@ func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf // the fact that we fetch the entire batch before executing any of the // reads. This guarantees that we never need to batch the same metrics // together in the copier. - requestBatch, ok = copierGetBatch(ctx, requestBatch, in) + requestBatch, ok = copierGetBatch(ctx, requestBatch, reservationQ) if !ok { span.End() return @@ -157,7 +162,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e return nil } -func copierGetBatch(ctx context.Context, batch []readRequest, in <-chan readRequest) ([]readRequest, bool) { +func copierGetBatch(ctx context.Context, batch []readRequest, reservationQ *ReservationQueue) ([]readRequest, bool) { _, span := tracer.Default().Start(ctx, "get-batch") defer span.End() //This mutex is not for safety, but rather for better batching. @@ -173,30 +178,45 @@ func copierGetBatch(ctx context.Context, batch []readRequest, in <-chan readRequ span.AddEvent("Unlocking") }(span) - req, ok := <-in + //note this metric logic does depend on the lock + now := time.Now() + if !lastGetBatch.IsZero() { + timeBetweenGetBatch := now.Sub(lastGetBatch) + metrics.IngestorWaitForCopierSeconds.With(labelsCopier).Observe(timeBetweenGetBatch.Seconds()) + + } + lastGetBatch = now + + startTime, ok := reservationQ.Peek() if !ok { return batch, false } - span.AddEvent("Appending first batch") - batch = append(batch, req) - - //we use a small timeout to prevent low-pressure systems from using up too many - //txns and putting pressure on system - timeout := time.After(20 * time.Millisecond) -hot_gather: - for len(batch) < cap(batch) { - select { - case r2 := <-in: - span.AddEvent("Appending batch") - batch = append(batch, r2) - case <-timeout: - span.AddEvent("Timeout appending batches") - break hot_gather - } - } - if len(batch) == cap(batch) { - span.AddEvent("Batch is full") + since := time.Since(startTime) + //TODO: make configurable in CLI + minDuration := 0 * time.Millisecond + + // Having a minimum batching duration can be useful if the system is using up too many txns or mxids. + // The prometheus remote-write dynamic sharding strategy should auto-adjust things to slow down writes + // in low-pressure environments but having a CLI-settable backstop can also be usefull in certain scenarios. + // Values that have previously been tested with good results: 50ms-250ms. + if since < minDuration { + span.AddEvent("Sleep waiting to batch") + sleepDuration := minDuration - since + metrics.IngestorWaitForBatchSleepSeconds.With(labelsCopier).Add(sleepDuration.Seconds()) + metrics.IngestorWaitForBatchSleepTotal.With(labelsCopier).Inc() + time.Sleep(sleepDuration) } + + metrics.IngestorPipelineTime.With(labelsCopier).Observe(time.Since(startTime).Seconds()) + span.AddEvent("After sleep") + + var reason string + batch, _, reason = reservationQ.PopOntoBatch(batch) + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": reason}).Inc() + + span.AddEvent("Flushed due to" + reason) + + metrics.IngestorBatchRemainingAfterFlushTotal.With(labelsCopier).Observe(float64(reservationQ.Len())) span.SetAttributes(attribute.Int("num_batches", len(batch))) return batch, true } @@ -501,7 +521,9 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "exemplar"}).Add(float64(totalExemplars)) tput.ReportDuplicateMetrics(duplicateSamples, duplicateMetrics) - metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(insertStart).Seconds()) + duration := time.Since(insertStart).Seconds() + metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(duration) + metrics.IngestorInsertDurationPerRow.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(duration / (float64(totalExemplars + totalSamples))) return nil, lowestMinTime } diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index f0541db6d7..74c38472eb 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -21,6 +21,7 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/metrics" "github.com/timescale/promscale/pkg/pgmodel/model" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/psctx" "github.com/timescale/promscale/pkg/tracer" tput "github.com/timescale/promscale/pkg/util/throughput" ) @@ -41,13 +42,14 @@ type pgxDispatcher struct { invertedLabelsCache *cache.InvertedLabelsCache exemplarKeyPosCache cache.PositionCache batchers sync.Map + batchersWG sync.WaitGroup completeMetricCreation chan struct{} asyncAcks bool - copierReadRequestCh chan<- readRequest seriesEpochRefresh *time.Ticker doneChannel chan struct{} closed *uber_atomic.Bool doneWG sync.WaitGroup + reservationQ *ReservationQueue } var _ model.Dispatcher = &pgxDispatcher{} @@ -59,13 +61,6 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac numCopiers = 1 } - // the copier read request channel retains the queue order between metrics - maxMetrics := 10000 - copierReadRequestCh := make(chan readRequest, maxMetrics) - - metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh))) - metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) }) - if cfg.IgnoreCompressedChunks { // Handle decompression to not decompress anything. handleDecompression = skipDecompression @@ -82,9 +77,9 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac } sw := NewSeriesWriter(conn, labelArrayOID, labelsCache) elf := NewExamplarLabelFormatter(conn, eCache) - + reservationQ := NewReservationQueue() for i := 0; i < numCopiers; i++ { - go runCopier(conn, copierReadRequestCh, sw, elf) + go runCopier(conn, sw, elf, reservationQ) } inserter := &pgxDispatcher{ @@ -95,11 +90,11 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac exemplarKeyPosCache: eCache, completeMetricCreation: make(chan struct{}, 1), asyncAcks: cfg.MetricsAsyncAcks, - copierReadRequestCh: copierReadRequestCh, // set to run at half our deletion interval seriesEpochRefresh: time.NewTicker(30 * time.Minute), doneChannel: make(chan struct{}), closed: uber_atomic.NewBool(false), + reservationQ: reservationQ, } inserter.closed.Store(false) runBatchWatcher(inserter.doneChannel) @@ -110,7 +105,13 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac return nil, err } - go inserter.runCompleteMetricCreationWorker() + if !cfg.DisableMetricCreation { + inserter.doneWG.Add(1) + go func() { + defer inserter.doneWG.Done() + inserter.runCompleteMetricCreationWorker() + }() + } if !cfg.DisableEpochSync { inserter.doneWG.Add(1) @@ -204,7 +205,8 @@ func (p *pgxDispatcher) Close() { return true }) - close(p.copierReadRequestCh) + p.batchersWG.Wait() + p.reservationQ.Close() close(p.doneChannel) p.doneWG.Wait() } @@ -226,8 +228,10 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt int64 rows = dataTS.Rows workFinished = new(sync.WaitGroup) + batched = new(sync.WaitGroup) ) workFinished.Add(len(rows)) + batched.Add(len(rows)) // we only allocate enough space for a single error message here as we only // report one error back upstream. The inserter should not block on this // channel, but only insert if it's empty, anything else can deadlock. @@ -240,7 +244,17 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt = ts } } - p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} + p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan} + } + + var startTime time.Time + if log.TraceRequestEnabled() { + t, err := psctx.StartTime(ctx) + if err != nil { + log.TraceRequest("component", "dispatcher", "err", err) + } + startTime = t + log.TraceRequest("component", "dispatcher", "event", "start", "metrics", len(rows), "samples", numRows, "start_time", startTime.UnixNano()) } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) @@ -258,6 +272,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 case err = <-errChan: default: } + log.TraceRequest("component", "dispatcher", "event", "ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime)) reportMetricsTelemetry(maxt, numRows, 0) close(errChan) } else { @@ -272,6 +287,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 if err != nil { log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err) } + log.TraceRequest("component", "dispatcher", "event", "async_ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime)) reportMetricsTelemetry(maxt, numRows, 0) }() } @@ -321,7 +337,11 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques actual, old := p.batchers.LoadOrStore(metric, c) batcher = actual if !old { - go runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.copierReadRequestCh) + p.batchersWG.Add(1) + go func() { + defer p.batchersWG.Done() + runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.reservationQ) + }() } } ch := batcher.(chan *insertDataRequest) @@ -330,11 +350,13 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques } type insertDataRequest struct { - spanCtx trace.SpanContext - metric string - finished *sync.WaitGroup - data []model.Insertable - errChan chan error + requestCtx context.Context + spanCtx trace.SpanContext + metric string + finished *sync.WaitGroup + batched *sync.WaitGroup + data []model.Insertable + errChan chan error } func (idr *insertDataRequest) reportResult(err error) { diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 815eef2f2d..fa6bee14f9 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -28,6 +28,7 @@ type Cfg struct { TracesAsyncAcks bool NumCopiers int DisableEpochSync bool + DisableMetricCreation bool IgnoreCompressedChunks bool InvertedLabelsCacheSize uint64 TracesBatchTimeout time.Duration diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index cec19bff6f..42e48fb64f 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -536,10 +536,11 @@ func TestPGXInserterInsertData(t *testing.T) { } testCases := []struct { - name string - rows map[string][]model.Insertable - sqlQueries []model.SqlQuery - metricsGetErr error + name string + rows map[string][]model.Insertable + sqlQueries []model.SqlQuery + metricsGetErr error + disableMetricCreation bool }{ { name: "Zero data", @@ -810,7 +811,8 @@ func TestPGXInserterInsertData(t *testing.T) { model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), }, }, - metricsGetErr: fmt.Errorf("some metrics error"), + metricsGetErr: fmt.Errorf("some metrics error"), + disableMetricCreation: true, sqlQueries: []model.SqlQuery{ {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, @@ -821,7 +823,6 @@ func TestPGXInserterInsertData(t *testing.T) { Results: model.RowResults{{int64(1), "metric_0", true}}, Err: error(nil), }, - {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, { Copy: &model.Copy{ Table: pgx.Identifier{"prom_data", "metric_0"}, @@ -916,7 +917,6 @@ func TestPGXInserterInsertData(t *testing.T) { t.Run(c.name, func(t *testing.T) { mock := model.NewSqlRecorder(c.sqlQueries, t) scache := cache.NewSeriesCache(cache.DefaultConfig, nil) - mockMetrics := &model.MockMetricCache{ MetricCache: make(map[string]model.MetricInfo), GetMetricErr: c.metricsGetErr, @@ -932,7 +932,7 @@ func TestPGXInserterInsertData(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10, NumCopiers: 2}) + inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10, NumCopiers: 2, DisableMetricCreation: c.disableMetricCreation}) if err != nil { t.Fatal(err) } diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index f6153cef12..840b73aaa4 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -7,6 +7,7 @@ package ingestor import ( "context" "fmt" + "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -20,6 +21,7 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/metrics" "github.com/timescale/promscale/pkg/pgmodel/model" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/psctx" "github.com/timescale/promscale/pkg/tracer" ) @@ -35,10 +37,6 @@ func containsExemplars(data []model.Insertable) bool { return false } -type readRequest struct { - copySender <-chan copyRequest -} - func metricTableName(conn pgxconn.PgxConn, metric string) (info model.MetricInfo, possiblyNew bool, err error) { res, err := conn.Query( context.Background(), @@ -139,7 +137,7 @@ func runMetricBatcher(conn pgxconn.PgxConn, metricName string, completeMetricCreationSignal chan struct{}, metricTableNames cache.MetricCache, - copierReadRequestCh chan<- readRequest, + reservationQ *ReservationQueue, ) { var ( info model.MetricInfo @@ -164,7 +162,7 @@ func runMetricBatcher(conn pgxconn.PgxConn, if !firstReqSet { return } - sendBatches(firstReq, input, conn, &info, copierReadRequestCh) + sendBatches(firstReq, input, conn, &info, reservationQ) } //the basic structure of communication from the batcher to the copier is as follows: @@ -181,12 +179,13 @@ func runMetricBatcher(conn pgxconn.PgxConn, // of requests consecutively to minimize processing delays. That's what the mutex in the copier does. // 2. There is an auto-adjusting adaptation loop in step 3. The longer the copier takes to catch up to the readRequest in the queue, the more things will be batched // 3. The batcher has only a single read request out at a time. -func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, conn pgxconn.PgxConn, info *model.MetricInfo, copierReadRequestCh chan<- readRequest) { +func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, conn pgxconn.PgxConn, info *model.MetricInfo, reservationQ *ReservationQueue) { var ( exemplarsInitialized = false span trace.Span ) + var reservation Reservation addReq := func(req *insertDataRequest, buf *pendingBuffer) { if !exemplarsInitialized && containsExemplars(req.data) { if err := initializeExemplars(conn, info.TableName); err != nil { @@ -205,19 +204,36 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con trace.WithAttributes(attribute.Int("insertable_count", len(req.data))), ) buf.addReq(req) + t, err := psctx.StartTime(req.requestCtx) + if err != nil { + log.Error("msg", err) + t = time.Time{} + } + metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds()) + reservation.Update(reservationQ, t, len(req.data)) + req.batched.Done() addSpan.End() } //This channel in synchronous (no buffering). This provides backpressure //to the batcher to keep batching until the copier is ready to read. copySender := make(chan copyRequest) defer close(copySender) - readRequest := readRequest{copySender: copySender} + + startReservation := func(req *insertDataRequest) { + t, err := psctx.StartTime(req.requestCtx) + if err != nil { + log.Error("msg", err) + t = time.Time{} + } + metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds()) + reservation = reservationQ.Add(copySender, req.batched, t) + } pending := NewPendingBuffer() pending.spanCtx, span = tracer.Default().Start(context.Background(), "send-batches") span.SetAttributes(attribute.String("metric", info.TableName)) + startReservation(firstReq) addReq(firstReq, pending) - copierReadRequestCh <- readRequest span.AddEvent("Sent a read request") for { @@ -227,8 +243,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con if !ok { return } + startReservation(req) addReq(req, pending) - copierReadRequestCh <- readRequest span.AddEvent("Sent a read request") } @@ -239,22 +255,18 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con } numSeries := pending.batch.CountSeries() - + numSamples, numExemplars := pending.batch.Count() + wasFull := pending.IsFull() + start := pending.Start select { - //try to send first, if not then keep batching - case copySender <- copyRequest{pending, info}: - metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) - span.SetAttributes(attribute.Int("num_series", numSeries)) - span.End() - pending = NewPendingBuffer() - pending.spanCtx, span = tracer.Default().Start(context.Background(), "send-batches") - span.SetAttributes(attribute.String("metric", info.TableName)) + //try to batch as much as possible before sending case req, ok := <-recvCh: if !ok { if !pending.IsEmpty() { span.AddEvent("Sending last non-empty batch") copySender <- copyRequest{pending, info} metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) + metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds()) } span.AddEvent("Exiting metric batcher batch loop") span.SetAttributes(attribute.Int("num_series", numSeries)) @@ -262,6 +274,23 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con return } addReq(req, pending) + case copySender <- copyRequest{pending, info}: + metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) + metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars)) + metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds()) + if wasFull { + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "size"}).Inc() + } else { + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "requested"}).Inc() + } + //note that this is the number of waiting in the queue, not samples or series. + metrics.IngestorBatchRemainingAfterFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(len(recvCh))) + span.SetAttributes(attribute.Int("num_series", numSeries)) + span.End() + pending = NewPendingBuffer() + pending.spanCtx, span = tracer.Default().Start(context.Background(), "send-batches") + span.SetAttributes(attribute.String("metric", info.TableName)) + } } } diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..3b64919f27 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -5,9 +5,11 @@ package ingestor import ( + "context" "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -15,6 +17,7 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/model" pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" "github.com/timescale/promscale/pkg/prompb" + "github.com/timescale/promscale/pkg/psctx" ) func TestMetricTableName(t *testing.T) { @@ -140,17 +143,24 @@ func TestSendBatches(t *testing.T) { return l } var workFinished sync.WaitGroup + var batched sync.WaitGroup + batched.Add(1) errChan := make(chan error, 1) data := []model.Insertable{ model.NewPromSamples(makeSeries(1), make([]prompb.Sample, 1)), model.NewPromSamples(makeSeries(2), make([]prompb.Sample, 1)), model.NewPromSamples(makeSeries(3), make([]prompb.Sample, 1)), } - firstReq := &insertDataRequest{metric: "test", data: data, finished: &workFinished, errChan: errChan} - copierCh := make(chan readRequest) - go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, copierCh) - copierReq := <-copierCh - batch := <-copierReq.copySender + spanCtx := psctx.WithStartTime(context.Background(), time.Now().Add(-time.Hour)) + firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, batched: &batched, errChan: errChan} + reservationQ := NewReservationQueue() + go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ) + resos := make([]readRequest, 0, 1) + reservationQ.Peek() + resos, cnt, _ := reservationQ.PopOntoBatch(resos) + require.Equal(t, 1, cnt) + require.Equal(t, 1, len(resos)) + batch := <-(resos[0].copySender) // we make sure that we receive batch data for i := 0; i < 3; i++ { diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go new file mode 100644 index 0000000000..7b87e4fbca --- /dev/null +++ b/pkg/pgmodel/ingestor/reservation.go @@ -0,0 +1,230 @@ +package ingestor + +import ( + "container/heap" + "sync" + "sync/atomic" + "time" + + "github.com/timescale/promscale/pkg/log" +) + +type reservation struct { + copySender <-chan copyRequest + firstRequestBatched *sync.WaitGroup + index int + + lock sync.Mutex + startTime time.Time + + items int64 +} + +func newReservation(cs <-chan copyRequest, startTime time.Time, batched *sync.WaitGroup) *reservation { + return &reservation{cs, batched, -1, sync.Mutex{}, startTime, 1} +} + +func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) { + rest := res.GetStartTime() + atomic.AddInt64(&res.items, int64(num_insertables)) + + if t.Before(rest) { + //this should happen rarely + res.SetStartTime(t) + rq.update(res) + } +} + +func (res *reservation) GetStartTime() time.Time { + res.lock.Lock() + defer res.lock.Unlock() + return res.startTime +} + +func (res *reservation) SetStartTime(t time.Time) { + res.lock.Lock() + defer res.lock.Unlock() + + //double check that it's before + if t.Before(res.startTime) { + res.startTime = t + } +} + +// reservationQueueInternal implements heap.Interface +type reservationQueueInternal []*reservation + +func newReservationQueueInternal() *reservationQueueInternal { + q := make(reservationQueueInternal, 0, 100) + return &q +} + +func (res reservationQueueInternal) Len() int { return len(res) } + +func (res reservationQueueInternal) Less(i, j int) bool { + startTimeI := res[i].GetStartTime() + startTimeJ := res[j].GetStartTime() + if startTimeI.Equal(startTimeJ) { + itemsI := atomic.LoadInt64(&res[i].items) + itemsJ := atomic.LoadInt64(&res[j].items) + //prerer metrics with more items because they probably hold up more stuff + return itemsI > itemsJ + } + return res[i].GetStartTime().Before(res[j].GetStartTime()) +} + +func (res reservationQueueInternal) Swap(i, j int) { + res[i], res[j] = res[j], res[i] + res[i].index = i + res[j].index = j +} + +func (res *reservationQueueInternal) Push(x interface{}) { + n := len(*res) + item := x.(*reservation) + item.index = n + *res = append(*res, item) +} + +func (res *reservationQueueInternal) Pop() interface{} { + old := *res + n := len(old) + item := old[n-1] + item.index = -1 //for safety + old[n-1] = nil // avoid memory leak + *res = old[0 : n-1] + return item +} + +type Reservation interface { + Update(*ReservationQueue, time.Time, int) +} + +type ReservationQueue struct { + lock sync.Mutex + cond sync.Cond + q *reservationQueueInternal + closed bool +} + +func NewReservationQueue() *ReservationQueue { + res := &ReservationQueue{lock: sync.Mutex{}, q: newReservationQueueInternal(), closed: false} + res.cond = *sync.NewCond(&res.lock) + return res +} + +func (rq *ReservationQueue) Add(cs <-chan copyRequest, batched *sync.WaitGroup, startTime time.Time) Reservation { + si := newReservation(cs, startTime, batched) + + rq.lock.Lock() + defer rq.lock.Unlock() + + if rq.closed { + panic("Should never add to a closed queue") + } + + if rq.q.Len() == 0 { + rq.cond.Broadcast() + } + + heap.Push(rq.q, si) + return si +} + +func (rq *ReservationQueue) Len() int { + rq.lock.Lock() + defer rq.lock.Unlock() + + return rq.q.Len() +} + +func (rq *ReservationQueue) Close() { + rq.lock.Lock() + defer rq.lock.Unlock() + + rq.closed = true + rq.cond.Broadcast() +} + +// Peek gives the first startTime as well as if the queue is not closed. +// It blocks until there is an element in the queue or it has been closed. +func (rq *ReservationQueue) Peek() (time.Time, bool) { + reservation, waited, ok := rq.peek() + if !ok { + return time.Time{}, false + } + if waited { + /* If this is the first reservation in the queue, wait for the entire request to be batched with a timeout. + * (timeout is really a safety measure to prevent deadlocks if some metric batcher is full, which is unlikely)*/ + waitch := make(chan struct{}) + go func() { + reservation.firstRequestBatched.Wait() + close(waitch) + }() + select { + case <-waitch: + case <-time.After(250 * time.Millisecond): + } + log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.Len(), "waited", waited, "took", time.Since(reservation.GetStartTime())) + } + return reservation.GetStartTime(), ok +} + +func (rq *ReservationQueue) peek() (*reservation, bool, bool) { + rq.lock.Lock() + defer rq.lock.Unlock() + waited := false + for !rq.closed && rq.q.Len() == 0 { + waited = true + rq.cond.Wait() + } + + if rq.q.Len() > 0 { + firstReservation := (*rq.q)[0] + return firstReservation, waited, true + } + + //must be closed + return nil, false, false +} + +// PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty. +// never blocks. Returns number of requests pop'ed. +func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int, string) { + rq.lock.Lock() + defer rq.lock.Unlock() + + count := 0 + items := int64(0) + if rq.q.Len() > 0 { + items = atomic.LoadInt64(&(*rq.q)[0].items) + } + total_items := int64(0) + for len(batch) < cap(batch) && rq.q.Len() > 0 && (len(batch) == 0 || items+total_items < 20000) { + res := heap.Pop(rq.q).(*reservation) + batch = append(batch, readRequest{res.copySender}) + count++ + total_items += items + items = 0 + if rq.q.Len() > 0 { + items = atomic.LoadInt64(&(*rq.q)[0].items) + } + } + reason := "timeout" + if !(len(batch) < cap(batch)) { + reason = "size_metrics" + } else if !(len(batch) == 0 || items+total_items < 20000) { + reason = "size_samples" + } + log.TraceRequest("component", "reservation", "event", "pop", "reason", reason, "metrics", count, "items", total_items, "remaining_metrics", rq.q.Len()) + return batch, count, reason +} + +func (rq *ReservationQueue) update(res *reservation) { + rq.lock.Lock() + defer rq.lock.Unlock() + if res.index < 0 { //has already been poped + return + } + heap.Fix(rq.q, res.index) +} diff --git a/pkg/pgmodel/ingestor/trace/trace_batcher.go b/pkg/pgmodel/ingestor/trace/trace_batcher.go index 65e5401d7a..d77b69685a 100644 --- a/pkg/pgmodel/ingestor/trace/trace_batcher.go +++ b/pkg/pgmodel/ingestor/trace/trace_batcher.go @@ -159,7 +159,7 @@ func (b *Batcher) batch(batchIdx int) { ticker.Reset(b.config.BatchTimeout) } if batch.isFull() { - metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "reason": "size"}).Inc() + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "subsystem": "batcher", "reason": "size"}).Inc() batch = flushBatch(batch) } } @@ -169,7 +169,7 @@ func (b *Batcher) batch(batchIdx int) { processReq(item) case <-ticker.C: batcherSpan.AddEvent("Batch timeout reached") - metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "reason": "timeout"}).Inc() + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "subsystem": "batcher", "reason": "timeout"}).Inc() if !batch.isEmpty() { batch = flushBatch(batch) } diff --git a/pkg/pgmodel/metrics/ingest.go b/pkg/pgmodel/metrics/ingest.go index fb792f0524..f3ac423125 100644 --- a/pkg/pgmodel/metrics/ingest.go +++ b/pkg/pgmodel/metrics/ingest.go @@ -5,8 +5,6 @@ package metrics import ( - "os" - "github.com/prometheus/client_golang/prometheus" "github.com/timescale/promscale/pkg/util" ) @@ -16,7 +14,7 @@ const ( MetricBatcherChannelCap = 1000 // FlushSize defines the batch size. It is the maximum number of samples/exemplars per insert batch. // This translates to the max array size that we pass into `insert_metric_row` - FlushSize = 2000 + FlushSize = 10000 MaxInsertStmtPerTxn = 100 ) @@ -75,11 +73,38 @@ var ( prometheus.HistogramOpts{ Namespace: util.PromNamespace, Subsystem: "ingest", - Name: "flush_series", + Name: "metric_batch_flush_series", Help: "Number of series batched by the ingestor.", Buckets: util.HistogramBucketsSaturating(1, 2, FlushSize), }, []string{"type", "subsystem"}, ) + IngestorFlushInsertables = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "metric_batch_flush_insertables_total", + Help: "Number of insertables batched by the ingestor.", + Buckets: append(util.HistogramBucketsSaturating(1, 2, FlushSize), 1.2*FlushSize, 2*FlushSize), + }, []string{"type", "subsystem"}, + ) + IngestorBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "metric_batch_duration_seconds", + Help: "Number of seconds that metrics were batched together", + Buckets: prometheus.DefBuckets, + }, []string{"type", "subsystem"}, + ) + IngestorPipelineTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "pipeline_time_seconds", + Help: "Time that it took to reach the subsystem, from beginning of request", + Buckets: prometheus.DefBuckets, + }, []string{"type", "subsystem"}, + ) IngestorInsertsPerBatch = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: util.PromNamespace, @@ -95,7 +120,7 @@ var ( Subsystem: "ingest", Name: "rows_per_batch", Help: "Number of rows inserted in a single transaction.", - Buckets: prometheus.ExponentialBuckets(1, 2, 15), + Buckets: prometheus.ExponentialBuckets(1, 2, 17), }, []string{"type", "subsystem"}, ) IngestorRowsPerInsert = prometheus.NewHistogramVec( @@ -116,6 +141,15 @@ var ( Buckets: append(prometheus.DefBuckets, []float64{60, 120, 300}...), }, []string{"type", "subsystem", "kind"}, ) + IngestorInsertDurationPerRow = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "insert_duration_seconds_per_row", + Help: "Duration of sample/exemplar batch insert call per row to the database.", + Buckets: append([]float64{0.000001, 0.000005, 0.00001, 0.00005, 0.0001, 0.0005, 0.001}, prometheus.DefBuckets...), + }, []string{"type", "subsystem", "kind"}, + ) IngestorActiveWriteRequests = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: util.PromNamespace, @@ -180,8 +214,17 @@ var ( Namespace: util.PromNamespace, Subsystem: "ingest", Name: "batch_flush_total", - Help: "Number of batch flushes by reason (size or timeout).", - }, []string{"type", "reason"}, + Help: "Number of batch flushes by reason (size, timeout, requested).", + }, []string{"type", "reason", "subsystem"}, + ) + IngestorBatchRemainingAfterFlushTotal = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "batch_remaining_after_flush_total", + Help: "Number of items remaining after a flush. Mostly only applies if batch flush reason was size", + Buckets: []float64{0, 10, 50, 100, 200, 500, 1000, 2000, 4000, 6000, 8000, 10000, 20000}, + }, []string{"type", "subsystem"}, ) IngestorPendingBatches = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -199,6 +242,31 @@ var ( Help: "Number of active user requests in queue.", }, []string{"type", "queue_idx"}, ) + IngestorWaitForBatchSleepSeconds = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "wait_for_batch_sleep_seconds", + Help: "Number of seconds sleeping while waiting for batch", + }, []string{"type", "subsystem"}, + ) + IngestorWaitForBatchSleepTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "wait_for_batch_sleep_total", + Help: "Number of times sleeping while waiting for batch", + }, []string{"type", "subsystem"}, + ) + IngestorWaitForCopierSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "wait_for_copier_seconds", + Help: "Number of seconds waiting for copier get batch", + Buckets: prometheus.DefBuckets, + }, []string{"type", "subsystem"}, + ) ) func init() { @@ -211,10 +279,12 @@ func init() { IngestorChannelCap, IngestorChannelLenBatcher, IngestorFlushSeries, + IngestorFlushInsertables, IngestorInsertsPerBatch, IngestorRowsPerBatch, IngestorRowsPerInsert, IngestorInsertDuration, + IngestorInsertDurationPerRow, IngestorActiveWriteRequests, IngestorDuration, IngestorItems, @@ -225,28 +295,11 @@ func init() { IngestorBatchFlushTotal, IngestorPendingBatches, IngestorRequestsQueued, + IngestorWaitForBatchSleepSeconds, + IngestorWaitForBatchSleepTotal, + IngestorBatchDuration, + IngestorPipelineTime, + IngestorBatchRemainingAfterFlushTotal, + IngestorWaitForCopierSeconds, ) } - -// RegisterCopierChannelLenMetric creates and registers the copier channel len metric with a callback -// that should return the length of the channel. -// -// Note: ingestorChannelLenCopier metric depends on prometheus call to /metrics hence we need to update with -// a callback. This is an odd one out from the other metrics in the ingestor as other metrics -// are async to prometheus calls. -func RegisterCopierChannelLenMetric(updater func() float64) { - r := prometheus.DefaultRegisterer - if val := os.Getenv("IS_TEST"); val == "true" { - r = prometheus.NewRegistry() - } - ingestorChannelLenCopier := prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: util.PromNamespace, - Subsystem: "ingest", - Name: "channel_len", - Help: "Length of the ingestor channel.", - ConstLabels: map[string]string{"type": "metric", "subsystem": "copier", "kind": "sample"}, - }, updater, - ) - r.MustRegister(ingestorChannelLenCopier) -} diff --git a/pkg/pgmodel/model/sql_test_utils.go b/pkg/pgmodel/model/sql_test_utils.go index 124965b1ee..860aea4529 100644 --- a/pkg/pgmodel/model/sql_test_utils.go +++ b/pkg/pgmodel/model/sql_test_utils.go @@ -153,6 +153,9 @@ func (r *SqlRecorder) checkQuery(sql string, args ...interface{}) (RowResults, e } assert.Equal(r.t, len(row.Args), len(args), "Args of different lengths @ %d %s", idx, sql) + if len(row.Args) != len(args) { + return nil, row.Err + } for i := range row.Args { switch row.Args[i].(type) { case pgtype.TextEncoder: diff --git a/pkg/psctx/psctx.go b/pkg/psctx/psctx.go new file mode 100644 index 0000000000..da59bef331 --- /dev/null +++ b/pkg/psctx/psctx.go @@ -0,0 +1,27 @@ +package psctx + +import ( + "context" + "fmt" + "time" +) + +type StartKey struct{} + +var ErrStartTimeNotSet = fmt.Errorf("start time not set") + +func WithStartTime(ctx context.Context, start time.Time) context.Context { + return context.WithValue(ctx, StartKey{}, start) +} + +func StartTime(ctx context.Context) (time.Time, error) { + val := ctx.Value(StartKey{}) + if val == nil { + return time.Time{}, ErrStartTimeNotSet + } + t, ok := val.(time.Time) + if !ok { + return t, fmt.Errorf("start time not time.Time, is: %T", val) + } + return t, nil +}