diff --git a/Gemfile b/Gemfile index 28e6ad45..65690b55 100644 --- a/Gemfile +++ b/Gemfile @@ -27,4 +27,9 @@ end group :test do gem 'mocha' + if RUBY_VERSION < '3.0' + gem 'fakefs', '~> 0.13.3' + else + gem 'fakefs', '~> 3.0' + end end diff --git a/lib/datadog/statsd.rb b/lib/datadog/statsd.rb index 32f8d1a6..d666fad2 100644 --- a/lib/datadog/statsd.rb +++ b/lib/datadog/statsd.rb @@ -6,6 +6,7 @@ require_relative 'statsd/udp_connection' require_relative 'statsd/uds_connection' require_relative 'statsd/connection_cfg' +require_relative 'statsd/origin_detection' require_relative 'statsd/message_buffer' require_relative 'statsd/serialization' require_relative 'statsd/sender' @@ -82,6 +83,9 @@ def tags # @option [Float] default sample rate if not overridden # @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread # @option [Boolean] delay_serialization delays stat serialization + # @option [Boolean] origin_detection is origin detection enabled + # @option [String] container_id the container ID field, used for origin detection + # @option [String] cardinality the default tag cardinality to use def initialize( host = nil, port = nil, @@ -104,7 +108,11 @@ def initialize( delay_serialization: false, telemetry_enable: true, - telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL + telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL, + + origin_detection: true, + container_id: nil, + cardinality: nil ) unless tags.nil? || tags.is_a?(Array) || tags.is_a?(Hash) raise ArgumentError, 'tags must be an array of string tags or a Hash' @@ -112,7 +120,20 @@ def initialize( @namespace = namespace @prefix = @namespace ? 
"#{@namespace}.".freeze : nil - @serializer = Serialization::Serializer.new(prefix: @prefix, global_tags: tags) + + origin_detection_enabled = origin_detection_enabled?(origin_detection) + container_id = get_container_id(container_id, origin_detection_enabled) + + external_data = sanitize(ENV['DD_EXTERNAL_ENV']) + + @serializer = Serialization::Serializer.new(prefix: @prefix, + container_id: container_id, + external_data: external_data, + global_tags: tags, + ) + + @cardinality = cardinality || ENV['DD_CARDINALITY'] || ENV['DATADOG_CARDINALITY'] + @sample_rate = sample_rate @delay_serialization = delay_serialization @@ -136,6 +157,10 @@ def initialize( sender_queue_size: sender_queue_size, telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil, + container_id: container_id, + external_data: external_data, + cardinality: @cardinality, + serializer: serializer ) end @@ -159,6 +184,7 @@ def self.open(*args, **kwargs) # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. # @option opts [Array] :tags An array of tags # @option opts [Numeric] :by increment value, default 1 + # @option opts [String] :cardinality The tag cardinality to use # @see #count def increment(stat, opts = EMPTY_OPTIONS) opts = { sample_rate: opts } if opts.is_a?(Numeric) @@ -174,6 +200,7 @@ def increment(stat, opts = EMPTY_OPTIONS) # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. 
# @option opts [Array] :tags An array of tags # @option opts [Numeric] :by decrement value, default 1 + # @option opts [String] :cardinality The tag cardinality to use # @see #count def decrement(stat, opts = EMPTY_OPTIONS) opts = { sample_rate: opts } if opts.is_a?(Numeric) @@ -189,6 +216,7 @@ def decrement(stat, opts = EMPTY_OPTIONS) # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. # @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use def count(stat, count, opts = EMPTY_OPTIONS) opts = { sample_rate: opts } if opts.is_a?(Numeric) send_stats(stat, count, COUNTER_TYPE, opts) @@ -206,6 +234,7 @@ def count(stat, count, opts = EMPTY_OPTIONS) # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. # @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use # @example Report the current user count: # $statsd.gauge('user.count', User.count) def gauge(stat, value, opts = EMPTY_OPTIONS) @@ -221,6 +250,7 @@ def gauge(stat, value, opts = EMPTY_OPTIONS) # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. 
# @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use # @example Report the current user count: # $statsd.histogram('user.count', User.count) def histogram(stat, value, opts = EMPTY_OPTIONS) @@ -235,6 +265,7 @@ def histogram(stat, value, opts = EMPTY_OPTIONS) # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. # @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use # @example Report the current user count: # $statsd.distribution('user.count', User.count) def distribution(stat, value, opts = EMPTY_OPTIONS) @@ -251,6 +282,7 @@ def distribution(stat, value, opts = EMPTY_OPTIONS) # @param [Hash] opts the options to create the metric with # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use # @example Report the time (in ms) taken to activate an account # $statsd.distribution_time('account.activate') { @account.activate! } def distribution_time(stat, opts = EMPTY_OPTIONS) @@ -272,6 +304,7 @@ def distribution_time(stat, opts = EMPTY_OPTIONS) # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. 
# @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use def timing(stat, ms, opts = EMPTY_OPTIONS) opts = { sample_rate: opts } if opts.is_a?(Numeric) send_stats(stat, ms, TIMING_TYPE, opts) @@ -287,6 +320,7 @@ def timing(stat, ms, opts = EMPTY_OPTIONS) # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. # @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use # @yield The operation to be timed # @see #timing # @example Report the time (in ms) taken to activate an account @@ -307,6 +341,7 @@ def time(stat, opts = EMPTY_OPTIONS) # @option opts [Numeric] :sample_rate sample rate, 1 for always # @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling. # @option opts [Array] :tags An array of tags + # @option opts [String] :cardinality The tag cardinality to use # @example Record a unique visitory by id: # $statsd.set('visitors.uniques', User.id) def set(stat, value, opts = EMPTY_OPTIONS) @@ -348,6 +383,7 @@ def service_check(name, status, opts = EMPTY_OPTIONS) # @option opts [String, nil] :alert_type ('info') Can be "error", "warning", "info" or "success". 
# Decides whether origin detection should run.
#
# Precedence:
#   1. An explicit `false` passed by the caller always disables it
#      (`nil` means "not specified" and falls through).
#   2. Otherwise, if DD_ORIGIN_DETECTION_ENABLED is set, it is disabled only
#      when the value is "0", "f" or "false" (case-insensitive).
#   3. Otherwise it is enabled.
#
# @param origin_detection [Boolean, nil] the value given to the constructor
# @return [Boolean]
def origin_detection_enabled?(origin_detection)
  return false if origin_detection == false

  env_value = ENV['DD_ORIGIN_DETECTION_ENABLED']
  return true if env_value.nil?

  !%w[0 f false].include?(env_value.downcase)
