Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b09df3b
Add origin detection
StephenWakely Jun 25, 2025
cf25b16
Add external env
StephenWakely Jun 26, 2025
7dee16a
Add tag cardinality option
StephenWakely Jun 27, 2025
c28b06a
Serialize fields
StephenWakely Jul 2, 2025
4df07b8
Update event serializer params
StephenWakely Jul 3, 2025
ff41013
Merge remote-tracking branch 'origin/master' into stephen/origin-dete…
StephenWakely Jul 4, 2025
5b48dd2
Add fields to service check
StephenWakely Jul 4, 2025
abb3b79
Use older fakefs version to test against Ruby v2
StephenWakely Jul 7, 2025
a50fafb
Ruby 2.1 compatible multiline strings
StephenWakely Jul 7, 2025
3e0ad4f
Only use old fakefs on old versions
StephenWakely Jul 7, 2025
deee154
Missed multiline
StephenWakely Jul 7, 2025
84f505a
Further fixes for windows
StephenWakely Jul 7, 2025
9c182b2
Fix content logic
StephenWakely Jul 7, 2025
6111742
Fudge allocation test
StephenWakely Jul 7, 2025
6282d13
Increase timing for macos
StephenWakely Jul 7, 2025
280b919
Try avoiding allocation
StephenWakely Jul 7, 2025
571458d
Trace the allocations
StephenWakely Jul 7, 2025
b7ed78a
Trace initialization
StephenWakely Jul 7, 2025
afb698b
Return block result
StephenWakely Jul 7, 2025
035cb88
Try warming up
StephenWakely Jul 8, 2025
4289183
Origin detection doesn't need to be a class
StephenWakely Jul 8, 2025
e7d35dd
Remove field serializer from event
StephenWakely Jul 8, 2025
922cd2e
Remove windows specific
StephenWakely Jul 8, 2025
7e1bb67
Return origin detection
StephenWakely Jul 8, 2025
dd1eb6a
Remove all linux specific alloction test
StephenWakely Jul 8, 2025
76486f8
Attempt to bound regex
StephenWakely Jul 8, 2025
166fd8b
Take substring of path.
StephenWakely Jul 8, 2025
cd89b8c
Replace regex with parser
StephenWakely Jul 8, 2025
556bfe1
Ruby <= 2.3 compliance
StephenWakely Jul 8, 2025
fcbd7f7
Remove trace allocations
StephenWakely Jul 8, 2025
29f4f08
Only turn origin detection off for linux
StephenWakely Jul 8, 2025
a4bd599
Make origin detection methods private
StephenWakely Jul 9, 2025
1c58afa
Remove unused instance vars
StephenWakely Jul 9, 2025
ecbbf3d
Add origin fields to telemetry metrics
StephenWakely Jul 10, 2025
373be59
Add delay serializer tests
StephenWakely Jul 10, 2025
cea7499
Fix field order
StephenWakely Jul 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ end

group :test do
  gem 'mocha'

  # Pick the fakefs requirement by interpreter version: the 0.13 series for
  # Ruby older than 3.0, the 3.x series otherwise.
  fakefs_requirement = RUBY_VERSION < '3.0' ? '~> 0.13.3' : '~> 3.0'
  gem 'fakefs', fakefs_requirement
