diff --git a/asyncclient.go b/asyncclient.go new file mode 100644 index 0000000..c485cd1 --- /dev/null +++ b/asyncclient.go @@ -0,0 +1,151 @@ +package statsd + +import ( + "time" + + "github.com/quipo/statsd/event" +) + +// AsyncClient is a wrapper around the standard client +// that sends the metrics to statsd concurrently. +// errors will be sent through the ErrorChan and must be +// handled in the application. +type AsyncClient struct { + statsd *StatsdClient + ErrorChan chan error + requestChan chan func() error +} + +func NewAsyncClient(addr, prefix string) *AsyncClient { + c := &AsyncClient{ + statsd: NewStatsdClient(addr, prefix), + ErrorChan: make(chan error, 100), + requestChan: make(chan func() error, 1000), + } + + go c.process() + + return c +} + +// CreateSocket creates a UDP connection to a StatsD server +func (c *AsyncClient) CreateSocket() error { + return c.statsd.CreateSocket() +} + +// Incr - Increment a counter metric. Often used to note a particular event +func (c *AsyncClient) Incr(stat string, count int64) { + c.requestChan <- func() error { + return c.statsd.Incr(stat, count) + } +} + +// Decr - Decrement a counter metric. Often used to note a particular event +func (c *AsyncClient) Decr(stat string, count int64) { + c.requestChan <- func() error { + return c.statsd.Decr(stat, count) + } +} + +// Timing - Track a duration event +// the time delta must be given in milliseconds +func (c *AsyncClient) Timing(stat string, delta int64) { + c.requestChan <- func() error { + return c.statsd.Timing(stat, delta) + } +} + +// PrecisionTiming - Track a duration event +// the time delta has to be a duration +func (c *AsyncClient) PrecisionTiming(stat string, delta time.Duration) { + c.requestChan <- func() error { + return c.statsd.PrecisionTiming(stat, delta) + } +} + +// Gauge - Gauges are a constant data type. They are not subject to averaging, +// and they don’t change unless you change them. That is, once you set a gauge value, +// it will be a flat line on the graph until you change it again. If you specify +// delta to be true, that specifies that the gauge should be updated, not set. Due to the +// underlying protocol, you can't explicitly set a gauge to a negative number without +// first setting it to zero. +func (c *AsyncClient) Gauge(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.Gauge(stat, value) + } +} + +// GaugeDelta -- Send a change for a gauge +// Gauge Deltas are always sent with a leading '+' or '-'. The '-' takes care of itself but the '+' must added by hand +func (c *AsyncClient) GaugeDelta(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.GaugeDelta(stat, value) + } +} + +// FGauge -- Send a floating point value for a gauge +func (c *AsyncClient) FGauge(stat string, value float64) { + c.requestChan <- func() error { + return c.statsd.FGauge(stat, value) + } +} + +// FGaugeDelta -- Send a floating point change for a gauge +func (c *AsyncClient) FGaugeDelta(stat string, value float64) { + c.requestChan <- func() error { + return c.statsd.FGaugeDelta(stat, value) + } +} + +// Absolute - Send absolute-valued metric (not averaged/aggregated) +func (c *AsyncClient) Absolute(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.Absolute(stat, value) + } +} + +// FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated) +func (c *AsyncClient) FAbsolute(stat string, value float64) { + c.requestChan <- func() error { + return c.statsd.FAbsolute(stat, value) + } +} + +// Total - Send a metric that is continously increasing, e.g. read operations since boot +func (c *AsyncClient) Total(stat string, value int64) { + c.requestChan <- func() error { + return c.statsd.Total(stat, value) + } +} + +// SendEvent - Sends stats from an event object +func (c *AsyncClient) SendEvent(e event.Event) { + c.requestChan <- func() error { + return c.statsd.SendEvent(e) + } +} + +func (c *AsyncClient) process() { + for { + req, open := <-c.requestChan + + if !open { + return + close(c.ErrorChan) + } + + err := req() + if err != nil { + c.ErrorChan <- err + } + } +} + +func (c *AsyncClient) Close() error { + err := c.statsd.Close() + if err != nil { + return err + } + close(c.requestChan) + return nil +} diff --git a/event/precisiontiming.go b/event/precisiontiming.go index 587cd85..464fd40 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -40,12 +40,18 @@ func (e PrecisionTiming) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e PrecisionTiming) Stats() []string { return []string{ - fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%.6f|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%.6f|a", e.Name, e.Max), + fmt.Sprintf("%s.count:%d|a", e.Name, e.Count), + fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)/1000000), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%.6f|ms", e.Name, e.durationToMs(e.Min)), + fmt.Sprintf("%s.max:%.6f|ms", e.Name, e.durationToMs(e.Max)), } } +// durationToMs converts time.Duration into the corresponding value in milliseconds +func (e PrecisionTiming) durationToMs(x time.Duration) float64 { + return float64(x) / float64(time.Millisecond) +} + // Key returns the name of this metric func (e PrecisionTiming) Key() string { return e.Name diff --git a/event/precisiontiming_test.go b/event/precisiontiming_test.go new file mode 100644 index 0000000..28eb3ee --- /dev/null +++ b/event/precisiontiming_test.go @@ -0,0 +1,21 @@ +package event + +import ( + "reflect" + "testing" + "time" +) + +func TestPrecisionTimingUpdate(t *testing.T) { + e1 := NewPrecisionTiming("test", 5*time.Microsecond) + e2 := NewPrecisionTiming("test", 3*time.Microsecond) + e3 := NewPrecisionTiming("test", 7*time.Microsecond) + e1.Update(e2) + e1.Update(e3) + + expected := []string{"test.count:3|a", "test.avg:0.005000|ms", "test.min:0.003000|ms", "test.max:0.007000|ms"} + actual := e1.Stats() + if !reflect.DeepEqual(expected, actual) { + t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual) + } +} diff --git a/event/timing.go b/event/timing.go index 273a1ce..258a9a0 100644 --- a/event/timing.go +++ b/event/timing.go @@ -42,9 +42,10 @@ func (e Timing) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e Timing) Stats() []string { return []string{ - fmt.Sprintf("%s.avg:%d|a", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%d|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%d|a", e.Name, e.Max), + fmt.Sprintf("%s.count:%d|a", e.Name, e.Count), + fmt.Sprintf("%s.avg:%d|ms", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%d|ms", e.Name, e.Min), + fmt.Sprintf("%s.max:%d|ms", e.Name, e.Max), } } diff --git a/event/timing_test.go b/event/timing_test.go new file mode 100644 index 0000000..68d3d37 --- /dev/null +++ b/event/timing_test.go @@ -0,0 +1,20 @@ +package event + +import ( + "reflect" + "testing" +) + +func TestTimingUpdate(t *testing.T) { + e1 := NewTiming("test", 5) + e2 := NewTiming("test", 3) + e3 := NewTiming("test", 7) + e1.Update(e2) + e1.Update(e3) + + expected := []string{"test.count:3|a", "test.avg:5|ms", "test.min:3|ms", "test.max:7|ms"} + actual := e1.Stats() + if !reflect.DeepEqual(expected, actual) { + t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual) + } +} diff --git a/noopclient.go b/noopclient.go new file mode 100644 index 0000000..e2f6e10 --- /dev/null +++ b/noopclient.go @@ -0,0 +1,75 @@ +package statsd + +//@author https://github.com/wyndhblb/statsd + +import ( + "time" +) + +// NoopClient implements a "no-op" statsd in case there is no statsd server +type NoopClient struct{} + +// CreateSocket does nothing +func (s NoopClient) CreateSocket() error { + return nil +} + +// Close does nothing +func (s NoopClient) Close() error { + return nil +} + +// Incr does nothing +func (s NoopClient) Incr(stat string, count int64) error { + return nil +} + +// Decr does nothing +func (s NoopClient) Decr(stat string, count int64) error { + return nil +} + +// Timing does nothing +func (s NoopClient) Timing(stat string, count int64) error { + return nil +} + +// PrecisionTiming does nothing +func (s NoopClient) PrecisionTiming(stat string, delta time.Duration) error { + return nil +} + +// Gauge does nothing +func (s NoopClient) Gauge(stat string, value int64) error { + return nil +} + +// GaugeDelta does nothing +func (s NoopClient) GaugeDelta(stat string, value int64) error { + return nil +} + +// Absolute does nothing +func (s NoopClient) Absolute(stat string, value int64) error { + return nil +} + +// Total does nothing +func (s NoopClient) Total(stat string, value int64) error { + return nil +} + +// FGauge does nothing +func (s NoopClient) FGauge(stat string, value float64) error { + return nil +} + +// FGaugeDelta does nothing +func (s NoopClient) FGaugeDelta(stat string, value float64) error { + return nil +} + +// FAbsolute does nothing +func (s NoopClient) FAbsolute(stat string, value float64) error { + return nil +}