end

# Sanitize the DD_EXTERNAL_ENV input to ensure it doesn't contain invalid
# characters that may break the protocol: every non-printable character and
# every pipe character is removed, since the pipe is the field separator of
# the datagram the value is embedded in.
#
# @param external_data [String, nil] raw value of DD_EXTERNAL_ENV
# @return [String, nil] the cleaned value, or nil when no value was given
def sanitize(external_data)
  return nil if external_data.nil?

  # The alternation must match a bare pipe. Matching a backtick followed by a
  # pipe (as a markdown-quoted pattern would) lets lone pipes through.
  external_data.gsub(/[^[:print:]]|\|/, '')
end
module Datadog
  class Statsd
    private

    # cgroup v1 controller whose hierarchy is used to locate this process'
    # cgroup node (the cgroup v2 unified hierarchy uses the empty name).
    CGROUPV1BASECONTROLLER = "memory"
    # Inode that /proc/self/ns/cgroup is compared against to recognise the
    # host cgroup namespace (the kernel's PROC_CGROUP_INIT_INO).
    HOSTCGROUPNAMESPACEINODE = 0xEFFFFFFB

    # @return [Boolean] true when /proc/self/ns/cgroup can be stat'ed and its
    #   inode is the host cgroup namespace inode; false otherwise (including
    #   when the file does not exist).
    def host_cgroup_namespace?
      File.stat("/proc/self/ns/cgroup").ino == HOSTCGROUPNAMESPACEINODE
    rescue StandardError
      false
    end

    # Extracts the cgroup node path of the relevant controllers from the
    # content of a /proc/<pid>/cgroup file.
    #
    # Each line has the form `hierarchy-ID:controller-list:cgroup-path`. Only
    # the cgroup v1 base controller and the cgroup v2 entry (empty controller
    # list) are kept.
    #
    # @param lines [String] the whole file content
    # @return [Hash{String => String}] controller name => cgroup path
    def parse_cgroup_node_path(lines)
      lines.split("\n").each_with_object({}) do |line, res|
        # Limit the split to 3 so that a cgroup path that itself contains a
        # colon is kept intact instead of discarding the whole line.
        _hierarchy_id, controller, path = line.split(':', 3)
        next if path.nil? || path.empty?
        next unless controller == CGROUPV1BASECONTROLLER || controller == ''

        res[controller] = path
      end
    end

    # Resolves the inode of this process' cgroup node directory.
    #
    # @param cgroup_mount_path [String] where the cgroup filesystem is mounted
    # @param proc_self_cgroup_path [String] path of the /proc/self/cgroup file
    # @return [String, nil] "in-<inode>" for the first controller whose node
    #   directory exists (v1 base controller first, then v2), or nil
    def get_cgroup_inode(cgroup_mount_path, proc_self_cgroup_path)
      content = begin
        File.read(proc_self_cgroup_path)
      rescue StandardError
        nil
      end
      return nil unless content

      controllers = parse_cgroup_node_path(content)
      mount_root = cgroup_mount_path.chomp('/')

      [CGROUPV1BASECONTROLLER, ''].each do |controller|
        node_path = controllers[controller]
        next unless node_path

        # Empty segments (e.g. the v2 controller name) are dropped so the
        # joined path never contains a doubled separator.
        segments = [mount_root, controller.strip, node_path.sub(/\A\//, '')]
        inode = inode_for_path(segments.reject(&:empty?).join("/"))
        return inode unless inode.nil?
      end

      nil
    end

    # @param path [String]
    # @return [String, nil] "in-<inode>" or nil when the path cannot be stat'ed
    def inode_for_path(path)
      "in-#{File.stat(path).ino}"
    rescue StandardError
      nil
    end

    # Scans cgroup lines for a container id (a UUID, a 64 hex digit id or a
    # `<32 hex>-<digits>` task id, optionally followed by `.scope`) at the end
    # of the cgroup path.
    #
    # @param handle [#each_line] opened /proc/self/cgroup (or any line source)
    # @return [String, nil] the first id found
    def parse_container_id(handle)
      exp_line = /^\d+:[^:]*:(.+)$/
      uuid = /[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}/
      container = /[0-9a-f]{64}/
      task = /[0-9a-f]{32}-\d+/
      exp_container_id = /(#{uuid}|#{container}|#{task})(?:\.scope)?$/

      handle.each_line do |line|
        cgroup_path = line[exp_line, 1]
        next unless cgroup_path

        id = cgroup_path[exp_container_id, 1]
        return id if id
      end

      nil
    end

    # @param fpath [String] path of a cgroup file
    # @return [String, nil] the container id, or nil when the file cannot be
    #   opened or holds no id
    def read_container_id(fpath)
      handle = begin
        File.open(fpath, 'r')
      rescue StandardError
        nil
      end
      return nil unless handle

      begin
        parse_container_id(handle)
      ensure
        # Close even when parsing raises, so the descriptor is never leaked.
        handle.close
      end
    end

    # Extracts the final container info from one mount path of mountinfo.
    #
    # Only paths whose last component is `hostname` are considered. The last
    # component that looks like a container id wins; ids located directly
    # under a `sandboxes` directory are ignored.
    #
    # @param line [String] a mount root or mount point path
    # @return [String, nil]
    def extract_container_info(line)
      parts = line.strip.split("/")
      return nil unless parts.last == "hostname"

      # Same three id shapes as parse_container_id: 64 hex digits, a task id,
      # or a full 8-4-4-4-12 UUID.
      id_pattern = /\A(?:[0-9a-f]{64}|[0-9a-f]{32}-\d+|[0-9a-f]{8}(?:[-_][0-9a-f]{4}){3}[-_][0-9a-f]{12})\z/

      container_id = nil
      group = nil

      parts.each_with_index do |part, idx|
        next unless part =~ id_pattern

        # Remember the directory right before the id to detect sandboxes.
        group = parts[idx - 1] if idx >= 1
        container_id = part
      end

      group == "sandboxes" ? nil : container_id
    end

    # Parse /proc/self/mountinfo to extract the container id.
    # Often container runtimes embed the container id in the mount paths.
    # We parse the mount with a final `hostname` component, which is part of
    # the containers `etc/hostname` bind mount.
    #
    # @param handle [#each_line]
    # @return [String, nil]
    def parse_mount_info(handle)
      handle.each_line do |line|
        fields = line.split(" ")
        # Fields 4 and 5 of a mountinfo line are the mount root and the mount
        # point. Short or blank lines simply have nothing to inspect.
        fields.values_at(3, 4).compact.each do |mount_path|
          container_id = extract_container_info(mount_path)
          return container_id unless container_id.nil?
        end
      end

      nil
    end

    # @param path [String] path of a mountinfo file
    # @return [String, nil] the container id, or nil when the file cannot be
    #   opened or holds no id
    def read_mount_info(path)
      handle = begin
        File.open(path, 'r')
      rescue StandardError
        nil
      end
      return nil unless handle

      begin
        parse_mount_info(handle)
      ensure
        handle.close
      end
    end

    # Determines the container id to attach to outgoing payloads.
    #
    # @param user_provided_id [String, nil] id given by the user; always wins
    # @param cgroup_fallback [Boolean] whether detection from the system is
    #   allowed when no id was provided
    # @return [String, nil] the provided id, else the id found in
    #   /proc/self/cgroup, else the one found in /proc/self/mountinfo, else
    #   (outside the host cgroup namespace) the "in-<inode>" of the cgroup
    #   node; nil when nothing could be determined
    def get_container_id(user_provided_id, cgroup_fallback)
      return user_provided_id unless user_provided_id.nil?
      return nil unless cgroup_fallback

      container_id = read_container_id("/proc/self/cgroup")
      return container_id unless container_id.nil?

      container_id = read_mount_info("/proc/self/mountinfo")
      return container_id unless container_id.nil?

      return nil if host_cgroup_namespace?

      get_cgroup_inode("/sys/fs/cgroup", "/proc/self/cgroup")
    end
  end
end
module Datadog
  class Statsd
    module Serialization
      # Serializes the trailing protocol fields appended to a payload: the
      # container id (`|c:`), the external data (`|e:`) and the tag
      # cardinality (`|card:`).
      class FieldSerializer
        VALID_CARDINALITY = [:none, :low, :orchestrator, :high].freeze

        # @param container_id [String, nil] value of the `|c:` field, omitted when nil
        # @param external_data [String, nil] value of the `|e:` field, omitted when nil
        def initialize(container_id, external_data)
          @container_id = container_id
          @external_data = external_data

          # Both values are fixed for the lifetime of the serializer, so their
          # serialized form is built once instead of on every message.
          origin_fields = String.new('')
          origin_fields << "|c:#{container_id}" unless container_id.nil?
          origin_fields << "|e:#{external_data}" unless external_data.nil?
          @origin_fields = origin_fields.freeze
        end

        # @param cardinality [String, Symbol, nil] tag cardinality for this
        #   message; nil omits the `|card:` field
        # @return [String] the fields to append to the payload; empty when
        #   there is nothing to add
        # @raise [ArgumentError] when cardinality is not one of VALID_CARDINALITY
        def format(cardinality)
          return @origin_fields if cardinality.nil?

          # Values that cannot be turned into a symbol (e.g. an Integer) are
          # reported as invalid rather than failing with NoMethodError.
          unless cardinality.respond_to?(:to_sym) && VALID_CARDINALITY.include?(cardinality.to_sym)
            raise ArgumentError, "Invalid cardinality #{cardinality}. Valid options are #{VALID_CARDINALITY.join(', ')}."
          end

          "#{@origin_fields}|card:#{cardinality}"
        end
      end
    end
  end
end
tags: [], sample_rate: 1, cardinality: nil) + stat_serializer.format(name, delta, type, tags: tags, sample_rate: sample_rate, cardinality: cardinality) end # using *args would make new allocations diff --git a/lib/datadog/statsd/serialization/service_check_serializer.rb b/lib/datadog/statsd/serialization/service_check_serializer.rb index 828098c4..8248f86f 100644 --- a/lib/datadog/statsd/serialization/service_check_serializer.rb +++ b/lib/datadog/statsd/serialization/service_check_serializer.rb @@ -9,8 +9,9 @@ class ServiceCheckSerializer hostname: 'h:', }.freeze - def initialize(global_tags: []) + def initialize(container_id, external_data, global_tags: []) @tag_serializer = TagSerializer.new(global_tags) + @field_serializer = FieldSerializer.new(container_id, external_data) end def format(name, status, options = EMPTY_OPTIONS) @@ -42,11 +43,16 @@ def format(name, status, options = EMPTY_OPTIONS) service_check << '|#' service_check << tags end + + if fields = field_serializer.format(options[:cardinality]) + service_check << fields + end end end protected attr_reader :tag_serializer + attr_reader :field_serializer def escape_message(message) message.delete('|').tap do |m| diff --git a/lib/datadog/statsd/serialization/stat_serializer.rb b/lib/datadog/statsd/serialization/stat_serializer.rb index c8e9bcb3..1e37b7e2 100644 --- a/lib/datadog/statsd/serialization/stat_serializer.rb +++ b/lib/datadog/statsd/serialization/stat_serializer.rb @@ -4,26 +4,28 @@ module Datadog class Statsd module Serialization class StatSerializer - def initialize(prefix, global_tags: []) + def initialize(prefix, container_id, external_data, global_tags: []) @prefix = prefix @prefix_str = prefix.to_s @tag_serializer = TagSerializer.new(global_tags) + @field_serializer = FieldSerializer.new(container_id, external_data) end - def format(metric_name, delta, type, tags: [], sample_rate: 1) + def format(metric_name, delta, type, tags: [], sample_rate: 1, cardinality: nil) metric_name = 
formatted_metric_name(metric_name) + fields = field_serializer.format(cardinality) if sample_rate != 1 if tags_list = tag_serializer.format(tags) - "#{@prefix_str}#{metric_name}:#{delta}|#{type}|@#{sample_rate}|##{tags_list}" + "#{@prefix_str}#{metric_name}:#{delta}|#{type}|@#{sample_rate}|##{tags_list}#{fields}" else - "#{@prefix_str}#{metric_name}:#{delta}|#{type}|@#{sample_rate}" + "#{@prefix_str}#{metric_name}:#{delta}|#{type}|@#{sample_rate}#{fields}" end else if tags_list = tag_serializer.format(tags) - "#{@prefix_str}#{metric_name}:#{delta}|#{type}|##{tags_list}" + "#{@prefix_str}#{metric_name}:#{delta}|#{type}|##{tags_list}#{fields}" else - "#{@prefix_str}#{metric_name}:#{delta}|#{type}" + "#{@prefix_str}#{metric_name}:#{delta}|#{type}#{fields}" end end end @@ -36,6 +38,7 @@ def global_tags attr_reader :prefix attr_reader :tag_serializer + attr_reader :field_serializer if RUBY_VERSION < '3' def metric_name_to_string(metric_name) diff --git a/lib/datadog/statsd/telemetry.rb b/lib/datadog/statsd/telemetry.rb index 89b19357..69a13418 100644 --- a/lib/datadog/statsd/telemetry.rb +++ b/lib/datadog/statsd/telemetry.rb @@ -19,7 +19,7 @@ class Telemetry # Rough estimation of maximum telemetry message size without tags MAX_TELEMETRY_MESSAGE_SIZE_WT_TAGS = 50 # bytes - def initialize(flush_interval, global_tags: [], transport_type: :udp) + def initialize(flush_interval, container_id, external_data, cardinality, global_tags: [], transport_type: :udp) @flush_interval = flush_interval @global_tags = global_tags @transport_type = transport_type @@ -32,6 +32,11 @@ def initialize(flush_interval, global_tags: [], transport_type: :udp) client_version: VERSION, client_transport: transport_type, ).format(global_tags) + + @serialized_fields = Serialization::FieldSerializer.new( + container_id, + external_data + ).format(cardinality) end def would_fit_in?(max_buffer_payload_size) @@ -98,9 +103,10 @@ def flush private attr_reader :serialized_tags + attr_reader :serialized_fields 
def pattern - @pattern ||= "datadog.dogstatsd.client.%s:%d|#{COUNTER_TYPE}|##{serialized_tags}" + @pattern ||= "datadog.dogstatsd.client.%s:%d|#{COUNTER_TYPE}|##{serialized_tags}#{serialized_fields}" end if Kernel.const_defined?('Process') && Process.respond_to?(:clock_gettime) diff --git a/spec/integrations/allocation_spec.rb b/spec/integrations/allocation_spec.rb index 8751e6e7..3ddf90e5 100644 --- a/spec/integrations/allocation_spec.rb +++ b/spec/integrations/allocation_spec.rb @@ -16,6 +16,7 @@ tags: tags, logger: logger, telemetry_flush_interval: -1, + origin_detection: false, ) end @@ -67,6 +68,7 @@ tags: tags, logger: logger, telemetry_enable: false, + origin_detection: false, ) end @@ -99,7 +101,7 @@ elsif RUBY_VERSION < '2.6.0' 26 else - 25 + 26 end end @@ -146,6 +148,7 @@ tags: tags, logger: logger, telemetry_enable: false, + origin_detection: false, ) end @@ -200,13 +203,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 22 + 23 elsif RUBY_VERSION < '2.5.0' - 21 + 22 elsif RUBY_VERSION < '2.6.0' - 20 + 21 else - 19 + 20 end end @@ -225,18 +228,19 @@ tags: tags, logger: logger, telemetry_enable: false, + origin_detection: false, ) end let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 14 + 15 elsif RUBY_VERSION < '2.5.0' - 13 + 14 elsif RUBY_VERSION < '2.6.0' - 12 + 13 else - 11 + 12 end end @@ -249,6 +253,12 @@ end context 'with tags' do + before do + # warmup + subject.event('foobar', 'happening', tags: { something: 'a value' }) + subject.flush(sync: true) + end + let(:expected_allocations) do if RUBY_VERSION < '2.4.0' 31 @@ -279,13 +289,13 @@ let(:expected_allocations) do if RUBY_VERSION < '2.4.0' - 18 + 19 elsif RUBY_VERSION < '2.5.0' - 17 + 18 elsif RUBY_VERSION < '2.6.0' - 16 + 17 else - 15 + 16 end end @@ -304,6 +314,7 @@ tags: tags, logger: logger, telemetry_enable: false, + origin_detection: false, ) end @@ -328,6 +339,12 @@ end context 'with tags' do + before do + # warmup + subject.service_check('foobar', 'happening', tags: 
{ something: 'a value' }) + subject.flush(sync: true) + end + let(:expected_allocations) do if RUBY_VERSION < '2.4.0' 27 diff --git a/spec/integrations/buffering_spec.rb b/spec/integrations/buffering_spec.rb index 2b767010..a5d8a3aa 100644 --- a/spec/integrations/buffering_spec.rb +++ b/spec/integrations/buffering_spec.rb @@ -9,6 +9,7 @@ telemetry_enable: false, single_thread: single_thread, buffer_max_pool_size: buffer_max_pool_size, + origin_detection: false, ) end let(:socket) { FakeUDPSocket.new(copy_message: true) } @@ -137,6 +138,7 @@ telemetry_flush_interval: 60, buffer_max_pool_size: buffer_max_pool_size, single_thread: single_thread, + origin_detection: false, ) end diff --git a/spec/integrations/delay_serialization_spec.rb b/spec/integrations/delay_serialization_spec.rb index 18b2cfc3..915061c6 100644 --- a/spec/integrations/delay_serialization_spec.rb +++ b/spec/integrations/delay_serialization_spec.rb @@ -6,14 +6,17 @@ # expects an Array is passed and not a String expect(buffer) .to receive(:add) - .with(["boo", 1, "c", nil, 1]) + .with(["boo", 1, "c", nil, 1, nil]) # and then expect no more adds! 
expect(buffer).to receive(:add).exactly(0).times expect(buffer) .to receive(:flush) allow(Datadog::Statsd::MessageBuffer).to receive(:new).and_return(buffer) - dogstats = Datadog::Statsd.new("localhost", 1234, delay_serialization: true) + dogstats = Datadog::Statsd.new("localhost", 1234, + delay_serialization: true, + origin_detection: false, + ) dogstats.increment("boo") dogstats.flush(sync: true) @@ -22,7 +25,10 @@ it "serializes messages normally" do socket = FakeUDPSocket.new(copy_message: true) allow(UDPSocket).to receive(:new).and_return(socket) - dogstats = Datadog::Statsd.new("localhost", 1234, delay_serialization: true) + dogstats = Datadog::Statsd.new("localhost", 1234, + delay_serialization: true, + origin_detection: false, + ) dogstats.increment("boo") dogstats.increment("oob", tags: {tag1: "val1"}) @@ -35,4 +41,25 @@ "pow:1|c|@2|#tag1:val1" ].join("\n")) end + + it "serializes container id normally" do + socket = FakeUDPSocket.new(copy_message: true) + allow(UDPSocket).to receive(:new).and_return(socket) + dogstats = Datadog::Statsd.new("localhost", 1234, + delay_serialization: true, + origin_detection: false, + container_id: "banana", + ) + + dogstats.increment("boo") + dogstats.increment("oob", tags: {tag1: "val1"}) + dogstats.increment("pow", tags: {tag1: "val1"}, sample_rate: 2) + dogstats.flush(sync: true) + + expect(socket.recv[0]).to eq([ + "boo:1|c|c:banana", + "oob:1|c|#tag1:val1|c:banana", + "pow:1|c|@2|#tag1:val1|c:banana" + ].join("\n")) + end end diff --git a/spec/integrations/telemetry_spec.rb b/spec/integrations/telemetry_spec.rb index 5703a66a..24166775 100644 --- a/spec/integrations/telemetry_spec.rb +++ b/spec/integrations/telemetry_spec.rb @@ -7,7 +7,8 @@ subject do Datadog::Statsd.new('localhost', 1234, - telemetry_flush_interval: -1 + telemetry_flush_interval: -1, + origin_detection: false, ) end @@ -26,6 +27,7 @@ subject do Datadog::Statsd.new('localhost', 1234, telemetry_enable: false, + origin_detection: false, ) end @@ -59,6 
+61,7 @@ subject do Datadog::Statsd.new('localhost', 1234, telemetry_flush_interval: 2, + origin_detection: false, ) end diff --git a/spec/statsd/origin_detection_spec.rb b/spec/statsd/origin_detection_spec.rb new file mode 100644 index 00000000..b840b153 --- /dev/null +++ b/spec/statsd/origin_detection_spec.rb @@ -0,0 +1,543 @@ +require 'spec_helper' +require 'fakefs/safe' + +# To ensure that we can against Ruby v2 we need to use quite an old +# version of fakefs that doesn't provide the ino function. +# Monkey patch a version here. +if RUBY_VERSION < '3.0' + class FakeFS::File::Stat + def ino + 42 + end + end +end + +describe Datadog::Statsd do + subject do + described_class.new() + end + + it 'writes a file in the fake filesystem' do + read = nil + FakeFS.with_fresh do + FileUtils.mkdir_p('/tmp') + File.write('/tmp/test.txt', 'hello') + read = File.read('/tmp/test.txt') + end + + expect(read).to eq('hello') + end + + it 'extracts the container ID from the cgroup file' do + container_id = nil + FakeFS.with_fresh do + FileUtils.mkdir_p('/proc/self') + File.write('/proc/self/cgroup', < '/', + } + }, + { + name: 'hybrid', + content: "other_line\n0::/\n1:memory:/docker/abc123", + expected: { + '' => '/', + 'memory' => '/docker/abc123', + } + }, + { + name: 'with other controllers', + content: "other_line\n12:pids:/docker/abc123\n11:hugetlb:/docker/abc123\n10:net_cls,net_prio:/docker/abc123\n0::/docker/abc123\n", + expected: { + '' => '/docker/abc123' + } + }, + { + name: 'no controller', + content: 'empty', + expected: {} + } + ] + + test_cases.each do |test_case| + it "parses correctly for: #{test_case[:name]}" do + result = subject.send(:parse_cgroup_node_path, test_case[:content]) + expect(result).to eq(test_case[:expected]) + end + end + end + + describe '#get_cgroup_inode' do + test_cases = [ + { + description: 'matching entry in /proc/self/cgroup and /proc/mounts - cgroup2 only', + cgroup_node_dir: 'system.slice/docker-abcdef0123456789abcdef0123456789.scope', + 
proc_self_cgroup_content: "0::/system.slice/docker-abcdef0123456789abcdef0123456789.scope\n", + controller: nil, + expected_result: 'in-{inode}' + }, + { + description: 'matching entry in /proc/self/cgroup and /proc/mounts - cgroup/hybrid only', + cgroup_node_dir: 'system.slice/docker-abcdef0123456789abcdef0123456789.scope', + proc_self_cgroup_content: <