end
70 changes: 66 additions & 4 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require_relative 'statsd/udp_connection'
require_relative 'statsd/uds_connection'
require_relative 'statsd/connection_cfg'
require_relative 'statsd/origin_detection'
require_relative 'statsd/message_buffer'
require_relative 'statsd/serialization'
require_relative 'statsd/sender'
Expand Down Expand Up @@ -82,6 +83,9 @@ def tags
# @option [Float] default sample rate if not overridden
# @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread
# @option [Boolean] delay_serialization delays stat serialization
# @option [Boolean] origin_detection is origin detection enabled
# @option [String] container_id the container ID field, used for origin detection
# @option [String] cardinality the default tag cardinality to use
def initialize(
host = nil,
port = nil,
Expand All @@ -104,15 +108,32 @@ def initialize(
delay_serialization: false,

telemetry_enable: true,
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL,

origin_detection: true,
container_id: nil,
cardinality: nil
)
unless tags.nil? || tags.is_a?(Array) || tags.is_a?(Hash)
raise ArgumentError, 'tags must be an array of string tags or a Hash'
end

@namespace = namespace
@prefix = @namespace ? "#{@namespace}.".freeze : nil
@serializer = Serialization::Serializer.new(prefix: @prefix, global_tags: tags)

origin_detection_enabled = origin_detection_enabled?(origin_detection)
container_id = get_container_id(container_id, origin_detection_enabled)

external_data = sanitize(ENV['DD_EXTERNAL_ENV'])

@serializer = Serialization::Serializer.new(prefix: @prefix,
container_id: container_id,
external_data: external_data,
global_tags: tags,
)

@cardinality = cardinality || ENV['DD_CARDINALITY'] || ENV['DATADOG_CARDINALITY']

@sample_rate = sample_rate
@delay_serialization = delay_serialization

Expand All @@ -136,6 +157,10 @@ def initialize(
sender_queue_size: sender_queue_size,

telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil,
container_id: container_id,
external_data: external_data,
cardinality: @cardinality,

serializer: serializer
)
end
Expand All @@ -159,6 +184,7 @@ def self.open(*args, **kwargs)
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [Numeric] :by increment value, default 1
# @option opts [String] :cardinality The tag cardinality to use
# @see #count
def increment(stat, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
Expand All @@ -174,6 +200,7 @@ def increment(stat, opts = EMPTY_OPTIONS)
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [Numeric] :by decrement value, default 1
# @option opts [String] :cardinality The tag cardinality to use
# @see #count
def decrement(stat, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
Expand All @@ -189,6 +216,7 @@ def decrement(stat, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
def count(stat, count, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
send_stats(stat, count, COUNTER_TYPE, opts)
Expand All @@ -206,6 +234,7 @@ def count(stat, count, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the current user count:
# $statsd.gauge('user.count', User.count)
def gauge(stat, value, opts = EMPTY_OPTIONS)
Expand All @@ -221,6 +250,7 @@ def gauge(stat, value, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the current user count:
# $statsd.histogram('user.count', User.count)
def histogram(stat, value, opts = EMPTY_OPTIONS)
Expand All @@ -235,6 +265,7 @@ def histogram(stat, value, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the current user count:
# $statsd.distribution('user.count', User.count)
def distribution(stat, value, opts = EMPTY_OPTIONS)
Expand All @@ -251,6 +282,7 @@ def distribution(stat, value, opts = EMPTY_OPTIONS)
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the time (in ms) taken to activate an account
# $statsd.distribution_time('account.activate') { @account.activate! }
def distribution_time(stat, opts = EMPTY_OPTIONS)
Expand All @@ -272,6 +304,7 @@ def distribution_time(stat, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
def timing(stat, ms, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
send_stats(stat, ms, TIMING_TYPE, opts)
Expand All @@ -287,6 +320,7 @@ def timing(stat, ms, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @yield The operation to be timed
# @see #timing
# @example Report the time (in ms) taken to activate an account
Expand All @@ -307,6 +341,7 @@ def time(stat, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Record a unique visitor by id:
# $statsd.set('visitors.uniques', User.id)
def set(stat, value, opts = EMPTY_OPTIONS)
Expand Down Expand Up @@ -348,6 +383,7 @@ def service_check(name, status, opts = EMPTY_OPTIONS)
# @option opts [String, nil] :alert_type ('info') Can be "error", "warning", "info" or "success".
# @option opts [Boolean, false] :truncate_if_too_long (false) Truncate the event if it is too long
# @option opts [Array<String>] :tags tags to be added to every metric
# @option opts [String] :cardinality The tag cardinality to use
# @example Report an awful event:
# $statsd.event('Something terrible happened', 'The end is near if we do nothing', :alert_type=>'warning', :tags=>['end_of_times','urgent'])
def event(title, text, opts = EMPTY_OPTIONS)
Expand Down Expand Up @@ -427,17 +463,43 @@ def send_stats(stat, delta, type, opts = EMPTY_OPTIONS)
telemetry.sent(metrics: 1) if telemetry

sample_rate = opts[:sample_rate] || @sample_rate || 1
cardinality = opts[:cardinality] || @cardinality

if sample_rate == 1 || opts[:pre_sampled] || rand <= sample_rate
full_stat =
if @delay_serialization
[stat, delta, type, opts[:tags], sample_rate]
[stat, delta, type, opts[:tags], sample_rate, cardinality]
else
serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate)
serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate, cardinality: cardinality)
end

forwarder.send_message(full_stat)
end
end

# Decides whether origin detection should be active for this client.
#
# An explicit +false+ argument always disables it. Otherwise the
# DD_ORIGIN_DETECTION_ENABLED environment variable is consulted: the values
# "0", "f" and "false" (compared case-insensitively) disable detection and
# any other value enables it. When the variable is unset, detection is on.
#
# @param origin_detection [Boolean, nil] the value given to the constructor
# @return [Boolean] true when origin detection should be performed
def origin_detection_enabled?(origin_detection)
  # Only a non-nil falsy argument (i.e. `false`) is an explicit opt-out.
  return false unless origin_detection.nil? || origin_detection

  env_setting = ENV['DD_ORIGIN_DETECTION_ENABLED']
  return true unless env_setting

  !%w[0 f false].include?(env_setting.downcase)
end

# Sanitize the DD_EXTERNAL_ENV input to ensure it doesn't contain invalid characters
# that may break the protocol.
# Removes every non-printable character and every `|` (pipe) character.
#
# @param external_data [String, nil] raw value to clean up
# @return [String, nil] a sanitized copy, or nil when the input is nil
def sanitize(external_data)
  return nil if external_data.nil?

  # The alternation must be "non-printable OR a literal pipe". A stray
  # backtick before the escaped pipe would restrict the second branch to the
  # two-character sequence "`|" and let bare pipes through.
  external_data.gsub(/[^[:print:]]|\|/, '')
end
end
end
6 changes: 6 additions & 0 deletions lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ def initialize(
single_thread: false,

logger: nil,
container_id: nil,
external_data: nil,
cardinality: nil,

serializer:
)
@transport_type = connection_cfg.transport_type

@telemetry = if telemetry_flush_interval
Telemetry.new(telemetry_flush_interval,
container_id,
external_data,
cardinality,
global_tags: global_tags,
transport_type: @transport_type
)
Expand Down
4 changes: 2 additions & 2 deletions lib/datadog/statsd/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def add(message)
# Serializes the message if it hasn't been already. Part of the
# delay_serialization feature.
if message.is_a?(Array)
stat, delta, type, tags, sample_rate = message
message = @serializer.to_stat(stat, delta, type, tags: tags, sample_rate: sample_rate)
stat, delta, type, tags, sample_rate, cardinality = message
message = @serializer.to_stat(stat, delta, type, tags: tags, sample_rate: sample_rate, cardinality: cardinality)
end

message_size = message.bytesize
Expand Down
155 changes: 155 additions & 0 deletions lib/datadog/statsd/origin_detection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# frozen_string_literal: true
module Datadog
class Statsd
private

# Controller name looked up in /proc/self/cgroup entries when resolving the
# cgroup node path; the empty controller name is tried as a second candidate.
CGROUPV1BASECONTROLLER = "memory"
# Inode number compared against the inode of /proc/self/ns/cgroup to decide
# whether the process is in the host cgroup namespace.
# NOTE(review): presumably the kernel's fixed inode for the initial cgroup
# namespace -- confirm against kernel documentation.
HOSTCGROUPNAMESPACEINODE = 0xEFFFFFFB

# Tells whether /proc/self/ns/cgroup has the inode number stored in
# HOSTCGROUPNAMESPACEINODE.
#
# @return [Boolean] false when the file cannot be stat'ed or the inode differs
def host_cgroup_namespace?
  ns_stat =
    begin
      File.stat("/proc/self/ns/cgroup")
    rescue StandardError
      nil
    end

  ns_stat ? ns_stat.ino == HOSTCGROUPNAMESPACEINODE : false
end

# Extracts the cgroup node paths of interest from the text of a
# /proc/<pid>/cgroup style file.
#
# Each line is expected to look like "hierarchy-ID:controller-list:cgroup-path".
# Only entries whose controller field is exactly CGROUPV1BASECONTROLLER or the
# empty string are kept. Malformed lines (fewer than three fields, or an empty
# path) are ignored.
#
# @param lines [String] the full file content, one entry per line
# @return [Hash{String => String}] controller name => cgroup path
def parse_cgroup_node_path(lines)
  lines.split("\n").each_with_object({}) do |line, res|
    # Split into at most three fields so that a cgroup path which itself
    # contains ':' stays intact instead of making the line look malformed.
    _hierarchy_id, controller, path = line.split(':', 3)
    next if path.nil? || path.empty?
    next unless controller == CGROUPV1BASECONTROLLER || controller == ''

    res[controller] = path
  end
end

# Resolves an inode-based identifier for the current cgroup node.
#
# Reads +proc_self_cgroup_path+, extracts the node paths with
# parse_cgroup_node_path, then tries the CGROUPV1BASECONTROLLER entry followed
# by the empty-controller entry. For each one present, the mount path, the
# controller name and the node path are joined (empty segments dropped) and
# handed to inode_for_path; the first non-nil result is returned.
#
# @param cgroup_mount_path [String] directory the cgroup filesystem is mounted on
# @param proc_self_cgroup_path [String] path of the cgroup listing to read
# @return [String, nil] the value from inode_for_path, or nil when the listing
#   cannot be read or no candidate path yields a result
def get_cgroup_inode(cgroup_mount_path, proc_self_cgroup_path)
  listing =
    begin
      File.read(proc_self_cgroup_path)
    rescue StandardError
      nil
    end
  return nil unless listing

  paths_by_controller = parse_cgroup_node_path(listing)

  [CGROUPV1BASECONTROLLER, ''].each do |name|
    node_path = paths_by_controller[name]
    next unless node_path

    mount_root = cgroup_mount_path.chomp('/')
    relative_node = node_path.sub(/^\//, '')
    candidate = [mount_root, name.strip, relative_node].reject(&:empty?).join("/")

    found = inode_for_path(candidate)
    return found unless found.nil?
  end

  nil
end

# Builds the "in-<inode>" identifier for a filesystem path.
#
# @param path [String] path to stat
# @return [String, nil] "in-" followed by the inode number, or nil when the
#   path cannot be stat'ed
def inode_for_path(path)
  "in-#{File.stat(path).ino}"
rescue StandardError
  nil
end

# Scans cgroup entries for a container id.
#
# Every line shaped like "<digits>:<anything without ':'>:<path>" is
# inspected; the id is taken from the END of the path (optionally followed by
# ".scope") and may be a UUID-like value (dash or underscore separated), a
# 64-character hex string, or a 32-character hex string followed by "-<digits>".
# The first line that yields an id wins.
#
# @param handle [#each_line] source of cgroup lines (e.g. an open File)
# @return [String, nil] the container id, or nil when no line contains one
def parse_container_id(handle)
  cgroup_entry = /^\d+:[^:]*:(.+)$/
  uuid_form = /[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}/
  hex_form = /[0-9a-f]{64}/
  task_form = /[0-9a-f]{32}-\d+/
  trailing_id = /(#{uuid_form}|#{hex_form}|#{task_form})(?:\.scope)?$/

  handle.each_line do |entry|
    cgroup_path = entry[cgroup_entry, 1]
    next if cgroup_path.nil?

    found = cgroup_path[trailing_id, 1]
    return found unless found.nil?
  end

  nil
end

# Opens +fpath+ and extracts a container id from it via parse_container_id.
#
# A failure to open the file is treated as "no container id" and yields nil.
# Errors raised while parsing still propagate to the caller, but the file
# handle is always closed first.
#
# @param fpath [String] path of the cgroup listing to read
# @return [String, nil] the container id, or nil when the file cannot be
#   opened or contains no id
def read_container_id(fpath)
  handle =
    begin
      File.open(fpath, 'r')
    rescue StandardError
      nil
    end
  return nil unless handle

  begin
    parse_container_id(handle)
  ensure
    # Close even when parsing raises, so the descriptor is never leaked.
    handle.close
  end
end

# Extracts the container id from a single mount path taken from mount info.
#
# The path must end in a "hostname" component. Every path component is then
# checked against the three id shapes (64 hex characters, 32 hex characters
# followed by "-<digits>", or 8 hex characters followed by four "-xxxx"
# groups). The LAST matching component is the id, and the component just
# before it is remembered as its group. Ids whose group is "sandboxes" are
# discarded.
#
# @param line [String] a mount path
# @return [String, nil] the container id, or nil when there is none or it
#   belongs to the "sandboxes" group
def extract_container_info(line)
  components = line.strip.split("/")
  return nil if components.last != "hostname"

  found_id = nil
  parent = nil

  components.each_with_index do |component, position|
    # The length checks are cheap pre-filters ahead of the regex matches.
    looks_like_id =
      (component.length == 64 && !(component =~ /\A[0-9a-f]{64}\z/).nil?) ||
      (component.length > 32 && !(component =~ /\A[0-9a-f]{32}-\d+\z/).nil?) ||
      !(component =~ /\A[0-9a-f]{8}(-[0-9a-f]{4}){4}\z/).nil?
    next unless looks_like_id

    parent = components[position - 1] if position >= 1
    found_id = component
  end

  parent == "sandboxes" ? nil : found_id
end

# Parse /proc/self/mountinfo to extract the container id.
# Container runtimes often embed the container id in the mount paths.
# The fourth and fifth whitespace-separated fields of each line are handed to
# extract_container_info, which only accepts paths ending in a `hostname`
# component (part of the container's `etc/hostname` bind mount).
#
# Lines with too few fields (e.g. blank or truncated lines) are skipped
# instead of being passed on as nil.
#
# @param handle [#each_line] source of mount info lines (e.g. an open File)
# @return [String, nil] the first container id found, or nil
def parse_mount_info(handle)
  handle.each_line do |line|
    fields = line.split(" ")

    [fields[3], fields[4]].each do |mount_path|
      # A short line leaves these fields nil; there is nothing to inspect.
      next if mount_path.nil?

      container_id = extract_container_info(mount_path)
      return container_id unless container_id.nil?
    end
  end

  nil
end

# Opens +path+ and extracts a container id from it via parse_mount_info.
#
# A failure to open the file is treated as "no container id" and yields nil.
# Errors raised while parsing still propagate to the caller, but the file
# handle is always closed first.
#
# @param path [String] path of the mount info file to read
# @return [String, nil] the container id, or nil when the file cannot be
#   opened or contains no id
def read_mount_info(path)
  handle =
    begin
      File.open(path, 'r')
    rescue StandardError
      nil
    end
  return nil unless handle

  begin
    parse_mount_info(handle)
  ensure
    # Close even when parsing raises, so the descriptor is never leaked.
    handle.close
  end
end

# Determines the container id to report.
#
# Resolution order:
# 1. the id supplied by the user, when not nil;
# 2. nothing at all when +cgroup_fallback+ is falsy;
# 3. the id found in /proc/self/cgroup;
# 4. the id found in /proc/self/mountinfo;
# 5. nil when host_cgroup_namespace? is true;
# 6. otherwise the result of get_cgroup_inode for /sys/fs/cgroup.
#
# @param user_provided_id [String, nil] explicitly configured container id
# @param cgroup_fallback [Boolean] whether automatic detection may be used
# @return [String, nil] the resolved identifier, or nil when none is available
def get_container_id(user_provided_id, cgroup_fallback)
  return user_provided_id unless user_provided_id.nil?
  return nil unless cgroup_fallback

  detected = read_container_id("/proc/self/cgroup")
  detected = read_mount_info("/proc/self/mountinfo") if detected.nil?
  return detected unless detected.nil?

  host_cgroup_namespace? ? nil : get_cgroup_inode("/sys/fs/cgroup", "/proc/self/cgroup")
end
end
end
Loading
Loading