Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,9 @@
sudo: false
language: ruby
rvm:
- 2.0
- 2.1
- 2.2
- 2.3
- 2.4
- 2.5
script: bundle exec rake $TASK
env:
- TASK=spec
matrix:
include:
env: TASK=rubocop
rvm: 2.5
- 2.6
- 2.7
script:
- bundle exec rubocop
- bundle exec rake spec
bundler_args: --without=localdev
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# CHANGELOG

## Unreleased

* Try to make this thread safe

[//]: # (comment: Don't forget to update lib/datadog/statsd.rb:DogStatsd::Statsd::VERSION when releasing a new version)

## 4.8.0 / 2020.04.20
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ gem 'minitest-matchers'
gem 'yard', '~> 0.9.20'
gem 'single_cov'
gem 'climate_control'
gem 'concurrent-ruby', '~> 1.1.6'

if RUBY_VERSION >= '2.0.0'
gem 'rubocop', '~> 0.50.0' # bump this and TargetRubyVersion once we drop ruby 2.0
Expand Down
12 changes: 11 additions & 1 deletion lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# frozen_string_literal: true

require 'socket'

require_relative 'statsd/version'
Expand Down Expand Up @@ -42,6 +43,7 @@ class Statsd
DISTRIBUTION_TYPE = 'd'
TIMING_TYPE = 'ms'
SET_TYPE = 's'
UNIT_MS = ['un:ms'].freeze

# A namespace to prepend to all statsd calls. Defaults to no namespace.
attr_reader :namespace
Expand Down Expand Up @@ -110,6 +112,8 @@ def initialize(

@sample_rate = sample_rate

@buffer = Concurrent::Array.new

# we reduce max_buffer_bytes by a the rough estimate of the telemetry payload
@batch = Batch.new(connection, (max_buffer_bytes - telemetry.estimate_max_size))
end
Expand Down Expand Up @@ -220,7 +224,13 @@ def distribution(stat, value, opts = EMPTY_OPTIONS)
# @option opts [Array<String>] :tags An array of tags
def timing(stat, ms, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
send_stats(stat, ms, TIMING_TYPE, opts)
safe_tags = opts[:tags] || []
if safe_tags.is_a?(Array)
safe_tags += UNIT_MS
elsif safe_tags.is_a?(Hash)
safe_tags[:un] = 'ms'
end
send_stats stat, ms, TIMING_TYPE, opts.merge(tags: safe_tags)
end

# Reports execution time of the provided block using {#timing}.
Expand Down
36 changes: 22 additions & 14 deletions lib/datadog/statsd/batch.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
# frozen_string_literal: true

require 'concurrent'
require 'monitor'

module Datadog
class Statsd
class Batch
include MonitorMixin

def initialize(connection, max_buffer_bytes)
super()

@connection = connection
@max_buffer_bytes = max_buffer_bytes
@depth = 0
reset
end

def open
@depth += 1

synchronize { @depth += 1 }
yield
ensure
@depth -= 1
synchronize { @depth -= 1 }
flush if !open?
end

Expand All @@ -24,19 +30,21 @@ def open?
end

def add(message)
message_bytes = message.bytesize

unless @buffer_bytes == 0
if @buffer_bytes + 1 + message_bytes >= @max_buffer_bytes
flush
else
@buffer << "\n"
@buffer_bytes += 1
synchronize do
message_bytes = message.bytesize

unless @buffer_bytes == 0
if @buffer_bytes + 1 + message_bytes >= @max_buffer_bytes
flush
else
@buffer << "\n"
@buffer_bytes += 1
end
end
end

@buffer << message
@buffer_bytes += message_bytes
@buffer << message
@buffer_bytes += message_bytes
end
end

def flush
Expand Down
56 changes: 33 additions & 23 deletions lib/datadog/statsd/connection.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# frozen_string_literal: true

require 'concurrent'
require 'monitor'

module Datadog
class Statsd
class Connection
include MonitorMixin

def initialize(telemetry)
super()
@telemetry = telemetry
end

Expand All @@ -18,36 +24,40 @@ def close
end

def write(payload)
logger.debug { "Statsd: #{payload}" } if logger
synchronize do
begin
logger.debug { "Statsd: #{payload}" } if logger

flush_telemetry = telemetry.flush?
flush_telemetry = telemetry.flush?

payload += telemetry.flush if flush_telemetry
payload += telemetry.flush if flush_telemetry

send_message(payload)
send_message(payload)

telemetry.reset if flush_telemetry
telemetry.reset if flush_telemetry

telemetry.sent(packets: 1, bytes: payload.length)
rescue StandardError => boom
# Try once to reconnect if the socket has been closed
retries ||= 1
if retries <= 1 &&
(boom.is_a?(Errno::ENOTCONN) or
boom.is_a?(Errno::ECONNREFUSED) or
boom.is_a?(IOError) && boom.message =~ /closed stream/i)
retries += 1
begin
close
retry
rescue StandardError => e
boom = e
telemetry.sent(packets: 1, bytes: payload.length)
rescue StandardError => boom
# Try once to reconnect if the socket has been closed
retries ||= 1
if retries <= 1 &&
(boom.is_a?(Errno::ENOTCONN) or
boom.is_a?(Errno::ECONNREFUSED) or
boom.is_a?(IOError) && boom.message =~ /closed stream/i)
retries += 1
begin
close
retry
rescue StandardError => e
boom = e
end
end

telemetry.dropped(packets: 1, bytes: payload.length)
logger.error { "Statsd: #{boom.class} #{boom}" } if logger
nil
end
end

telemetry.dropped(packets: 1, bytes: payload.length)
logger.error { "Statsd: #{boom.class} #{boom}" } if logger
nil
end

private
Expand Down
6 changes: 5 additions & 1 deletion lib/datadog/statsd/udp_connection.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# frozen_string_literal: true

require 'concurrent'
require 'monitor'
require_relative 'connection'

module Datadog
class Statsd
class UDPConnection < Connection
include MonitorMixin

DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 8125

Expand All @@ -25,7 +29,7 @@ def initialize(host, port, logger, telemetry)

def connect
UDPSocket.new.tap do |socket|
socket.connect(host, port)
synchronize { socket.connect(host, port) }
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/statsd/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

module Datadog
class Statsd
VERSION = '4.8.0'
VERSION = '4.8.1'
end
end
Loading