From 02237d81bebcda894b8dd149a5f109ceb6ee0161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Thu, 15 May 2025 10:20:29 +0200 Subject: [PATCH] scope: add RawVal type for raw float64 values Adds a new RawVal type to store raw float64 values without any aggregation (histogram, sum, etc). This provides a simpler alternative to FloatVal when only the most recent value is needed. With high cardinality data, it's important to use only minimal fields (especially as Prometheus/Victoria can calculate the sum/min/max aggregations) --- examples/minimal/main.go | 2 + scope.go | 23 ++++++++++++ val.go | 81 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+) diff --git a/examples/minimal/main.go b/examples/minimal/main.go index 1551e21..e0ed94f 100644 --- a/examples/minimal/main.go +++ b/examples/minimal/main.go @@ -50,6 +50,8 @@ func ComputeThing(ctx context.Context, arg1, arg2 int) (res int, err error) { mon.Event("hit 3") } + mon.RawVal("raw").Observe(1.0) + mon.RawValk(monkit.NewSeriesKey("rawk").WithTag("foo", "bar"), monkit.Sum, monkit.Count).Observe(1.0) mon.BoolVal("was-4").Observe(res == 4) mon.IntVal("res").Observe(int64(res)) mon.DurationVal("took").Observe(time.Second + time.Duration(rand.Intn(int(10*time.Second)))) diff --git a/scope.go b/scope.go index fba109d..e721d54 100644 --- a/scope.go +++ b/scope.go @@ -233,6 +233,29 @@ func (s *Scope) DurationVal(name string, tags ...SeriesTag) *DurationVal { return m } +// RawValk retrieves or creates a RawVal with a given key and aggregations. +func (s *Scope) RawValk(key SeriesKey, aggregations ...Aggregate) *RawVal { + source := s.newSource(key.String(), func() StatSource { + return NewRawVal(key, aggregations...) + }) + m, ok := source.(*RawVal) + if !ok { + panic(fmt.Sprintf("%s already used for another stats source: %#v", key, source)) + } + return m +} + +// RawVal retrieves or creates a RawVal after the given name and tags. +func (s *Scope) RawVal(name string, tags ...SeriesTag) *RawVal { + return s.RawValk(NewSeriesKey(name).WithTags(tags...)) +} + +// RawValf retrieves or creates a RawVal after the given printf-formatted +// name. +func (s *Scope) RawValf(template string, args ...interface{}) *RawVal { + return s.RawVal(fmt.Sprintf(template, args...)) +} + // Timer retrieves or creates a Timer after the given name. func (s *Scope) Timer(name string, tags ...SeriesTag) *Timer { source := s.newSource(sourceName("", name, tags), func() StatSource { diff --git a/val.go b/val.go index 2ca255f..dc2ceb1 100644 --- a/val.go +++ b/val.go @@ -246,3 +246,84 @@ func (v *DurationVal) Quantile(quantile float64) (rv time.Duration) { v.mtx.Unlock() return rv } + +// Aggregate can implement additional aggregation for collected values. +type Aggregate func() (observe func(val float64), stat func() (field string, val float64)) + +// RawVal is a simple wrapper around a float64 value without any aggregation +// (histogram, sum, etc). Constructed using NewRawVal, though its expected usage is like: +// +// var mon = monkit.Package() +// +// func MyFunc() { +// ... +// mon.RawVal("value").Observe(val) +// ... +// } +type RawVal struct { + mtx sync.Mutex + value float64 + key SeriesKey + stats []func() (field string, val float64) + observers []func(val float64) +} + +// NewRawVal creates a RawVal +func NewRawVal(key SeriesKey, aggregations ...Aggregate) *RawVal { + val := &RawVal{key: key} + for _, agg := range aggregations { + observe, stat := agg() + val.stats = append(val.stats, stat) + val.observers = append(val.observers, observe) + } + return val +} + +// Observe sets the current value +func (v *RawVal) Observe(val float64) { + v.mtx.Lock() + v.value = val + for _, o := range v.observers { + o(val) + } + v.mtx.Unlock() +} + +// Stats implements the StatSource interface. +func (v *RawVal) Stats(cb func(key SeriesKey, field string, val float64)) { + v.mtx.Lock() + cb(v.key, "recent", v.value) + v.mtx.Unlock() + for _, s := range v.stats { + field, value := s() + cb(v.key, field, value) + } +} + +// Get returns the current value +func (v *RawVal) Get() float64 { + v.mtx.Lock() + value := v.value + v.mtx.Unlock() + return value +} + +// Count is a value aggregator that counts the number of times the value is measured. +func Count() (observe func(val float64), stat func() (field string, val float64)) { + var counter int + return func(val float64) { + counter++ + }, func() (field string, val float64) { + return "count", float64(counter) + } +} + +// Sum is a value aggregator that summarizes the values measured. +func Sum() (observe func(val float64), stat func() (field string, val float64)) { + var sum int + return func(val float64) { + sum += int(val) + }, func() (field string, val float64) { + return "sum", float64(sum) + } +}