From b70e60a79de3bec6743e92454a57a2393785249d Mon Sep 17 00:00:00 2001 From: michaelschiff Date: Tue, 26 Oct 2021 23:58:33 -0700 Subject: [PATCH 1/3] removes gazette_shard_read_head from consumer/interfaces.go and stops tracking this gauge value in consumer/transaction.go - Instead, exporting this metric via the Collector implemented in consumer/resolver.go means that this stat's life-cycle on the process matches that of the shard. --- consumer/interfaces.go | 9 +++++---- consumer/resolver.go | 9 +++++++++ consumer/transaction.go | 5 ----- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/consumer/interfaces.go b/consumer/interfaces.go index ec37006a..d720de96 100644 --- a/consumer/interfaces.go +++ b/consumer/interfaces.go @@ -300,6 +300,11 @@ var ( "gazette_shard_up", "Indicates the processing status of a shard by this consumer.", []string{"shard", "status"}, nil) + + shardReadHeadDesc = prometheus.NewDesc( + "gazette_shard_read_head", + "Current read head of the consumer (i.e., next journal byte offset to be read).", + []string{"shard", "journal"}, nil) ) var ( @@ -315,10 +320,6 @@ var ( Name: "gazette_shard_read_bytes_total", Help: "Total byte-length of messages processed by completed consumer transactions.", }, []string{"shard"}) - shardReadHeadGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "gazette_shard_read_head", - Help: "Current read head of the consumer (i.e., next journal byte offset to be read).", - }, []string{"shard", "journal"}) shardTxnPhaseSecondsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "gazette_shard_phase_seconds_total", Help: "Cumulative number of seconds processing transactions.", diff --git a/consumer/resolver.go b/consumer/resolver.go index e5401938..f77e0eb6 100644 --- a/consumer/resolver.go +++ b/consumer/resolver.go @@ -401,6 +401,15 @@ func (r *Resolver) Collect(ch chan<- prometheus.Metric) { 1, shardID.String(), status.Code.String()) + + for j,o := range shard.progress.readThrough { + ch <- 
prometheus.MustNewConstMetric( + shardReadHeadDesc, + prometheus.GaugeValue, + float64(o), + shardID.String(), + j.String()) + } } } diff --git a/consumer/transaction.go b/consumer/transaction.go index a76d4779..be48215f 100644 --- a/consumer/transaction.go +++ b/consumer/transaction.go @@ -501,11 +501,6 @@ func recordMetrics(s *shard, txn *transaction) { shardTxnTotal.WithLabelValues(s.FQN()).Inc() shardReadMsgsTotal.WithLabelValues(s.FQN()).Add(float64(txn.consumedCount)) shardReadBytesTotal.WithLabelValues(s.FQN()).Add(float64(txn.consumedBytes)) - for journal, source := range txn.checkpoint.Sources { - shardReadHeadGauge. - WithLabelValues(s.FQN(), journal.String()). - Set(float64(source.ReadThrough)) - } var ( durNotRunning = txn.beganAt.Sub(txn.prevPrepareDoneAt) From ff6e41b17fdd2c996018e4a6283d82e77bceeedb Mon Sep 17 00:00:00 2001 From: michaelschiff Date: Wed, 27 Oct 2021 01:07:07 -0700 Subject: [PATCH 2/3] gets rid of read head gauge tracking same information --- consumer/interfaces.go | 4 ---- consumer/transaction.go | 1 - 2 files changed, 5 deletions(-) diff --git a/consumer/interfaces.go b/consumer/interfaces.go index d720de96..e8bce254 100644 --- a/consumer/interfaces.go +++ b/consumer/interfaces.go @@ -358,8 +358,4 @@ var ( Name: "gazette_consumer_consumed_bytes_total", Help: "Cumulative number of bytes consumed.", }) - readHeadGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "gazette_consumer_read_head", - Help: "Consumer read head", - }, []string{"journal"}) ) diff --git a/consumer/transaction.go b/consumer/transaction.go index be48215f..910deef3 100644 --- a/consumer/transaction.go +++ b/consumer/transaction.go @@ -214,7 +214,6 @@ func txnRead(s *shard, txn, prev *transaction, env EnvelopeOrError, ok bool) err // DEPRECATED metrics to be removed: bytesConsumedTotal.Add(float64(env.End - env.Begin)) - readHeadGauge.WithLabelValues(env.Journal.Name.String()).Set(float64(env.End)) // End DEPRECATED metrics. // |env| is read-uncommitted. 
Queue and act on its sequencing outcome. From b3458d49213b12e48bde55b96bef89a685d1bcb2 Mon Sep 17 00:00:00 2001 From: michaelschiff Date: Fri, 29 Oct 2021 10:43:55 -0700 Subject: [PATCH 3/3] collect metrics from resolver into slice, stream slice into |ch| to avoid writing out to metrics collection client while holding lock --- consumer/interfaces.go | 6 +++--- consumer/resolver.go | 22 +++++++++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/consumer/interfaces.go b/consumer/interfaces.go index e8bce254..4ef8ecb8 100644 --- a/consumer/interfaces.go +++ b/consumer/interfaces.go @@ -302,9 +302,9 @@ var ( []string{"shard", "status"}, nil) shardReadHeadDesc = prometheus.NewDesc( - "gazette_shard_read_head", - "Current read head of the consumer (i.e., next journal byte offset to be read).", - []string{"shard", "journal"}, nil) + "gazette_shard_read_head", + "Current read head of the consumer (i.e., next journal byte offset to be read).", + []string{"shard", "journal"}, nil) ) var ( diff --git a/consumer/resolver.go b/consumer/resolver.go index f77e0eb6..1702bb66 100644 --- a/consumer/resolver.go +++ b/consumer/resolver.go @@ -391,26 +391,34 @@ func (r *Resolver) Describe(ch chan<- *prometheus.Desc) { // Collect implements prometheus.Collector func (r *Resolver) Collect(ch chan<- prometheus.Metric) { + for _, m := range r.collectShardMetrics() { + ch <- m + } +} + +func (r *Resolver) collectShardMetrics() []prometheus.Metric { r.state.KS.Mu.RLock() defer r.state.KS.Mu.RUnlock() + metrics := make([]prometheus.Metric, 0) for shardID, shard := range r.shards { - status := shard.resolved.assignment.Decoded.(allocator.Assignment).AssignmentValue.(*pc.ReplicaStatus) - ch <- prometheus.MustNewConstMetric( + shardStatus := shard.resolved.assignment.Decoded.(allocator.Assignment).AssignmentValue.(*pc.ReplicaStatus) + metrics = append(metrics, prometheus.MustNewConstMetric( shardUpDesc, prometheus.GaugeValue, 1, shardID.String(), - 
status.Code.String()) - - for j,o := range shard.progress.readThrough { - ch <- prometheus.MustNewConstMetric( + shardStatus.Code.String())) + readThrough, _ := shard.Progress() + for j, o := range readThrough { + metrics = append(metrics, prometheus.MustNewConstMetric( shardReadHeadDesc, prometheus.GaugeValue, float64(o), shardID.String(), - j.String()) + j.String())) } } + return metrics } // ErrResolverStopped is returned by Resolver if a ShardID resolves to a local