Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions asyncclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package statsd

import (
"time"

"github.com/quipo/statsd/event"
)

// AsyncClient is a wrapper around the standard client
// that sends the metrics to statsd concurrently.
// errors will be sent through the ErrorChan and must be
// handled in the application.
type AsyncClient struct {
statsd *StatsdClient
ErrorChan chan error
requestChan chan func() error
}

func NewAsyncClient(addr, prefix string) *AsyncClient {
c := &AsyncClient{
statsd: NewStatsdClient(addr, prefix),
ErrorChan: make(chan error, 100),
requestChan: make(chan func() error, 1000),
}

go c.process()

return c
}

// CreateSocket creates a UDP connection to a StatsD server
func (c *AsyncClient) CreateSocket() error {
return c.statsd.CreateSocket()
}

// Incr - Increment a counter metric. Often used to note a particular event
func (c *AsyncClient) Incr(stat string, count int64) {
c.requestChan <- func() error {
return c.statsd.Incr(stat, count)
}
}

// Decr - Decrement a counter metric. Often used to note a particular event
func (c *AsyncClient) Decr(stat string, count int64) {
c.requestChan <- func() error {
return c.statsd.Decr(stat, count)
}
}

// Timing - Track a duration event
// the time delta must be given in milliseconds
func (c *AsyncClient) Timing(stat string, delta int64) {
c.requestChan <- func() error {
return c.statsd.Timing(stat, delta)
}
}

// PrecisionTiming - Track a duration event
// the time delta has to be a duration
func (c *AsyncClient) PrecisionTiming(stat string, delta time.Duration) {
c.requestChan <- func() error {
return c.statsd.PrecisionTiming(stat, delta)
}
}

// Gauge - Gauges are a constant data type. They are not subject to averaging,
// and they don’t change unless you change them. That is, once you set a gauge value,
// it will be a flat line on the graph until you change it again. If you specify
// delta to be true, that specifies that the gauge should be updated, not set. Due to the
// underlying protocol, you can't explicitly set a gauge to a negative number without
// first setting it to zero.
func (c *AsyncClient) Gauge(stat string, value int64) {
c.requestChan <- func() error {
return c.statsd.Gauge(stat, value)
}
}

// GaugeDelta -- Send a change for a gauge
// Gauge Deltas are always sent with a leading '+' or '-'. The '-' takes care of itself but the '+' must added by hand
func (c *AsyncClient) GaugeDelta(stat string, value int64) {
c.requestChan <- func() error {
return c.statsd.GaugeDelta(stat, value)
}
}

// FGauge -- Send a floating point value for a gauge
func (c *AsyncClient) FGauge(stat string, value float64) {
c.requestChan <- func() error {
return c.statsd.FGauge(stat, value)
}
}

// FGaugeDelta -- Send a floating point change for a gauge
func (c *AsyncClient) FGaugeDelta(stat string, value float64) {
c.requestChan <- func() error {
return c.statsd.FGaugeDelta(stat, value)
}
}

// Absolute - Send absolute-valued metric (not averaged/aggregated)
func (c *AsyncClient) Absolute(stat string, value int64) {
c.requestChan <- func() error {
return c.statsd.Absolute(stat, value)
}
}

// FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated)
func (c *AsyncClient) FAbsolute(stat string, value float64) {
c.requestChan <- func() error {
return c.statsd.FAbsolute(stat, value)
}
}

// Total - Send a metric that is continously increasing, e.g. read operations since boot
func (c *AsyncClient) Total(stat string, value int64) {
c.requestChan <- func() error {
return c.statsd.Total(stat, value)
}
}

// SendEvent - Sends stats from an event object
func (c *AsyncClient) SendEvent(e event.Event) {
c.requestChan <- func() error {
return c.statsd.SendEvent(e)
}
}

func (c *AsyncClient) process() {
for {
req, open := <-c.requestChan

if !open {
return
close(c.ErrorChan)
}

err := req()
if err != nil {
c.ErrorChan <- err
}
}
}

func (c *AsyncClient) Close() error {
err := c.statsd.Close()
if err != nil {
return err
}
close(c.requestChan)
return nil
}
12 changes: 9 additions & 3 deletions event/precisiontiming.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ func (e PrecisionTiming) Payload() interface{} {
// Stats returns an array of StatsD events as they travel over UDP
func (e PrecisionTiming) Stats() []string {
return []string{
fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0
fmt.Sprintf("%s.min:%.6f|a", e.Name, e.Min),
fmt.Sprintf("%s.max:%.6f|a", e.Name, e.Max),
fmt.Sprintf("%s.count:%d|a", e.Name, e.Count),
fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)/1000000), // make sure e.Count != 0
fmt.Sprintf("%s.min:%.6f|ms", e.Name, e.durationToMs(e.Min)),
fmt.Sprintf("%s.max:%.6f|ms", e.Name, e.durationToMs(e.Max)),
}
}

