From 49c8a9e918fcbb6628c90133aae9f6ad62aeb595 Mon Sep 17 00:00:00 2001 From: Jing Fu Date: Wed, 25 Feb 2015 14:24:19 -0800 Subject: [PATCH 1/9] Make statsite happy about format --- event/precisiontiming.go | 6 +++--- event/timing.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/event/precisiontiming.go b/event/precisiontiming.go index 587cd85..0e5603e 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -40,9 +40,9 @@ func (e PrecisionTiming) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e PrecisionTiming) Stats() []string { return []string{ - fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%.6f|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%.6f|a", e.Name, e.Max), + fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%.6f|ms", e.Name, e.Min), + fmt.Sprintf("%s.max:%.6f|ms", e.Name, e.Max), } } diff --git a/event/timing.go b/event/timing.go index 273a1ce..f4b5f48 100644 --- a/event/timing.go +++ b/event/timing.go @@ -42,9 +42,9 @@ func (e Timing) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e Timing) Stats() []string { return []string{ - fmt.Sprintf("%s.avg:%d|a", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%d|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%d|a", e.Name, e.Max), + fmt.Sprintf("%s.avg:%d|ms", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%d|ms", e.Name, e.Min), + fmt.Sprintf("%s.max:%d|ms", e.Name, e.Max), } } From 2910a4a0be4e400c8a10cd395321fe697968e6b1 Mon Sep 17 00:00:00 2001 From: Jing Fu Date: Wed, 25 Feb 2015 14:51:29 -0800 Subject: [PATCH 2/9] fix dependency --- bufferedclient.go | 2 +- client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bufferedclient.go b/bufferedclient.go index 260da19..e5e2f71 100644 --- a/bufferedclient.go +++ b/bufferedclient.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/quipo/statsd/event" + "github.com/fuj/statsd/event" ) // request to close the buffered statsd collector diff --git a/client.go b/client.go index 0125319..11d56da 100644 --- a/client.go +++ b/client.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/quipo/statsd/event" + "github.com/fuj/statsd/event" ) // Logger interface compatible with log.Logger From 20e01f696e029d4eb079a15a3f681db9cecdef8f Mon Sep 17 00:00:00 2001 From: Jing Fu Date: Wed, 25 Feb 2015 15:57:41 -0800 Subject: [PATCH 3/9] don't use precision timing, doesn't get us anything --- event/precisiontiming.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/event/precisiontiming.go b/event/precisiontiming.go index 0e5603e..587cd85 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -40,9 +40,9 @@ func (e PrecisionTiming) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e PrecisionTiming) Stats() []string { return []string{ - fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%.6f|ms", e.Name, e.Min), - fmt.Sprintf("%s.max:%.6f|ms", e.Name, e.Max), + fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%.6f|a", e.Name, e.Min), + fmt.Sprintf("%s.max:%.6f|a", e.Name, e.Max), } } From c77775a7b6733738019d90c9c185e6bdfde690d1 Mon Sep 17 00:00:00 2001 From: Jing Fu Date: Wed, 25 Feb 2015 17:02:21 -0800 Subject: [PATCH 4/9] revert events dir --- bufferedclient.go | 2 +- client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bufferedclient.go b/bufferedclient.go index e5e2f71..260da19 100644 --- a/bufferedclient.go +++ b/bufferedclient.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/fuj/statsd/event" + "github.com/quipo/statsd/event" ) // request to close the buffered statsd collector diff --git a/client.go b/client.go index 11d56da..0125319 100644 --- a/client.go +++ b/client.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/fuj/statsd/event" + "github.com/quipo/statsd/event" ) // Logger interface compatible with log.Logger From 1043054cbe8217436523fde6f65d52238830b257 Mon Sep 17 00:00:00 2001 From: Lorenzo Alberton Date: Thu, 26 Feb 2015 04:16:16 +0000 Subject: [PATCH 5/9] fix precision timing metric + added test case --- event/precisiontiming.go | 11 ++++++++--- event/precisiontiming_test.go | 21 +++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) create mode 100644 event/precisiontiming_test.go diff --git a/event/precisiontiming.go b/event/precisiontiming.go index 587cd85..aa64a8b 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -40,12 +40,17 @@ func (e PrecisionTiming) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e PrecisionTiming) Stats() []string { return []string{ - fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%.6f|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%.6f|a", e.Name, e.Max), + fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)/1000000), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%.6f|ms", e.Name, e.durationToMs(e.Min)), + fmt.Sprintf("%s.max:%.6f|ms", e.Name, e.durationToMs(e.Max)), } } +// durationToMs converts time.Duration into the corresponding value in milliseconds +func (e PrecisionTiming) durationToMs(x time.Duration) float64 { + return float64(x) / float64(time.Millisecond) +} + // Key returns the name of this metric func (e PrecisionTiming) Key() string { return e.Name diff --git a/event/precisiontiming_test.go b/event/precisiontiming_test.go new file mode 100644 index 0000000..5e13cce --- /dev/null +++ b/event/precisiontiming_test.go @@ -0,0 +1,21 @@ +package event + +import ( + "reflect" + "testing" + "time" +) + +func TestPrecisionTimingUpdate(t *testing.T) { + e1 := NewPrecisionTiming("test", 5*time.Microsecond) + e2 := NewPrecisionTiming("test", 3*time.Microsecond) + e3 := NewPrecisionTiming("test", 7*time.Microsecond) + e1.Update(e2) + e1.Update(e3) + + expected := []string{"test.avg:0.005000|ms", "test.min:0.003000|ms", "test.max:0.007000|ms"} + actual := e1.Stats() + if !reflect.DeepEqual(expected, actual) { + t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual) + } +} From 8c351ba1edd2ec4dfb7797582d38fb02e6265a19 Mon Sep 17 00:00:00 2001 From: Lorenzo Alberton Date: Thu, 26 Feb 2015 04:25:19 +0000 Subject: [PATCH 6/9] adding .count property to aggregated timing events --- event/precisiontiming.go | 1 + event/precisiontiming_test.go | 2 +- event/timing.go | 1 + event/timing_test.go | 20 ++++++++++++++++++++ 4 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 event/timing_test.go diff --git a/event/precisiontiming.go b/event/precisiontiming.go index aa64a8b..464fd40 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -40,6 +40,7 @@ func (e PrecisionTiming) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e PrecisionTiming) Stats() []string { return []string{ + fmt.Sprintf("%s.count:%d|a", e.Name, e.Count), fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)/1000000), // make sure e.Count != 0 fmt.Sprintf("%s.min:%.6f|ms", e.Name, e.durationToMs(e.Min)), fmt.Sprintf("%s.max:%.6f|ms", e.Name, e.durationToMs(e.Max)), diff --git a/event/precisiontiming_test.go b/event/precisiontiming_test.go index 5e13cce..28eb3ee 100644 --- a/event/precisiontiming_test.go +++ b/event/precisiontiming_test.go @@ -13,7 +13,7 @@ func TestPrecisionTimingUpdate(t *testing.T) { e1.Update(e2) e1.Update(e3) - expected := []string{"test.avg:0.005000|ms", "test.min:0.003000|ms", "test.max:0.007000|ms"} + expected := []string{"test.count:3|a", "test.avg:0.005000|ms", "test.min:0.003000|ms", "test.max:0.007000|ms"} actual := e1.Stats() if !reflect.DeepEqual(expected, actual) { t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual) diff --git a/event/timing.go b/event/timing.go index f4b5f48..258a9a0 100644 --- a/event/timing.go +++ b/event/timing.go @@ -42,6 +42,7 @@ func (e Timing) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e Timing) Stats() []string { return []string{ + fmt.Sprintf("%s.count:%d|a", e.Name, e.Count), fmt.Sprintf("%s.avg:%d|ms", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0 fmt.Sprintf("%s.min:%d|ms", e.Name, e.Min), fmt.Sprintf("%s.max:%d|ms", e.Name, e.Max), diff --git a/event/timing_test.go b/event/timing_test.go new file mode 100644 index 0000000..68d3d37 --- /dev/null +++ b/event/timing_test.go @@ -0,0 +1,20 @@ +package event + +import ( + "reflect" + "testing" +) + +func TestTimingUpdate(t *testing.T) { + e1 := NewTiming("test", 5) + e2 := NewTiming("test", 3) + e3 := NewTiming("test", 7) + e1.Update(e2) + e1.Update(e3) + + expected := []string{"test.count:3|a", "test.avg:5|ms", "test.min:3|ms", "test.max:7|ms"} + actual := e1.Stats() + if !reflect.DeepEqual(expected, actual) { + t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual) + } +} From 1090ab4693b587d42fbf75c2cc64a031a9f22a4b Mon Sep 17 00:00:00 2001 From: Lorenzo Alberton Date: Thu, 26 Feb 2015 04:33:06 +0000 Subject: [PATCH 7/9] import no-op client from https://github.com/wyndhblb/statsd --- noopclient.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 noopclient.go diff --git a/noopclient.go b/noopclient.go new file mode 100644 index 0000000..e2f6e10 --- /dev/null +++ b/noopclient.go @@ -0,0 +1,75 @@ +package statsd + +//@author https://github.com/wyndhblb/statsd + +import ( + "time" +) + +// NoopClient implements a "no-op" statsd in case there is no statsd server +type NoopClient struct{} + +// CreateSocket does nothing +func (s NoopClient) CreateSocket() error { + return nil +} + +// Close does nothing +func (s NoopClient) Close() error { + return nil +} + +// Incr does nothing +func (s NoopClient) Incr(stat string, count int64) error { + return nil +} + +// Decr does nothing +func (s NoopClient) Decr(stat string, count int64) error { + return nil +} + +// Timing does nothing +func (s NoopClient) Timing(stat string, count int64) error { + return nil +} + +// PrecisionTiming does nothing +func (s NoopClient) PrecisionTiming(stat string, delta time.Duration) error { + return nil +} + +// Gauge does nothing +func (s NoopClient) Gauge(stat string, value int64) error { + return nil +} + +// GaugeDelta does nothing +func (s NoopClient) GaugeDelta(stat string, value int64) error { + return nil +} + +// Absolute does nothing +func (s NoopClient) Absolute(stat string, value int64) error { + return nil +} + +// Total does nothing +func (s NoopClient) Total(stat string, value int64) error { + return nil +} + +// FGauge does nothing +func (s NoopClient) FGauge(stat string, value float64) error { + return nil +} + +// FGaugeDelta does nothing +func (s NoopClient) FGaugeDelta(stat string, value float64) error { + return nil +} + +// FAbsolute does nothing +func (s NoopClient) FAbsolute(stat string, value float64) error { + return nil +} From 7a46178531e5d255e8a374d0fb69deb4281b0ad1 Mon Sep 17 00:00:00 2001 From: Dean Elbaz Date: Wed, 25 Feb 2015 10:16:14 +0000 Subject: [PATCH 8/9] add async client --- asyncclient.go | 150 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 asyncclient.go diff --git a/asyncclient.go b/asyncclient.go new file mode 100644 index 0000000..ad023c2 --- /dev/null +++ b/asyncclient.go @@ -0,0 +1,150 @@ +package statsd + +import ( + "time" + + "github.com/quipo/statsd/event" +) + +// AsyncClient is a wrapper around the standard client +// that sends the metrics to statsd concurrently. +// errors will be sent through the ErrorChan and must be +// handled in the application. +type AsyncClient struct { + statsd *StatsdClient + ErrorChan chan error + requestChan chan func() error +} + +func NewAsyncClient(addr, prefix string) *AsyncClient { + c := &AsyncClient{ + statsd: NewStatsdClient(addr, prefix), + ErrorChan: make(chan error, 100), + requestChan: make(chan func() error, 1000), + } + + go c.process() + + return c +} + +// CreateSocket creates a UDP connection to a StatsD server +func (c *AsyncClient) CreateSocket() error { + return c.statsd.CreateSocket() +} + +// Incr - Increment a counter metric. Often used to note a particular event +func (c *AsyncClient) Incr(stat string, count int64) { + c.requestChan <- func() error { + return c.statsd.Incr(stat, count) + } +} + +// Decr - Decrement a counter metric. Often used to note a particular event +func (c *AsyncClient) Decr(stat string, count int64) { + c.requestChan <- func() error { + return c.statsd.Decr(stat, count) + } +} + +// Timing - Track a duration event +// the time delta must be given in milliseconds +func (c *AsyncClient) Timing(stat string, delta int64) { + c.requestChan <- func() error { + return c.statsd.Timing(stat, delta) + } +} + +// PrecisionTiming - Track a duration event +// the time delta has to be a duration +func (c *AsyncClient) PrecisionTiming(stat string, delta time.Duration) { + c.requestChan <- func() error { + return c.statsd.PrecisionTiming(stat, delta) + } +} + +// Gauge - Gauges are a constant data type. They are not subject to averaging, +// and they don’t change unless you change them. That is, once you set a gauge value, +// it will be a flat line on the graph until you change it again. If you specify +// delta to be true, that specifies that the gauge should be updated, not set. Due to the +// underlying protocol, you can't explicitly set a gauge to a negative number without +// first setting it to zero. +func (c *AsyncClient) Gauge(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.Gauge(stat, value) + } +} + +// GaugeDelta -- Send a change for a gauge +// Gauge Deltas are always sent with a leading '+' or '-'. The '-' takes care of itself but the '+' must added by hand +func (c *AsyncClient) GaugeDelta(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.GaugeDelta(stat, value) + } +} + +// FGauge -- Send a floating point value for a gauge +func (c *AsyncClient) FGauge(stat string, value float64) { + c.requestChan <- func() error { + return c.statsd.FGauge(stat, value) + } +} + +// FGaugeDelta -- Send a floating point change for a gauge +func (c *AsyncClient) FGaugeDelta(stat string, value float64) { + c.requestChan <- func() error { + return c.statsd.FGaugeDelta(stat, value) + } +} + +// Absolute - Send absolute-valued metric (not averaged/aggregated) +func (c *AsyncClient) Absolute(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.Absolute(stat, value) + } +} + +// FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated) +func (c *AsyncClient) FAbsolute(stat string, value float64) { + c.requestChan <- func() error { + return c.statsd.FAbsolute(stat, value) + } +} + +// Total - Send a metric that is continously increasing, e.g. read operations since boot +func (c *AsyncClient) Total(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.Total(stat, value) + } +} + +// SendEvent - Sends stats from an event object +func (c *AsyncClient) SendEvent(e event.Event) { + c.requestChan <- func() error { + return c.statsd.SendEvent(e) + } +} + +func (c *AsyncClient) process() { + for { + req, open := <-c.requestChan + + if !open { + return + } + + err := req() + if err != nil { + c.ErrorChan <- err + } + } +} + +func (c *AsyncClient) Close() error { + err := c.statsd.Close() + if err != nil { + return err + } + close(c.requestChan) + return nil +} From f6e44131abff58ff14b6e26f2b2b6b535f720689 Mon Sep 17 00:00:00 2001 From: Dean Elbaz Date: Thu, 26 Feb 2015 10:32:34 +0000 Subject: [PATCH 9/9] close error chan when exiting --- asyncclient.go | 1 + 1 file changed, 1 insertion(+) diff --git a/asyncclient.go b/asyncclient.go index ad023c2..c485cd1 100644 --- a/asyncclient.go +++ b/asyncclient.go @@ -131,6 +131,7 @@ func (c *AsyncClient) process() { if !open { return + close(c.ErrorChan) } err := req()