// durationToMs converts time.Duration into the corresponding value in milliseconds
func (e PrecisionTiming) durationToMs(x time.Duration) float64 {
return float64(x) / float64(time.Millisecond)
}

// Key returns the name of this metric
func (e PrecisionTiming) Key() string {
return e.Name
Expand Down
21 changes: 21 additions & 0 deletions event/precisiontiming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package event

import (
"reflect"
"testing"
"time"
)

func TestPrecisionTimingUpdate(t *testing.T) {
e1 := NewPrecisionTiming("test", 5*time.Microsecond)
e2 := NewPrecisionTiming("test", 3*time.Microsecond)
e3 := NewPrecisionTiming("test", 7*time.Microsecond)
e1.Update(e2)
e1.Update(e3)

expected := []string{"test.count:3|a", "test.avg:0.005000|ms", "test.min:0.003000|ms", "test.max:0.007000|ms"}
actual := e1.Stats()
if !reflect.DeepEqual(expected, actual) {
t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual)
}
}
7 changes: 4 additions & 3 deletions event/timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (e Timing) Payload() interface{} {
// Stats returns an array of StatsD events as they travel over UDP
func (e Timing) Stats() []string {
return []string{
fmt.Sprintf("%s.avg:%d|a", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0
fmt.Sprintf("%s.min:%d|a", e.Name, e.Min),
fmt.Sprintf("%s.max:%d|a", e.Name, e.Max),
fmt.Sprintf("%s.count:%d|a", e.Name, e.Count),
fmt.Sprintf("%s.avg:%d|ms", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0
fmt.Sprintf("%s.min:%d|ms", e.Name, e.Min),
fmt.Sprintf("%s.max:%d|ms", e.Name, e.Max),
}
}

Expand Down
20 changes: 20 additions & 0 deletions event/timing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package event

import (
"reflect"
"testing"
)

func TestTimingUpdate(t *testing.T) {
e1 := NewTiming("test", 5)
e2 := NewTiming("test", 3)
e3 := NewTiming("test", 7)
e1.Update(e2)
e1.Update(e3)

expected := []string{"test.count:3|a", "test.avg:5|ms", "test.min:3|ms", "test.max:7|ms"}
actual := e1.Stats()
if !reflect.DeepEqual(expected, actual) {
t.Errorf("did not receive all metrics: Expected: %T %v, Actual: %T %v ", expected, expected, actual, actual)
}
}
75 changes: 75 additions & 0 deletions noopclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package statsd

//@author https://github.com/wyndhblb/statsd

import (
"time"
)

// NoopClient implements a "no-op" statsd in case there is no statsd server
type NoopClient struct{}

// CreateSocket does nothing
func (s NoopClient) CreateSocket() error {
return nil
}

// Close does nothing
func (s NoopClient) Close() error {
return nil
}

// Incr does nothing
func (s NoopClient) Incr(stat string, count int64) error {
return nil
}

// Decr does nothing
func (s NoopClient) Decr(stat string, count int64) error {
return nil
}

// Timing does nothing
func (s NoopClient) Timing(stat string, count int64) error {
return nil
}

// PrecisionTiming does nothing
func (s NoopClient) PrecisionTiming(stat string, delta time.Duration) error {
return nil
}

// Gauge does nothing
func (s NoopClient) Gauge(stat string, value int64) error {
return nil
}

// GaugeDelta does nothing
func (s NoopClient) GaugeDelta(stat string, value int64) error {
return nil
}

// Absolute does nothing
func (s NoopClient) Absolute(stat string, value int64) error {
return nil
}

// Total does nothing
func (s NoopClient) Total(stat string, value int64) error {
return nil
}

// FGauge does nothing
func (s NoopClient) FGauge(stat string, value float64) error {
return nil
}

// FGaugeDelta does nothing
func (s NoopClient) FGaugeDelta(stat string, value float64) error {
return nil
}

// FAbsolute does nothing
func (s NoopClient) FAbsolute(stat string, value float64) error {
return nil
}