From 27e1ec9d9d7fc649c55def18e0f4baa1ddabbfa1 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Thu, 26 Mar 2026 16:35:41 -0400 Subject: [PATCH 01/23] Add bench/ --- bench/event_handler_bench.exs | 46 +++++++++++++++++++++++++++++++++++ mix.exs | 1 + mix.lock | 3 +++ 3 files changed, 50 insertions(+) create mode 100644 bench/event_handler_bench.exs diff --git a/bench/event_handler_bench.exs b/bench/event_handler_bench.exs new file mode 100644 index 0000000..9ebb930 --- /dev/null +++ b/bench/event_handler_bench.exs @@ -0,0 +1,46 @@ +import Telemetry.Metrics + +metrics = [ + counter("bench.event.count", tags: [:tag_a]), + counter("bench.event.count2", tags: [:tag_a, :tag_b]), + sum("bench.event.payload_size", tags: [:tag_a]), + last_value("bench.event.queue_depth", tags: [:tag_a]), + distribution("bench.event.duration", + tags: [:tag_a], + reporter_options: [max_value: 1_000_000, bucket_variability: 0.3] + ), + distribution("bench.event.latency", + tags: [:tag_a, :tag_b], + reporter_options: [max_value: 65536, bucket_variability: 0.3] + ) +] + +{:ok, _pid} = Peep.start_link(name: :bench, metrics: metrics, storage: :striped) + +measurements = %{ + count: 1, + payload_size: 512, + queue_depth: 42, + duration: 1500, + latency: 350 +} + +metadata = %{tag_a: "exchange_1", tag_b: "channel_2"} +event = [:bench, :event] + +parallel = String.to_integer(System.get_env("BENCH_PARALLEL", "1")) +IO.puts("Running with parallel: #{parallel}") + +Benchee.run( + %{ + "telemetry_execute" => fn -> + :telemetry.execute(event, measurements, metadata) + end + }, + warmup: 2, + time: 5, + memory_time: 2, + parallel: parallel +) + +GenServer.stop(:bench) diff --git a/mix.exs b/mix.exs index 9d26128..2983a11 100644 --- a/mix.exs +++ b/mix.exs @@ -39,6 +39,7 @@ defmodule Peep.MixProject do {:telemetry_metrics, "~> 1.0"}, # testing, docs, & linting {:bandit, "~> 1.6", only: [:test], runtime: false}, + {:benchee, "~> 1.5", only: [:dev], runtime: false}, {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false}, {:inch_ex, "~> 2.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 6cce936..ff925cc 100644 --- a/mix.lock +++ b/mix.lock @@ -1,10 +1,12 @@ %{ "bandit": {:hex, :bandit, "1.7.0", "d1564f30553c97d3e25f9623144bb8df11f3787a26733f00b21699a128105c0c", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "3e2f7a98c7a11f48d9d8c037f7177cd39778e74d55c7af06fe6227c742a8168a"}, + "benchee": {:hex, :benchee, "1.5.0", "4d812c31d54b0ec0167e91278e7de3f596324a78a096fd3d0bea68bb0c513b10", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.1", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "5b075393aea81b8ae74eadd1c28b1d87e8a63696c649d8293db7c4df3eb67535"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "cowboy": {:hex, :cowboy, "2.13.0", "09d770dd5f6a22cc60c071f432cd7cb87776164527f205c5a6b0f24ff6b38990", [:make, :rebar3], [{:cowlib, ">= 2.14.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "e724d3a70995025d654c1992c7b11dbfea95205c047d86ff9bf1cda92ddc5614"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.15.0", "3c97a318a933962d1c12b96ab7c1d728267d2c523c25a5b57b0f93392b6e9e25", [:make, :rebar3], [], "hexpm", "4f00c879a64b4fe7c8fcb42a4281925e9ffdb928820b03c3ad325a617e857532"}, "credo": {:hex, :credo, "1.7.12", "9e3c20463de4b5f3f23721527fcaf16722ec815e70ff6c60b86412c695d426c1", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8493d45c656c5427d9c729235b99d498bd133421f3e0a683e5c1b561471291e5"}, + "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, @@ -23,6 +25,7 @@ "plug_cowboy": {:hex, :plug_cowboy, "2.7.4", "729c752d17cf364e2b8da5bdb34fb5804f56251e88bb602aff48ae0bd8673d11", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "9b85632bd7012615bae0a5d70084deb1b25d2bcbb32cab82d1e9a1e023168aa3"}, "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, "ranch": {:hex, :ranch, "2.2.0", "25528f82bc8d7c6152c57666ca99ec716510fe0925cb188172f41ce93117b1b0", [:make, :rebar3], [], "hexpm", "fa0b99a1780c80218a4197a59ea8d3bdae32fbff7e88527d7d8a4787eff4f8e7"}, + "statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, "thousand_island": {:hex, :thousand_island, "1.3.14", "ad45ebed2577b5437582bcc79c5eccd1e2a8c326abf6a3464ab6c06e2055a34a", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d0d24a929d31cdd1d7903a4fe7f2409afeedff092d277be604966cd6aa4307ef"}, From b5d0b613e675552f42d7ffbaed5b838af9798773 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Thu, 26 Mar 2026 16:26:39 -0400 Subject: [PATCH 02/23] Depend on :telemetry ~> 1.4 --- mix.exs | 2 +- mix.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index 2983a11..af9f01c 100644 --- a/mix.exs +++ b/mix.exs @@ -35,7 +35,7 @@ defmodule Peep.MixProject do defp deps do [ {:nimble_options, "~> 1.1"}, - {:telemetry, "~> 1.0"}, + {:telemetry, "~> 1.4"}, {:telemetry_metrics, "~> 1.0"}, # testing, docs, & linting {:bandit, "~> 1.6", only: [:test], runtime: false}, diff --git a/mix.lock b/mix.lock index ff925cc..6194e97 100644 --- a/mix.lock +++ b/mix.lock @@ -26,7 +26,7 @@ "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, "ranch": {:hex, :ranch, "2.2.0", "25528f82bc8d7c6152c57666ca99ec716510fe0925cb188172f41ce93117b1b0", [:make, :rebar3], [], "hexpm", "fa0b99a1780c80218a4197a59ea8d3bdae32fbff7e88527d7d8a4787eff4f8e7"}, "statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"}, - "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"}, "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, "thousand_island": {:hex, :thousand_island, "1.3.14", "ad45ebed2577b5437582bcc79c5eccd1e2a8c326abf6a3464ab6c06e2055a34a", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d0d24a929d31cdd1d7903a4fe7f2409afeedff092d277be604966cd6aa4307ef"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, From 6c733682b34808faf8994b1e0a13bd825aa87f98 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Thu, 26 Mar 2026 16:32:12 -0400 Subject: [PATCH 03/23] Store compound term in handler config, use :telemetry.persist/0 --- lib/peep.ex | 1 + lib/peep/event_handler.ex | 16 +++++++--------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index 67126a4..5ab9f06 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -259,6 +259,7 @@ defmodule Peep do |> Peep.Persistent.store() handler_ids = EventHandler.attach(name) + :telemetry.persist() statsd_opts = options.statsd statsd_flush_interval = statsd_opts[:flush_interval_ms] diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index 36c9fdc..8864303 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -6,9 +6,12 @@ defmodule Peep.EventHandler do import Peep.Persistent, only: [persistent: 1] def attach(name) do - persistent(events_to_metrics: metrics_by_event) = Peep.Persistent.fetch(name) + persistent( + events_to_metrics: metrics_by_event, + storage: {storage_mod, storage} + ) = Peep.Persistent.fetch(name) - for {event_name, _metrics} <- metrics_by_event do + for {event_name, metrics} <- metrics_by_event do handler_id = handler_id(event_name, name) :ok = @@ -16,7 +19,7 @@ defmodule Peep.EventHandler do handler_id, event_name, &__MODULE__.handle_event/4, - name + {name, metrics, storage_mod, storage} ) handler_id @@ -32,12 +35,7 @@ defmodule Peep.EventHandler do {__MODULE__, peep_name, event_name} end - def handle_event(event, measurements, metadata, name) do - persistent( - events_to_metrics: %{^event => metrics}, - storage: {storage_mod, storage} - ) = Peep.Persistent.fetch(name) - + def handle_event(_event, measurements, metadata, {_name, metrics, storage_mod, storage}) do store_metrics(metrics, measurements, metadata, storage_mod, storage) end From 480c124a075bcb87533d125866e31afa79e6fd8d Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Thu, 26 Mar 2026 18:19:47 -0400 Subject: [PATCH 04/23] Deduplicate tag funs --- lib/peep/event_handler.ex | 83 ++++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index 8864303..7db419e 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -1,7 +1,7 @@ defmodule Peep.EventHandler do @moduledoc false - @compile {:inline, keep?: 3, meta: 3, fetch_measurement: 3} + @compile {:inline, keep?: 3, fetch_measurement: 3} import Peep.Persistent, only: [persistent: 1] @@ -14,12 +14,14 @@ defmodule Peep.EventHandler do for {event_name, metrics} <- metrics_by_event do handler_id = handler_id(event_name, name) + {tag_fns, metrics_with_tag_fn_idx} = deduplicate_tag_fns(metrics) + :ok = :telemetry.attach( handler_id, event_name, &__MODULE__.handle_event/4, - {name, metrics, storage_mod, storage} + {metrics_with_tag_fn_idx, storage_mod, storage, tag_fns} ) handler_id @@ -35,17 +37,36 @@ defmodule Peep.EventHandler do {__MODULE__, peep_name, event_name} end - def handle_event(_event, measurements, metadata, {_name, metrics, storage_mod, storage}) do - store_metrics(metrics, measurements, metadata, storage_mod, storage) + def handle_event( + _event, + measurements, + metadata, + {metrics, storage_mod, storage, tag_fns} + ) do + tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns), 0, []) + store_metrics(metrics, measurements, metadata, storage_mod, storage, tag_results) + end + + defp compute_tags(_tag_fns, _metadata, size, size, acc) do + acc |> :lists.reverse() |> List.to_tuple() + end + + defp compute_tags(tag_fns, metadata, size, idx, acc) do + compute_tags(tag_fns, metadata, size, idx + 1, [elem(tag_fns, idx).(metadata) | acc]) end - defp store_metrics([], _measurements, _metadata, _mod, _data), do: :ok + defp store_metrics([], _measurements, _metadata, _mod, _data, _tag_results), do: :ok - defp store_metrics([{metric, id} | rest], measurements, metadata, mod, data) do + defp store_metrics( + [{metric, id, tag_idx} | rest], + measurements, + metadata, + mod, + data, + tag_results + ) do %{ measurement: measurement, - tag_values: tag_values, - tags: tags, keep: keep } = metric @@ -53,12 +74,14 @@ defmodule Peep.EventHandler do # credo:disable-for-next-line Credo.Check.Refactor.Nesting case fetch_measurement(measurement, measurements, metadata) do value when is_number(value) -> + meta = elem(tag_results, tag_idx) + mod.insert_metric( data, id, metric, value, - meta(metadata, tag_values, tags) + meta ) _ -> @@ -66,7 +89,7 @@ defmodule Peep.EventHandler do end end - store_metrics(rest, measurements, metadata, mod, data) + store_metrics(rest, measurements, metadata, mod, data, tag_results) end defp keep?(keep, metadata, measurement) when is_function(keep, 2), @@ -75,11 +98,6 @@ defmodule Peep.EventHandler do defp keep?(keep, metadata, _measurement) when is_function(keep, 1), do: keep.(metadata) defp keep?(_keep, _metadata, _measurement), do: true - # When selected list is empty, just return empty map - defp meta(_tags, _map, []), do: %{} - defp meta(meta, _map, tags) when is_function(tags, 1), do: tags.(meta) - defp meta(tags, map, keys), do: Map.take(map.(tags), keys) - defp fetch_measurement(%Telemetry.Metrics.Counter{}, _measurements, _metadata) do 1 end @@ -102,4 +120,39 @@ defmodule Peep.EventHandler do end end end + + defp deduplicate_tag_fns(metrics) do + {unique_metrics_tags, _} = + Enum.reduce(metrics, {%{}, 0}, fn {metric, _id}, {map, next_idx} -> + key = tag_fn_key(metric) + + case map do + %{^key => _} -> {map, next_idx} + _ -> {Map.put(map, key, next_idx), next_idx + 1} + end + end) + + tag_fns = + unique_metrics_tags + |> Enum.sort_by(fn {_key, idx} -> idx end) + |> Enum.map(fn {{tag_values, tags}, _idx} -> compile_tag_fn(tag_values, tags) end) + |> List.to_tuple() + + metrics_with_tag_fn_idx = + Enum.map(metrics, fn {metric, id} -> + key = tag_fn_key(metric) + {metric, id, Map.fetch!(unique_metrics_tags, key)} + end) + + {tag_fns, metrics_with_tag_fn_idx} + end + + defp tag_fn_key(metric), do: {metric.tag_values, metric.tags} + + defp compile_tag_fn(_tag_values, []), do: fn _metadata -> %{} end + defp compile_tag_fn(_tag_values, tags) when is_function(tags, 1), do: tags + + defp compile_tag_fn(tag_values, keys) do + fn metadata -> Map.take(tag_values.(metadata), keys) end + end end From d7b27c3eb230af23a3a99cdde328726b5ed89598 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 11:01:33 -0400 Subject: [PATCH 05/23] Optimize stored metrics --- lib/peep.ex | 7 ++-- lib/peep/event_handler.ex | 86 ++++++++++++++++++++++++++++++--------- 2 files changed, 71 insertions(+), 22 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index 5ab9f06..d204f1b 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -154,9 +154,10 @@ defmodule Peep do defp extend_with(metrics, global_tags) do Map.new(metrics, fn {metric, measurements} -> - updated = Map.new(measurements, fn {tags, val} -> - {Map.merge(global_tags, tags), val} - end) + updated = + Map.new(measurements, fn {tags, val} -> + {Map.merge(global_tags, tags), val} + end) {metric, updated} end) diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index 7db419e..0697c6b 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -16,12 +16,25 @@ defmodule Peep.EventHandler do {tag_fns, metrics_with_tag_fn_idx} = deduplicate_tag_fns(metrics) + metrics_rows = + Enum.map(metrics_with_tag_fn_idx, fn {metric, id, tag_idx} -> + %{measurement: measurement, keep: keep} = metric + + keep_val = if is_nil(keep), do: :no_keep, else: keep + + insert_fn = fn data, value, tags -> + storage_mod.insert_metric(data, id, metric, value, tags) + end + + {metric_type(metric), insert_fn, keep_val, measurement, tag_idx} + end) + :ok = :telemetry.attach( handler_id, event_name, &__MODULE__.handle_event/4, - {metrics_with_tag_fn_idx, storage_mod, storage, tag_fns} + {metrics_rows, storage_mod, storage, tag_fns} ) handler_id @@ -41,10 +54,10 @@ defmodule Peep.EventHandler do _event, measurements, metadata, - {metrics, storage_mod, storage, tag_fns} + {metrics_rows, storage_mod, storage, tag_fns} ) do tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns), 0, []) - store_metrics(metrics, measurements, metadata, storage_mod, storage, tag_results) + store_metrics(metrics_rows, measurements, metadata, storage_mod, storage, tag_results) end defp compute_tags(_tag_fns, _metadata, size, size, acc) do @@ -58,31 +71,63 @@ defmodule Peep.EventHandler do defp store_metrics([], _measurements, _metadata, _mod, _data, _tag_results), do: :ok defp store_metrics( - [{metric, id, tag_idx} | rest], + [{:counter, insert_fn, :no_keep, _measurement, tag_idx} | rest], measurements, metadata, mod, data, tag_results ) do - %{ - measurement: measurement, - keep: keep - } = metric + insert_fn.(data, 1, elem(tag_results, tag_idx)) + store_metrics(rest, measurements, metadata, mod, data, tag_results) + end - if keep?(keep, metadata, measurement) do - # credo:disable-for-next-line Credo.Check.Refactor.Nesting + defp store_metrics( + [{_type, insert_fn, :no_keep, measurement, tag_idx} | rest], + measurements, + metadata, + mod, + data, + tag_results + ) do + case fetch_measurement(measurement, measurements, metadata) do + value when is_number(value) -> + insert_fn.(data, value, elem(tag_results, tag_idx)) + + _ -> + nil + end + + store_metrics(rest, measurements, metadata, mod, data, tag_results) + end + + defp store_metrics( + [{:counter, insert_fn, keep, _measurement, tag_idx} | rest], + measurements, + metadata, + mod, + data, + tag_results + ) do + if keep?(keep, metadata, nil) do + insert_fn.(data, 1, elem(tag_results, tag_idx)) + end + + store_metrics(rest, measurements, metadata, mod, data, tag_results) + end + + defp store_metrics( + [{_type, insert_fn, keep, measurement, tag_idx} | rest], + measurements, + metadata, + mod, + data, + tag_results + ) do + if keep?(keep, metadata, nil) do case fetch_measurement(measurement, measurements, metadata) do value when is_number(value) -> - meta = elem(tag_results, tag_idx) - - mod.insert_metric( - data, - id, - metric, - value, - meta - ) + insert_fn.(data, value, elem(tag_results, tag_idx)) _ -> nil @@ -155,4 +200,7 @@ defmodule Peep.EventHandler do defp compile_tag_fn(tag_values, keys) do fn metadata -> Map.take(tag_values.(metadata), keys) end end + + defp metric_type(%Telemetry.Metrics.Counter{}), do: :counter + defp metric_type(_), do: :other end From e93642583c06f691a25bc546fa90301ec52e1154 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 11:02:22 -0400 Subject: [PATCH 06/23] Add Peep.Storage.resolve/1 --- lib/peep.ex | 4 +++- lib/peep/event_handler.ex | 3 ++- lib/peep/storage.ex | 6 ++++++ lib/peep/storage/ets.ex | 3 +++ lib/peep/storage/striped.ex | 23 ++++++++++------------- test/support/custom_storage.ex | 3 +++ 6 files changed, 27 insertions(+), 15 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index d204f1b..c078d1f 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -107,7 +107,9 @@ defmodule Peep do storage: {storage_mod, storage}, metrics_to_ids: %{^metric => id} ) -> - storage_mod.insert_metric(storage, id, metric, value, tags) + storage + |> storage_mod.resolve() + |> storage_mod.insert_metric(id, metric, value, tags) _ -> nil diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index 0697c6b..af8236a 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -56,8 +56,9 @@ defmodule Peep.EventHandler do metadata, {metrics_rows, storage_mod, storage, tag_fns} ) do + resolved = storage_mod.resolve(storage) tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns), 0, []) - store_metrics(metrics_rows, measurements, metadata, storage_mod, storage, tag_results) + store_metrics(metrics_rows, measurements, metadata, storage_mod, resolved, tag_results) end defp compute_tags(_tag_fns, _metadata, size, size, acc) do diff --git a/lib/peep/storage.ex b/lib/peep/storage.ex index af5b27e..d189ee5 100644 --- a/lib/peep/storage.ex +++ b/lib/peep/storage.ex @@ -32,4 +32,10 @@ defmodule Peep.Storage do have high cardinality. """ @callback prune_tags(Enumerable.t(%{Metrics.tag() => term()}), map()) :: :ok + + @doc """ + Resolves storage to its per-scheduler form for the current scheduler. + Called once per event to avoid redundant scheduler lookups across metrics. + """ + @callback resolve(term()) :: term() end diff --git a/lib/peep/storage/ets.ex b/lib/peep/storage/ets.ex index 64b27db..54d6941 100644 --- a/lib/peep/storage/ets.ex +++ b/lib/peep/storage/ets.ex @@ -31,6 +31,9 @@ defmodule Peep.Storage.ETS do :ets.new(__MODULE__, opts) end + @impl true + def resolve(tid), do: tid + @impl true def storage_size(tid) do %{ diff --git a/lib/peep/storage/striped.ex b/lib/peep/storage/striped.ex index 5f28b0a..976f156 100644 --- a/lib/peep/storage/striped.ex +++ b/lib/peep/storage/striped.ex @@ -45,27 +45,29 @@ defmodule Peep.Storage.Striped do end @impl true - def insert_metric(tids, id, %Metrics.Counter{}, _value, %{} = tags) do - tid = get_tid(tids) + def resolve(tids) do + scheduler_id = :erlang.system_info(:scheduler_id) + elem(tids, scheduler_id - 1) + end + + @impl true + def insert_metric(tid, id, %Metrics.Counter{}, _value, %{} = tags) do key = {id, tags} :ets.update_counter(tid, key, {2, 1}, {key, 0}) end - def insert_metric(tids, id, %Metrics.Sum{}, value, %{} = tags) do - tid = get_tid(tids) + def insert_metric(tid, id, %Metrics.Sum{}, value, %{} = tags) do key = {id, tags} :ets.update_counter(tid, key, {2, value}, {key, 0}) end - def insert_metric(tids, id, %Metrics.LastValue{}, value, %{} = tags) do - tid = get_tid(tids) + def insert_metric(tid, id, %Metrics.LastValue{}, value, %{} = tags) do now = System.monotonic_time() key = {id, tags} :ets.insert(tid, {key, {now, value}}) end - def insert_metric(tids, id, %Metrics.Distribution{} = metric, value, %{} = tags) do - tid = get_tid(tids) + def insert_metric(tid, id, %Metrics.Distribution{} = metric, value, %{} = tags) do key = {id, tags} atomics = @@ -93,11 +95,6 @@ defmodule Peep.Storage.Striped do Storage.Atomics.insert(atomics, value) end - defp get_tid(tids) do - scheduler_id = :erlang.system_info(:scheduler_id) - elem(tids, scheduler_id - 1) - end - @impl true def prune_tags(tids, patterns) do match_spec = diff --git a/test/support/custom_storage.ex b/test/support/custom_storage.ex index 3a82d7b..8ba2485 100644 --- a/test/support/custom_storage.ex +++ b/test/support/custom_storage.ex @@ -13,6 +13,9 @@ defmodule CustomStorage do alias Telemetry.Metrics alias Peep.Storage + @impl true + def resolve(agents), do: agents + @impl true @spec new(non_neg_integer) :: tuple def new(n_agents) do From a76feb9bea0f3dd13a8afb927c7ecec598673cf0 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 11:09:12 -0400 Subject: [PATCH 07/23] Move Peep.insert_metric/4 to test helpers --- lib/peep.ex | 17 ----------------- test/prometheus_test.exs | 36 ++++++++++++++++++------------------ test/shared/storage_test.exs | 24 ++++++++++++------------ test/shared/test_helpers.ex | 18 ++++++++++++++++++ test/statsd_cache_test.exs | 22 +++++++++++----------- test/statsd_test.exs | 28 ++++++++++++++-------------- 6 files changed, 73 insertions(+), 72 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index c078d1f..395667f 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -101,23 +101,6 @@ defmodule Peep do end end - def insert_metric(name, metric, value, tags) when is_number(value) do - case Peep.Persistent.fetch(name) do - Peep.Persistent.persistent( - storage: {storage_mod, storage}, - metrics_to_ids: %{^metric => id} - ) -> - storage - |> storage_mod.resolve() - |> storage_mod.insert_metric(id, metric, value, tags) - - _ -> - nil - end - end - - def insert_metric(_name, _metric, _value, _tags), do: nil - @doc """ Returns measurements about the size of a running Peep's storage, in number of ETS elements and in bytes of memory. diff --git a/test/prometheus_test.exs b/test/prometheus_test.exs index 97e3f38..71fa313 100644 --- a/test/prometheus_test.exs +++ b/test/prometheus_test.exs @@ -28,7 +28,7 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, counter, 1, %{baz: "quux"}) + Peep.Test.insert_metric(name, counter, 1, %{baz: "quux"}) expected = [ "# HELP prometheus_test_counter a counter", @@ -52,7 +52,7 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, counter, 1, %{foo: 2137, baz: "quux"}) + Peep.Test.insert_metric(name, counter, 1, %{foo: 2137, baz: "quux"}) expected = [ "# HELP prometheus_test_counter a counter", @@ -76,7 +76,7 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, counter, 1, %{foo: :bar, baz: "quux"}) + Peep.Test.insert_metric(name, counter, 1, %{foo: :bar, baz: "quux"}) expected = [ "# HELP prometheus_test_counter a counter", @@ -100,8 +100,8 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, sum, 5, %{foo: :bar, baz: "quux"}) - Peep.insert_metric(name, sum, 3, %{foo: :bar, baz: "quux"}) + Peep.Test.insert_metric(name, sum, 5, %{foo: :bar, baz: "quux"}) + Peep.Test.insert_metric(name, sum, 3, %{foo: :bar, baz: "quux"}) expected = [ "# HELP prometheus_test_sum a sum", @@ -129,8 +129,8 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, sum, 5, %{foo: :bar, baz: "quux"}) - Peep.insert_metric(name, sum, 3, %{foo: :bar, baz: "quux"}) + Peep.Test.insert_metric(name, sum, 5, %{foo: :bar, baz: "quux"}) + Peep.Test.insert_metric(name, sum, 3, %{foo: :bar, baz: "quux"}) expected = [ "# HELP prometheus_test_sum a sum", @@ -155,7 +155,7 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, last_value, 5, %{blee: :bloo, flee: "floo"}) + Peep.Test.insert_metric(name, last_value, 5, %{blee: :bloo, flee: "floo"}) expected = [ "# HELP prometheus_test_gauge a last_value", @@ -183,7 +183,7 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, last_value, 5, %{blee: :bloo, flee: "floo"}) + Peep.Test.insert_metric(name, last_value, 5, %{blee: :bloo, flee: "floo"}) expected = [ "# HELP prometheus_test_gauge a last_value", @@ -215,7 +215,7 @@ defmodule PrometheusTest do expected = [] assert export(name) == lines_to_string(expected) - Peep.insert_metric(name, dist, 1, %{glee: :gloo}) + Peep.Test.insert_metric(name, dist, 1, %{glee: :gloo}) expected = [ "# HELP prometheus_test_distribution a distribution", @@ -264,7 +264,7 @@ defmodule PrometheusTest do assert export(name) == lines_to_string(expected) for i <- 2..2000 do - Peep.insert_metric(name, dist, i, %{glee: :gloo}) + Peep.Test.insert_metric(name, dist, i, %{glee: :gloo}) end expected = [ @@ -337,7 +337,7 @@ defmodule PrometheusTest do expected = [] assert export(name) == lines_to_string(expected) - Peep.insert_metric(name, dist, 1, %{glee: :gloo}) + Peep.Test.insert_metric(name, dist, 1, %{glee: :gloo}) expected = [ "# HELP prometheus_test_distribution a distribution", @@ -360,7 +360,7 @@ defmodule PrometheusTest do f = fn -> for i <- 1..2000 do - Peep.insert_metric(name, dist, i, %{glee: :gloo}) + Peep.Test.insert_metric(name, dist, i, %{glee: :gloo}) end end @@ -404,10 +404,10 @@ defmodule PrometheusTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, counter, 1, %{atom: "\"string\""}) - Peep.insert_metric(name, counter, 1, %{"\"string\"" => :atom}) - Peep.insert_metric(name, counter, 1, %{"\"string\"" => "\"string\""}) - Peep.insert_metric(name, counter, 1, %{"string" => "string\n"}) + Peep.Test.insert_metric(name, counter, 1, %{atom: "\"string\""}) + Peep.Test.insert_metric(name, counter, 1, %{"\"string\"" => :atom}) + Peep.Test.insert_metric(name, counter, 1, %{"\"string\"" => "\"string\""}) + Peep.Test.insert_metric(name, counter, 1, %{"string" => "string\n"}) expected = [ "# HELP prometheus_test_counter a counter", @@ -441,7 +441,7 @@ defmodule PrometheusTest do # Create a struct that doesn't implement String.Chars error_struct = %TestError{reason: :tcp_closed, code: 1001} - Peep.insert_metric(name, counter, 1, %{error: error_struct}) + Peep.Test.insert_metric(name, counter, 1, %{error: error_struct}) result = export(name) diff --git a/test/shared/storage_test.exs b/test/shared/storage_test.exs index 2af5430..2a81061 100644 --- a/test/shared/storage_test.exs +++ b/test/shared/storage_test.exs @@ -28,10 +28,10 @@ defmodule Peep.Storage.Test do f = fn -> for i <- 1..10 do - Peep.insert_metric(name, counter, 1, %{}) + Peep.Test.insert_metric(name, counter, 1, %{}) if rem(i, 2) == 0 do - Peep.insert_metric(name, counter, 1, %{even: true}) + Peep.Test.insert_metric(name, counter, 1, %{even: true}) end end end @@ -50,10 +50,10 @@ defmodule Peep.Storage.Test do f = fn -> for i <- 1..10 do - Peep.insert_metric(name, sum, 2, %{}) + Peep.Test.insert_metric(name, sum, 2, %{}) if rem(i, 2) == 0 do - Peep.insert_metric(name, sum, 3, %{even: true}) + Peep.Test.insert_metric(name, sum, 3, %{even: true}) end end end @@ -72,10 +72,10 @@ defmodule Peep.Storage.Test do f = fn -> for i <- 1..10 do - Peep.insert_metric(name, last_value, i, %{}) + Peep.Test.insert_metric(name, last_value, i, %{}) if rem(i, 2) == 1 do - Peep.insert_metric(name, last_value, i, %{odd: true}) + Peep.Test.insert_metric(name, last_value, i, %{odd: true}) end end end @@ -95,7 +95,7 @@ defmodule Peep.Storage.Test do f = fn -> for i <- 0..2000 do - Peep.insert_metric(name, dist, i, %{}) + Peep.Test.insert_metric(name, dist, i, %{}) end end @@ -159,7 +159,7 @@ defmodule Peep.Storage.Test do f = fn -> for i <- 0..1000 do - Peep.insert_metric(name, dist, i, %{}) + Peep.Test.insert_metric(name, dist, i, %{}) end end @@ -202,7 +202,7 @@ defmodule Peep.Storage.Test do f = fn -> for i <- -500..500 do - Peep.insert_metric(name, dist, i, %{}) + Peep.Test.insert_metric(name, dist, i, %{}) end end @@ -251,7 +251,7 @@ defmodule Peep.Storage.Test do for metric <- metrics, tags <- tags_sets do %{size: size_before, memory: mem_before} = Peep.storage_size(name) - Peep.insert_metric(name, metric, 5, tags) + Peep.Test.insert_metric(name, metric, 5, tags) %{size: size_after, memory: mem_after} = Peep.storage_size(name) assert size_after > size_before @@ -273,8 +273,8 @@ defmodule Peep.Storage.Test do populate = fn -> for metric <- metrics do - Peep.insert_metric(name, metric, 5, %{foo: :bar}) - Peep.insert_metric(name, metric, 5, %{baz: :quux}) + Peep.Test.insert_metric(name, metric, 5, %{foo: :bar}) + Peep.Test.insert_metric(name, metric, 5, %{baz: :quux}) end assert Peep.get_all_metrics(name) != %{} diff --git a/test/shared/test_helpers.ex b/test/shared/test_helpers.ex index 881d67c..82d75ce 100644 --- a/test/shared/test_helpers.ex +++ b/test/shared/test_helpers.ex @@ -1,6 +1,24 @@ defmodule Peep.Test do @moduledoc false alias Telemetry.Metrics + require Peep.Persistent + + def insert_metric(name, metric, value, tags) when is_number(value) do + case Peep.Persistent.fetch(name) do + Peep.Persistent.persistent( + storage: {storage_mod, storage}, + metrics_to_ids: %{^metric => id} + ) -> + storage + |> storage_mod.resolve() + |> storage_mod.insert_metric(id, metric, value, tags) + + _ -> + nil + end + end + + def insert_metric(_name, _metric, _value, _tags), do: nil def get_metric(all_metrics, metric, tags) do tags = to_map(tags) diff --git a/test/statsd_cache_test.exs b/test/statsd_cache_test.exs index ee9dcfb..c185859 100644 --- a/test/statsd_cache_test.exs +++ b/test/statsd_cache_test.exs @@ -22,7 +22,7 @@ defmodule StatsdCacheTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, counter, 1, %{}) + Peep.Test.insert_metric(name, counter, 1, %{}) {delta_one, cache_one} = calculate_deltas_and_replacement(cache_of(name), Cache.new([])) @@ -32,7 +32,7 @@ defmodule StatsdCacheTest do assert Map.values(delta_two) == [] - Peep.insert_metric(name, counter, 1, %{}) + Peep.Test.insert_metric(name, counter, 1, %{}) {delta_three, _cache_three} = calculate_deltas_and_replacement(cache_of(name), cache_two) assert Map.values(delta_three) == [1] @@ -51,7 +51,7 @@ defmodule StatsdCacheTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, sum, 10, %{}) + Peep.Test.insert_metric(name, sum, 10, %{}) {delta_one, cache_one} = calculate_deltas_and_replacement(cache_of(name), Cache.new([])) @@ -61,7 +61,7 @@ defmodule StatsdCacheTest do assert Map.values(delta_two) == [] - Peep.insert_metric(name, sum, 10, %{}) + Peep.Test.insert_metric(name, sum, 10, %{}) {delta_three, _cache_three} = calculate_deltas_and_replacement(cache_of(name), cache_two) assert Map.values(delta_three) == [10] @@ -80,9 +80,9 @@ defmodule StatsdCacheTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, dist, 500, %{}) - Peep.insert_metric(name, dist, 500, %{}) - Peep.insert_metric(name, dist, 500, %{}) + Peep.Test.insert_metric(name, dist, 500, %{}) + Peep.Test.insert_metric(name, dist, 500, %{}) + Peep.Test.insert_metric(name, dist, 500, %{}) {delta_one, cache_one} = calculate_deltas_and_replacement(cache_of(name), Cache.new([])) @@ -92,9 +92,9 @@ defmodule StatsdCacheTest do assert Map.values(delta_two) == [] - Peep.insert_metric(name, dist, 500, %{}) - Peep.insert_metric(name, dist, 500, %{}) - Peep.insert_metric(name, dist, 1000, %{}) + Peep.Test.insert_metric(name, dist, 500, %{}) + Peep.Test.insert_metric(name, dist, 500, %{}) + Peep.Test.insert_metric(name, dist, 1000, %{}) {delta_three, _cache_three} = calculate_deltas_and_replacement(cache_of(name), cache_two) assert Map.values(delta_three) |> Enum.sort() == [1, 2] @@ -113,7 +113,7 @@ defmodule StatsdCacheTest do {:ok, _pid} = Peep.start_link(opts) - Peep.insert_metric(name, last_value, 10, %{}) + Peep.Test.insert_metric(name, last_value, 10, %{}) {delta_one, cache_one} = calculate_deltas_and_replacement(cache_of(name), Cache.new([])) diff --git a/test/statsd_test.exs b/test/statsd_test.exs index e9295c3..f31a039 100644 --- a/test/statsd_test.exs +++ b/test/statsd_test.exs @@ -24,7 +24,7 @@ defmodule StatsdTest do {:ok, _pid} = Peep.start_link(opts) for _ <- 1..10 do - Peep.insert_metric(name, counter, 1, %{bar: "quuz"}) + Peep.Test.insert_metric(name, counter, 1, %{bar: "quuz"}) end expected = ["statsd.test.counter:10|c|#foo:bar,bar:quuz"] @@ -45,7 +45,7 @@ defmodule StatsdTest do {:ok, _pid} = Peep.start_link(opts) for _ <- 1..10 do - Peep.insert_metric(name, counter, 1, %{foo: 2137, bar: "quuz"}) + Peep.Test.insert_metric(name, counter, 1, %{foo: 2137, bar: "quuz"}) end expected = ["statsd.test.counter:10|c|#foo:2137,bar:quuz"] @@ -67,10 +67,10 @@ defmodule StatsdTest do {:ok, _pid} = Peep.start_link(opts) for i <- 1..10 do - Peep.insert_metric(name, counter, 1, %{}) + Peep.Test.insert_metric(name, counter, 1, %{}) if rem(i, 2) == 0 do - Peep.insert_metric(name, counter, 1, %{even: true}) + Peep.Test.insert_metric(name, counter, 1, %{even: true}) end end @@ -92,10 +92,10 @@ defmodule StatsdTest do {:ok, _pid} = Peep.start_link(opts) for i <- 1..10 do - Peep.insert_metric(name, sum, 1, %{}) + Peep.Test.insert_metric(name, sum, 1, %{}) if rem(i, 2) == 0 do - Peep.insert_metric(name, sum, 1, %{even: true}) + Peep.Test.insert_metric(name, sum, 1, %{even: true}) end end @@ -117,10 +117,10 @@ defmodule StatsdTest do {:ok, _pid} = Peep.start_link(opts) for i <- 1..10 do - Peep.insert_metric(name, last_value, i, %{}) + Peep.Test.insert_metric(name, last_value, i, %{}) if rem(i, 2) == 1 do - Peep.insert_metric(name, last_value, i, %{odd: true}) + Peep.Test.insert_metric(name, last_value, i, %{odd: true}) end end @@ -142,10 +142,10 @@ defmodule StatsdTest do {:ok, _pid} = Peep.start_link(opts) for i <- 1..1000 do - Peep.insert_metric(name, dist, i, %{}) + Peep.Test.insert_metric(name, dist, i, %{}) if rem(i, 100) == 0 do - Peep.insert_metric(name, dist, i, %{foo: :bar}) + Peep.Test.insert_metric(name, dist, i, %{foo: :bar}) end end @@ -212,10 +212,10 @@ defmodule StatsdTest do {:ok, _pid} = Peep.start_link(opts) for i <- 1..1000 do - Peep.insert_metric(name, dist, i, %{}) + Peep.Test.insert_metric(name, dist, i, %{}) if rem(i, 100) == 0 do - Peep.insert_metric(name, dist, i, %{foo: :bar}) + Peep.Test.insert_metric(name, dist, i, %{foo: :bar}) end end @@ -291,8 +291,8 @@ defmodule StatsdTest do last_value = last_value.(i) for j <- 1..10 do - Peep.insert_metric(name, sum, j, %{}) - Peep.insert_metric(name, last_value, j, %{}) + Peep.Test.insert_metric(name, sum, j, %{}) + Peep.Test.insert_metric(name, last_value, j, %{}) end end From 366bff831b6c4a3a8aa6f967b9d93c743d6494b2 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 11:36:15 -0400 Subject: [PATCH 08/23] Fix race conditions in tests --- test/shared/storage_test.exs | 2 +- test/support/custom_storage.ex | 2 +- test/support/storage_counter.ex | 7 +------ test/test_helper.exs | 1 - 4 files changed, 3 insertions(+), 9 deletions(-) diff --git a/test/shared/storage_test.exs b/test/shared/storage_test.exs index 2a81061..04f2b1b 100644 --- a/test/shared/storage_test.exs +++ b/test/shared/storage_test.exs @@ -295,7 +295,7 @@ defmodule Peep.Storage.Test do end defp start_peep!(options) do - name = System.unique_integer([:positive]) |> Integer.to_string() |> String.to_atom() + name = Peep.Support.StorageCounter.fresh_id() {:ok, _pid} = Peep.start_link(Keyword.put(options, :name, name)) name diff --git a/test/support/custom_storage.ex b/test/support/custom_storage.ex index 8ba2485..b131329 100644 --- a/test/support/custom_storage.ex +++ b/test/support/custom_storage.ex @@ -62,7 +62,7 @@ defmodule CustomStorage do end def insert_metric(agents, id, %Metrics.LastValue{}, value, %{} = tags) do - agent = pick_agent(agents) + agent = elem(agents, 0) key = {id, tags} Agent.update(agent, fn state -> diff --git a/test/support/storage_counter.ex b/test/support/storage_counter.ex index 2f1f83b..a7a8105 100644 --- a/test/support/storage_counter.ex +++ b/test/support/storage_counter.ex @@ -1,12 +1,7 @@ defmodule Peep.Support.StorageCounter do @moduledoc false - use Agent - - def start() do - Agent.start(fn -> 0 end, name: __MODULE__) - end def fresh_id() do - Agent.get_and_update(__MODULE__, fn i -> {:"#{i}", i + 1} end) + :"#{System.unique_integer([:positive])}" end end diff --git a/test/test_helper.exs b/test/test_helper.exs index 0437b13..b7817f0 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,3 +1,2 @@ ExUnit.start() Application.put_env(:peep, :test_storages, [:default, :striped, {CustomStorage, 3}]) -Peep.Support.StorageCounter.start() From 1bbd55791ed91b41aa1b51ad45e11b58dd23c8ab Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 11:48:20 -0400 Subject: [PATCH 09/23] Unify Peep startup across all tests --- test/plug_test.exs | 4 +- test/prometheus_test.exs | 122 ++++++-------------------------- test/shared/storage_test.exs | 23 +++--- test/shared/test_helpers.ex | 10 +++ test/statsd_cache_test.exs | 46 ++---------- test/statsd_test.exs | 97 +++++-------------------- test/support/storage_counter.ex | 7 -- 7 files changed, 62 insertions(+), 247 deletions(-) delete mode 100644 test/support/storage_counter.ex diff --git a/test/plug_test.exs b/test/plug_test.exs index 2ba9711..03e87c6 100644 --- a/test/plug_test.exs +++ b/test/plug_test.exs @@ -4,8 +4,6 @@ defmodule PlugTest do import Plug.Test import Telemetry.Metrics - alias Peep.Support.StorageCounter - describe "init/1" do test "should raise an error if peep_worker is not provided" do assert_raise KeyError, ~r/^key :peep_worker not found.*/, fn -> @@ -111,7 +109,7 @@ defmodule PlugTest do end def setup_peep_worker(context) do - name = StorageCounter.fresh_id() + name = Peep.Test.fresh_id() start_supervised!( {Peep, name: name, metrics: [last_value("vm.memory.total", unit: :byte)]}, diff --git a/test/prometheus_test.exs b/test/prometheus_test.exs index 71fa313..d717753 100644 --- a/test/prometheus_test.exs +++ b/test/prometheus_test.exs @@ -4,8 +4,6 @@ defmodule PrometheusTest do alias Peep.Prometheus alias Telemetry.Metrics - alias Peep.Support.StorageCounter - # Test struct that doesn't implement String.Chars defmodule TestError do defstruct [:reason, :code] @@ -17,16 +15,13 @@ defmodule PrometheusTest do describe "#{impl} - global metadata" do test "is present in formatted output" do counter = Metrics.counter("prometheus.test.counter", description: "a counter") - name = StorageCounter.fresh_id() - - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl), - global_tags: %{foo: :bar} - ] - {:ok, _pid} = Peep.start_link(opts) + name = + Peep.Test.start_peep!( + metrics: [counter], + storage: unquote(impl), + global_tags: %{foo: :bar} + ) Peep.Test.insert_metric(name, counter, 1, %{baz: "quux"}) @@ -41,16 +36,13 @@ defmodule PrometheusTest do test "can be overridden by event metadata" do counter = Metrics.counter("prometheus.test.counter", description: "a counter") - name = StorageCounter.fresh_id() - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl), - global_tags: %{foo: :bar} - ] - - {:ok, _pid} = Peep.start_link(opts) + name = + Peep.Test.start_peep!( + metrics: [counter], + storage: unquote(impl), + global_tags: %{foo: :bar} + ) Peep.Test.insert_metric(name, counter, 1, %{foo: 2137, baz: "quux"}) @@ -66,15 +58,7 @@ defmodule PrometheusTest do test "#{impl} - counter formatting" do counter = Metrics.counter("prometheus.test.counter", description: "a counter") - name = StorageCounter.fresh_id() - - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [counter], storage: unquote(impl)) Peep.Test.insert_metric(name, counter, 1, %{foo: :bar, baz: "quux"}) @@ -89,16 +73,8 @@ defmodule PrometheusTest do describe "#{impl} - sum" do test "sum formatting" do - name = StorageCounter.fresh_id() sum = Metrics.sum("prometheus.test.sum", description: "a sum") - - opts = [ - name: name, - metrics: [sum], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [sum], storage: unquote(impl)) Peep.Test.insert_metric(name, sum, 5, %{foo: :bar, baz: "quux"}) Peep.Test.insert_metric(name, sum, 3, %{foo: :bar, baz: "quux"}) @@ -113,21 +89,13 @@ defmodule PrometheusTest do end test "custom type" do - name = StorageCounter.fresh_id() - sum = Metrics.sum("prometheus.test.sum", description: "a sum", reporter_options: [prometheus_type: "gauge"] ) - opts = [ - name: name, - metrics: [sum], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [sum], storage: unquote(impl)) Peep.Test.insert_metric(name, sum, 5, %{foo: :bar, baz: "quux"}) Peep.Test.insert_metric(name, sum, 3, %{foo: :bar, baz: "quux"}) @@ -144,16 +112,8 @@ defmodule PrometheusTest do describe "#{impl} - last_value" do test "formatting" do - name = StorageCounter.fresh_id() last_value = Metrics.last_value("prometheus.test.gauge", description: "a last_value") - - opts = [ - name: name, - metrics: [last_value], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [last_value], storage: unquote(impl)) Peep.Test.insert_metric(name, last_value, 5, %{blee: :bloo, flee: "floo"}) @@ -167,21 +127,13 @@ defmodule PrometheusTest do end test "custom type" do - name = StorageCounter.fresh_id() - last_value = Metrics.last_value("prometheus.test.gauge", description: "a last_value", reporter_options: [prometheus_type: :sum] ) - opts = [ - name: name, - metrics: [last_value], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [last_value], storage: unquote(impl)) Peep.Test.insert_metric(name, last_value, 5, %{blee: :bloo, flee: "floo"}) @@ -196,21 +148,13 @@ defmodule PrometheusTest do end test "#{impl} - dist formatting" do - name = StorageCounter.fresh_id() - dist = Metrics.distribution("prometheus.test.distribution", description: "a distribution", reporter_options: [max_value: 1000] ) - opts = [ - name: name, - metrics: [dist], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [dist], storage: unquote(impl)) expected = [] assert export(name) == lines_to_string(expected) @@ -315,8 +259,6 @@ defmodule PrometheusTest do end test "#{impl} - dist formatting pow10" do - name = StorageCounter.fresh_id() - dist = Metrics.distribution("prometheus.test.distribution", description: "a distribution", @@ -326,13 +268,7 @@ defmodule PrometheusTest do ] ) - opts = [ - name: name, - metrics: [dist], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [dist], storage: unquote(impl)) expected = [] assert export(name) == lines_to_string(expected) @@ -388,21 +324,13 @@ defmodule PrometheusTest do end test "#{impl} - regression: label escaping" do - name = StorageCounter.fresh_id() - counter = Metrics.counter( "prometheus.test.counter", description: "a counter" ) - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [counter], storage: unquote(impl)) Peep.Test.insert_metric(name, counter, 1, %{atom: "\"string\""}) Peep.Test.insert_metric(name, counter, 1, %{"\"string\"" => :atom}) @@ -422,21 +350,13 @@ defmodule PrometheusTest do end test "#{impl} - regression: handle structs without String.Chars" do - name = StorageCounter.fresh_id() - counter = Metrics.counter( "prometheus.test.counter", description: "a counter" ) - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [counter], storage: unquote(impl)) # Create a struct that doesn't implement String.Chars error_struct = %TestError{reason: :tcp_closed, code: 1001} diff --git a/test/shared/storage_test.exs b/test/shared/storage_test.exs index 04f2b1b..b602391 100644 --- a/test/shared/storage_test.exs +++ b/test/shared/storage_test.exs @@ -24,7 +24,7 @@ defmodule Peep.Storage.Test do test "#{inspect(impl)} - a counter can be stored and retrieved" do counter = Metrics.counter("storage.test.counter") - name = start_peep!(storage: unquote(impl), metrics: [counter]) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: [counter]) f = fn -> for i <- 1..10 do @@ -46,7 +46,7 @@ defmodule Peep.Storage.Test do test "#{inspect(impl)} - a sum can be stored and retrieved" do sum = Metrics.sum("storage.test.sum") - name = start_peep!(storage: unquote(impl), metrics: [sum]) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: [sum]) f = fn -> for i <- 1..10 do @@ -68,7 +68,7 @@ defmodule Peep.Storage.Test do test "#{inspect(impl)} - a last_value can be stored and retrieved" do last_value = Metrics.last_value("storage.test.gauge") - name = start_peep!(storage: unquote(impl), metrics: [last_value]) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: [last_value]) f = fn -> for i <- 1..10 do @@ -91,7 +91,7 @@ defmodule Peep.Storage.Test do dist = Metrics.distribution("storage.test.distribution", reporter_options: [max_value: 1000]) - name = start_peep!(storage: unquote(impl), metrics: [dist]) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: [dist]) f = fn -> for i <- 0..2000 do @@ -155,7 +155,7 @@ defmodule Peep.Storage.Test do ] ) - name = start_peep!(storage: unquote(impl), metrics: [dist]) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: [dist]) f = fn -> for i <- 0..1000 do @@ -198,7 +198,7 @@ defmodule Peep.Storage.Test do ] ) - name = start_peep!(storage: unquote(impl), metrics: [dist]) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: [dist]) f = fn -> for i <- -500..500 do @@ -241,7 +241,7 @@ defmodule Peep.Storage.Test do metrics = [counter, sum, last_value, dist] - name = start_peep!(storage: unquote(impl), metrics: metrics) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: metrics) tags_sets = [ %{}, @@ -269,7 +269,7 @@ defmodule Peep.Storage.Test do metrics = [counter, sum, last_value, dist] - name = start_peep!(storage: unquote(impl), metrics: metrics) + name = Peep.Test.start_peep!(storage: unquote(impl), metrics: metrics) populate = fn -> for metric <- metrics do @@ -293,11 +293,4 @@ defmodule Peep.Storage.Test do assert Peep.get_all_metrics(name) == %{} end end - - defp start_peep!(options) do - name = Peep.Support.StorageCounter.fresh_id() - - {:ok, _pid} = Peep.start_link(Keyword.put(options, :name, name)) - name - end end diff --git a/test/shared/test_helpers.ex b/test/shared/test_helpers.ex index 82d75ce..7d55b1e 100644 --- a/test/shared/test_helpers.ex +++ b/test/shared/test_helpers.ex @@ -20,6 +20,16 @@ defmodule Peep.Test do def insert_metric(_name, _metric, _value, _tags), do: nil + def fresh_id do + :"#{System.unique_integer([:positive])}" + end + + def start_peep!(options) do + name = fresh_id() + {:ok, _pid} = Peep.start_link(Keyword.put(options, :name, name)) + name + end + def get_metric(all_metrics, metric, tags) do tags = to_map(tags) tags_map = Map.get(all_metrics, metric, %{}) diff --git a/test/statsd_cache_test.exs b/test/statsd_cache_test.exs index c185859..c2a7b69 100644 --- a/test/statsd_cache_test.exs +++ b/test/statsd_cache_test.exs @@ -4,23 +4,12 @@ defmodule StatsdCacheTest do alias Peep.Statsd.Cache alias Telemetry.Metrics - alias Peep.Support.StorageCounter - @impls [:default, :striped] for impl <- @impls do test "#{impl} - a counter with no increments is omitted from delta" do - name = StorageCounter.fresh_id() - counter = Metrics.counter("cache.test.counter") - - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [counter], storage: unquote(impl)) Peep.Test.insert_metric(name, counter, 1, %{}) @@ -39,17 +28,8 @@ defmodule StatsdCacheTest do end test "#{impl} - a sum with no increments is omitted from delta" do - name = StorageCounter.fresh_id() - sum = Metrics.sum("cache.test.counter") - - opts = [ - name: name, - metrics: [sum], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [sum], storage: unquote(impl)) Peep.Test.insert_metric(name, sum, 10, %{}) @@ -68,17 +48,8 @@ defmodule StatsdCacheTest do end test "#{impl} - a distribution with no samples is omitted from delta" do - name = StorageCounter.fresh_id() - dist = Metrics.distribution("cache.test.dist", reporter_options: [max_value: 1000]) - - opts = [ - name: name, - metrics: [dist], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [dist], storage: unquote(impl)) Peep.Test.insert_metric(name, dist, 500, %{}) Peep.Test.insert_metric(name, dist, 500, %{}) @@ -101,17 +72,8 @@ defmodule StatsdCacheTest do end test "#{impl} - a last_value with no changes is included in deltas" do - name = StorageCounter.fresh_id() - last_value = Metrics.last_value("cache.test.gauge") - - opts = [ - name: name, - metrics: [last_value], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [last_value], storage: unquote(impl)) Peep.Test.insert_metric(name, last_value, 10, %{}) diff --git a/test/statsd_test.exs b/test/statsd_test.exs index f31a039..b3e1c00 100644 --- a/test/statsd_test.exs +++ b/test/statsd_test.exs @@ -4,24 +4,19 @@ defmodule StatsdTest do alias Peep.Statsd alias Telemetry.Metrics - alias Peep.Support.StorageCounter - @impls [:default, :striped] for impl <- @impls do describe "#{impl} - global metadata" do test "is present in formatted output" do counter = Metrics.counter("statsd.test.counter", description: "a counter") - name = StorageCounter.fresh_id() - - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl), - global_tags: %{foo: :bar} - ] - {:ok, _pid} = Peep.start_link(opts) + name = + Peep.Test.start_peep!( + metrics: [counter], + storage: unquote(impl), + global_tags: %{foo: :bar} + ) for _ <- 1..10 do Peep.Test.insert_metric(name, counter, 1, %{bar: "quuz"}) @@ -33,16 +28,13 @@ defmodule StatsdTest do test "can be overridden by event metadata" do counter = Metrics.counter("statsd.test.counter", description: "a counter") - name = StorageCounter.fresh_id() - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl), - global_tags: %{foo: :bar} - ] - - {:ok, _pid} = Peep.start_link(opts) + name = + Peep.Test.start_peep!( + metrics: [counter], + storage: unquote(impl), + global_tags: %{foo: :bar} + ) for _ <- 1..10 do Peep.Test.insert_metric(name, counter, 1, %{foo: 2137, bar: "quuz"}) @@ -54,17 +46,8 @@ defmodule StatsdTest do end test "#{impl} - a counter can be formatted" do - name = StorageCounter.fresh_id() - counter = Metrics.counter("statsd.test.counter") - - opts = [ - name: name, - metrics: [counter], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [counter], storage: unquote(impl)) for i <- 1..10 do Peep.Test.insert_metric(name, counter, 1, %{}) @@ -79,17 +62,8 @@ defmodule StatsdTest do end test "#{impl} - a sum can be formatted" do - name = StorageCounter.fresh_id() - sum = Metrics.sum("statsd.test.sum") - - opts = [ - name: name, - metrics: [sum], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [sum], storage: unquote(impl)) for i <- 1..10 do Peep.Test.insert_metric(name, sum, 1, %{}) @@ -104,17 +78,8 @@ defmodule StatsdTest do end test "#{impl} - a last_value can be formatted" do - name = StorageCounter.fresh_id() - last_value = Metrics.last_value("statsd.test.gauge") - - opts = [ - name: name, - metrics: [last_value], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [last_value], storage: unquote(impl)) for i <- 1..10 do Peep.Test.insert_metric(name, last_value, i, %{}) @@ -129,17 +94,8 @@ defmodule StatsdTest do end test "#{impl} - a distribution can be formatted (standard)" do - name = StorageCounter.fresh_id() - dist = Metrics.distribution("statsd.test.dist", reporter_options: [max_value: 1000]) - - opts = [ - name: name, - metrics: [dist], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [dist], storage: unquote(impl)) for i <- 1..1000 do Peep.Test.insert_metric(name, dist, i, %{}) @@ -199,17 +155,8 @@ defmodule StatsdTest do end test "#{impl} - a distribution can be formatted (datadog)" do - name = StorageCounter.fresh_id() - dist = Metrics.distribution("statsd.test.dist", reporter_options: [max_value: 1000]) - - opts = [ - name: name, - metrics: [dist], - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: [dist], storage: unquote(impl)) for i <- 1..1000 do Peep.Test.insert_metric(name, dist, i, %{}) @@ -268,8 +215,6 @@ defmodule StatsdTest do end test "#{impl} - metrics are batched according to mtu option" do - name = StorageCounter.fresh_id() - sum = fn i -> Metrics.sum("statsd.test.sum.#{i}") end last_value = fn i -> Metrics.last_value("statsd.test.gauge.#{i}") end @@ -278,13 +223,7 @@ defmodule StatsdTest do [sum.(i), last_value.(i) | acc] end) - opts = [ - name: name, - metrics: metrics, - storage: unquote(impl) - ] - - {:ok, _pid} = Peep.start_link(opts) + name = Peep.Test.start_peep!(metrics: metrics, storage: unquote(impl)) for i <- 1..10 do sum = sum.(i) diff --git a/test/support/storage_counter.ex b/test/support/storage_counter.ex deleted file mode 100644 index a7a8105..0000000 --- a/test/support/storage_counter.ex +++ /dev/null @@ -1,7 +0,0 @@ -defmodule Peep.Support.StorageCounter do - @moduledoc false - - def fresh_id() do - :"#{System.unique_integer([:positive])}" - end -end From 68b1c0b137a3d5aa62cca60a24a1c403360c9c37 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 12:10:11 -0400 Subject: [PATCH 10/23] Add records for new tuples --- lib/peep/event_handler.ex | 124 ++++++++++++++++--------------------- lib/peep/handler/config.ex | 84 +++++++++++++++++++++++++ lib/peep/handler/metric.ex | 63 +++++++++++++++++++ 3 files changed, 202 insertions(+), 69 deletions(-) create mode 100644 lib/peep/handler/config.ex create mode 100644 lib/peep/handler/metric.ex diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index af8236a..b1b49af 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -5,6 +5,12 @@ defmodule Peep.EventHandler do import Peep.Persistent, only: [persistent: 1] + require Peep.Handler.Config + require Peep.Handler.Metric + + alias Peep.Handler.Config + alias Peep.Handler.Metric + def attach(name) do persistent( events_to_metrics: metrics_by_event, @@ -14,27 +20,12 @@ defmodule Peep.EventHandler do for {event_name, metrics} <- metrics_by_event do handler_id = handler_id(event_name, name) - {tag_fns, metrics_with_tag_fn_idx} = deduplicate_tag_fns(metrics) - - metrics_rows = - Enum.map(metrics_with_tag_fn_idx, fn {metric, id, tag_idx} -> - %{measurement: measurement, keep: keep} = metric - - keep_val = if is_nil(keep), do: :no_keep, else: keep - - insert_fn = fn data, value, tags -> - storage_mod.insert_metric(data, id, metric, value, tags) - end - - {metric_type(metric), insert_fn, keep_val, measurement, tag_idx} - end) - :ok = :telemetry.attach( handler_id, event_name, &__MODULE__.handle_event/4, - {metrics_rows, storage_mod, storage, tag_fns} + Config.new(metrics, storage_mod, storage) ) handler_id @@ -54,11 +45,16 @@ defmodule Peep.EventHandler do _event, measurements, metadata, - {metrics_rows, storage_mod, storage, tag_fns} + Config.handler_config( + metrics: metrics, + storage_mod: storage_mod, + storage: storage, + tag_fns: tag_fns + ) ) do resolved = storage_mod.resolve(storage) tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns), 0, []) - store_metrics(metrics_rows, measurements, metadata, storage_mod, resolved, tag_results) + store_metrics(metrics, measurements, metadata, resolved, tag_results) end defp compute_tags(_tag_fns, _metadata, size, size, acc) do @@ -69,25 +65,39 @@ defmodule Peep.EventHandler do compute_tags(tag_fns, metadata, size, idx + 1, [elem(tag_fns, idx).(metadata) | acc]) end - defp store_metrics([], _measurements, _metadata, _mod, _data, _tag_results), do: :ok + defp store_metrics([], _measurements, _metadata, _data, _tag_results), do: :ok defp store_metrics( - [{:counter, insert_fn, :no_keep, _measurement, tag_idx} | rest], + [ + Metric.handler_metric( + type: :counter, + insert_fn: insert_fn, + keep: :no_keep, + tag_idx: tag_idx + ) + | rest + ], measurements, metadata, - mod, data, tag_results ) do insert_fn.(data, 1, elem(tag_results, tag_idx)) - store_metrics(rest, measurements, metadata, mod, data, tag_results) + store_metrics(rest, measurements, metadata, data, tag_results) end defp store_metrics( - [{_type, insert_fn, :no_keep, measurement, tag_idx} | rest], + [ + Metric.handler_metric( + insert_fn: insert_fn, + keep: :no_keep, + measurement: measurement, + tag_idx: tag_idx + ) + | rest + ], measurements, metadata, - mod, data, tag_results ) do @@ -99,14 +109,21 @@ defmodule Peep.EventHandler do nil end - store_metrics(rest, measurements, metadata, mod, data, tag_results) + store_metrics(rest, measurements, metadata, data, tag_results) end defp store_metrics( - [{:counter, insert_fn, keep, _measurement, tag_idx} | rest], + [ + Metric.handler_metric( + type: :counter, + insert_fn: insert_fn, + keep: keep, + tag_idx: tag_idx + ) + | rest + ], measurements, metadata, - mod, data, tag_results ) do @@ -114,14 +131,21 @@ defmodule Peep.EventHandler do insert_fn.(data, 1, elem(tag_results, tag_idx)) end - store_metrics(rest, measurements, metadata, mod, data, tag_results) + store_metrics(rest, measurements, metadata, data, tag_results) end defp store_metrics( - [{_type, insert_fn, keep, measurement, tag_idx} | rest], + [ + Metric.handler_metric( + insert_fn: insert_fn, + keep: keep, + measurement: measurement, + tag_idx: tag_idx + ) + | rest + ], measurements, metadata, - mod, data, tag_results ) do @@ -135,7 +159,7 @@ defmodule Peep.EventHandler do end end - store_metrics(rest, measurements, metadata, mod, data, tag_results) + store_metrics(rest, measurements, metadata, data, tag_results) end defp keep?(keep, metadata, measurement) when is_function(keep, 2), @@ -166,42 +190,4 @@ defmodule Peep.EventHandler do end end end - - defp deduplicate_tag_fns(metrics) do - {unique_metrics_tags, _} = - Enum.reduce(metrics, {%{}, 0}, fn {metric, _id}, {map, next_idx} -> - key = tag_fn_key(metric) - - case map do - %{^key => _} -> {map, next_idx} - _ -> {Map.put(map, key, next_idx), next_idx + 1} - end - end) - - tag_fns = - unique_metrics_tags - |> Enum.sort_by(fn {_key, idx} -> idx end) - |> Enum.map(fn {{tag_values, tags}, _idx} -> compile_tag_fn(tag_values, tags) end) - |> List.to_tuple() - - metrics_with_tag_fn_idx = - Enum.map(metrics, fn {metric, id} -> - key = tag_fn_key(metric) - {metric, id, Map.fetch!(unique_metrics_tags, key)} - end) - - {tag_fns, metrics_with_tag_fn_idx} - end - - defp tag_fn_key(metric), do: {metric.tag_values, metric.tags} - - defp compile_tag_fn(_tag_values, []), do: fn _metadata -> %{} end - defp compile_tag_fn(_tag_values, tags) when is_function(tags, 1), do: tags - - defp compile_tag_fn(tag_values, keys) do - fn metadata -> Map.take(tag_values.(metadata), keys) end - end - - defp metric_type(%Telemetry.Metrics.Counter{}), do: :counter - defp metric_type(_), do: :other end diff --git a/lib/peep/handler/config.ex b/lib/peep/handler/config.ex new file mode 100644 index 0000000..a149887 --- /dev/null +++ b/lib/peep/handler/config.ex @@ -0,0 +1,84 @@ +defmodule Peep.Handler.Config do + @moduledoc """ + The configuration attached to a telemetry handler via `:telemetry.attach/4`. + + Built once per event during `Peep.EventHandler.attach/1`, compiling raw + Telemetry.Metrics definitions into an optimized form: + + * `metrics` — a list of `Peep.Handler.Metric` records, one per metric + definition attached to this event. + + * `storage_mod` — the `Peep.Storage` implementation module. Passed to Stored + here so `Peep.EventHandler.handle_event/4` can call + `storage_mod.resolve/1` once per event rather than once per metric. + + * `storage` — the storage state term (e.g. an ETS table id). Passed to + `storage_mod.resolve/1` to get the resolved handle for the current + scheduler. + + * `tag_fns` — a tuple of tag-computing functions, deduplicated across + metrics that share the same `tags`/`tag_values` configuration. Evaluated + once per event; each metric accesses its result in O(1) by `tag_idx`. + """ + + require Record + + alias Peep.Handler.Metric + + Record.defrecord(:handler_config, [:metrics, :storage_mod, :storage, :tag_fns]) + + @type t :: + record(:handler_config, + metrics: [Metric.t()], + storage_mod: module(), + storage: term(), + tag_fns: tuple() + ) + + @doc """ + Compiles a list of `{metric, id}` pairs into a handler config record. + + Metrics that share the same `tags`/`tag_values` configuration are assigned + the same `tag_idx`, so their tag function is evaluated only once per event. + """ + @spec new([{Telemetry.Metrics.t(), Peep.metric_id()}], module(), term()) :: t() + def new(metrics, storage_mod, storage) do + {tag_fn_indices, _} = + Enum.reduce(metrics, {%{}, 0}, fn {metric, _id}, {map, next_idx} -> + key = tag_fn_key(metric) + + case map do + %{^key => _} -> {map, next_idx} + _ -> {Map.put(map, key, next_idx), next_idx + 1} + end + end) + + tag_fns = + tag_fn_indices + |> Enum.sort_by(fn {_key, idx} -> idx end) + |> Enum.map(fn {{tag_values, tags}, _idx} -> compile_tag_fn(tag_values, tags) end) + |> List.to_tuple() + + handler_metrics = + Enum.map(metrics, fn {metric, id} -> + tag_idx = Map.fetch!(tag_fn_indices, tag_fn_key(metric)) + Metric.new(metric, id, storage_mod, tag_idx) + end) + + handler_config( + metrics: handler_metrics, + storage_mod: storage_mod, + storage: storage, + tag_fns: tag_fns + ) + end + + defp tag_fn_key(metric), do: {metric.tag_values, metric.tags} + + defp compile_tag_fn(_tag_values, []), do: fn _metadata -> %{} end + defp compile_tag_fn(_tag_values, tags) when is_function(tags, 1), do: tags + + defp compile_tag_fn(tag_values, keys) do + fn metadata -> Map.take(tag_values.(metadata), keys) end + end +end diff --git a/lib/peep/handler/metric.ex b/lib/peep/handler/metric.ex new file mode 100644 index 0000000..dfbf703 --- /dev/null +++ b/lib/peep/handler/metric.ex @@ -0,0 +1,63 @@ +defmodule Peep.Handler.Metric do + @moduledoc """ + A pre-compiled representation of a single metric within a telemetry handler. + + Each record captures everything needed to store a sample in the hot path: + + * `type` — `:counter` or `:other`. Counters are special-cased: they always + store value `1` and skip measurement lookup entirely. + + * `insert_fn` — a closure over `storage_mod`, metric id, and the metric + struct. Calling `insert_fn.(resolved_storage, value, tags)` writes + directly to the storage backend, avoiding repeated map/struct lookups + on every event. + + * `keep` — the keep filter, or `:no_keep` when absent. Splitting `:no_keep` + from function-valued keeps lets `store_metrics` clause-match into four + fast paths (counter/other × keep/no_keep). + + * `measurement` — the measurement key, or a 1-/2-arity function. Stored + verbatim from the metric struct so `fetch_measurement` can extract the + value from the measurements map without re-traversing the metric. + + * `tag_idx` — index into the pre-computed tag results tuple. Metrics that + share the same `tags`/`tag_values` configuration share a single tag + function (deduplicated during `Peep.Handler.Config.new/3`), and `tag_idx` + points to the result of that shared function, avoiding redundant tag + computation per event. + """ + + require Record + + Record.defrecord(:handler_metric, [:type, :insert_fn, :keep, :measurement, :tag_idx]) + + @type t :: + record(:handler_metric, + type: :counter | :other, + insert_fn: (term(), number(), map() -> any()), + keep: :no_keep | (map() -> boolean()) | (map(), term() -> boolean()), + measurement: atom() | (map() -> number()) | (map(), map() -> number()) | nil, + tag_idx: non_neg_integer() + ) + + @spec new(Telemetry.Metrics.t(), Peep.metric_id(), module(), non_neg_integer()) :: t() + def new(metric, id, storage_mod, tag_idx) do + insert_fn = fn data, value, tags -> + storage_mod.insert_metric(data, id, metric, value, tags) + end + + handler_metric( + type: metric_type(metric), + insert_fn: insert_fn, + keep: keep_value(metric), + measurement: metric.measurement, + tag_idx: tag_idx + ) + end + + defp metric_type(%Telemetry.Metrics.Counter{}), do: :counter + defp metric_type(_), do: :other + + defp keep_value(%{keep: nil}), do: :no_keep + defp keep_value(%{keep: keep}), do: keep +end From 47080c553ae1b3b39efdf3bb743d6a4fb4691a3d Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 12:12:40 -0400 Subject: [PATCH 11/23] Fix Peep.Persistent not cleaning up --- lib/peep/persistent.ex | 2 +- test/peep_test.exs | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/lib/peep/persistent.ex b/lib/peep/persistent.ex index ae6af0a..fe70635 100644 --- a/lib/peep/persistent.ex +++ b/lib/peep/persistent.ex @@ -83,7 +83,7 @@ defmodule Peep.Persistent do @spec erase(name()) :: :ok def erase(name) when is_atom(name) do - :persistent_term.erase(name) + :persistent_term.erase(key(name)) :ok end diff --git a/test/peep_test.exs b/test/peep_test.exs index 333400d..59959e9 100644 --- a/test/peep_test.exs +++ b/test/peep_test.exs @@ -142,6 +142,21 @@ defmodule PeepTest do assert [] == :telemetry.list_handlers(prefix) end + test "persistent_term is cleaned up on shutdown" do + name = :"#{__MODULE__}_persistent_term_cleanup" + + {:ok, options} = + Peep.Options.validate(name: name, metrics: [Metrics.counter("cleanup.test.counter")]) + + {:ok, pid} = GenServer.start(Peep, options, name: options.name) + + assert Peep.Persistent.fetch(name) != nil + + GenServer.stop(pid, :shutdown) + + assert Peep.Persistent.fetch(name) == nil + end + test "assign_ids" do metrics = [c, s, d, l] = [ From c0a499a32a2829ac3275a8f156d6aed58df58ed8 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 12:25:56 -0400 Subject: [PATCH 12/23] Remove metrics_to_ids from Peep.Persistent --- lib/peep.ex | 11 ++++------- lib/peep/persistent.ex | 8 +------- test/peep_test.exs | 5 +---- test/shared/test_helpers.ex | 21 +++++++++++++++++---- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index 395667f..e06291c 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -202,20 +202,18 @@ defmodule Peep do Enum.reverse(filtered_metrics), %{}, %{}, - %{}, length(filtered_metrics) ) end - defp assign_metric_ids([], events_to_metrics, ids_to_metrics, metrics_to_ids, _counter) do + defp assign_metric_ids([], events_to_metrics, ids_to_metrics, _counter) do %{ events_to_metrics: events_to_metrics, - ids_to_metrics: ids_to_metrics, - metrics_to_ids: metrics_to_ids + ids_to_metrics: ids_to_metrics } end - defp assign_metric_ids([metric | rest], etm, itm, mti, counter) do + defp assign_metric_ids([metric | rest], etm, itm, counter) do %{event_name: event_name} = metric etm = @@ -228,9 +226,8 @@ defmodule Peep do end itm = Map.put(itm, counter, metric) - mti = Map.put(mti, metric, counter) - assign_metric_ids(rest, etm, itm, mti, counter - 1) + assign_metric_ids(rest, etm, itm, counter - 1) end # callbacks diff --git a/lib/peep/persistent.ex b/lib/peep/persistent.ex index fe70635..1f889fa 100644 --- a/lib/peep/persistent.ex +++ b/lib/peep/persistent.ex @@ -8,7 +8,6 @@ defmodule Peep.Persistent do :storage, events_to_metrics: %{}, ids_to_metrics: %{}, - metrics_to_ids: %{}, global_tags: %{} ]) @@ -20,8 +19,6 @@ defmodule Peep.Persistent do @typep events_to_metrics() :: %{ :telemetry.event_name() => [{Telemetry.Metrics.t(), non_neg_integer()}] } - @typep metrics_to_ids() :: %{Telemetry.Metrics.t() => Peep.metric_id()} - @type ids_to_metrics() :: %{Peep.metric_id() => Telemetry.Metrics.t()} @type t() :: record(:persistent, @@ -29,7 +26,6 @@ defmodule Peep.Persistent do storage: storage(), events_to_metrics: events_to_metrics(), ids_to_metrics: ids_to_metrics(), - metrics_to_ids: metrics_to_ids(), global_tags: map() ) @@ -56,8 +52,7 @@ defmodule Peep.Persistent do %{ events_to_metrics: events_to_metrics, - ids_to_metrics: ids_to_metrics, - metrics_to_ids: metrics_to_ids + ids_to_metrics: ids_to_metrics } = Peep.assign_metric_ids(metrics) persistent( @@ -65,7 +60,6 @@ defmodule Peep.Persistent do storage: storage, events_to_metrics: events_to_metrics, ids_to_metrics: ids_to_metrics, - metrics_to_ids: metrics_to_ids, global_tags: global_tags ) end diff --git a/test/peep_test.exs b/test/peep_test.exs index 59959e9..31bf11a 100644 --- a/test/peep_test.exs +++ b/test/peep_test.exs @@ -173,17 +173,14 @@ defmodule PeepTest do } expected_by_id = %{1 => c, 2 => s, 3 => d, 4 => l} - expected_by_metric = %{c => 1, s => 2, d => 3, l => 4} %{ events_to_metrics: actual_by_event, - ids_to_metrics: actual_by_id, - metrics_to_ids: actual_by_metric + ids_to_metrics: actual_by_id } = Peep.assign_metric_ids(metrics) assert actual_by_event == expected_by_event assert actual_by_id == expected_by_id - assert actual_by_metric == expected_by_metric end test "Non-numeric values are dropped" do diff --git a/test/shared/test_helpers.ex b/test/shared/test_helpers.ex index 7d55b1e..53bc491 100644 --- a/test/shared/test_helpers.ex +++ b/test/shared/test_helpers.ex @@ -7,11 +7,17 @@ defmodule Peep.Test do case Peep.Persistent.fetch(name) do Peep.Persistent.persistent( storage: {storage_mod, storage}, - metrics_to_ids: %{^metric => id} + ids_to_metrics: ids_to_metrics ) -> - storage - |> storage_mod.resolve() - |> storage_mod.insert_metric(id, metric, value, tags) + case find_metric_id(ids_to_metrics, metric) do + {:ok, id} -> + storage + |> storage_mod.resolve() + |> storage_mod.insert_metric(id, metric, value, tags) + + :error -> + nil + end _ -> nil @@ -20,6 +26,13 @@ defmodule Peep.Test do def insert_metric(_name, _metric, _value, _tags), do: nil + defp find_metric_id(ids_to_metrics, metric) do + Enum.find_value(ids_to_metrics, :error, fn + {id, ^metric} -> {:ok, id} + _ -> nil + end) + end + def fresh_id do :"#{System.unique_integer([:positive])}" end From 670db483d633e08af91552e2975fae94a30eeddd Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 12:43:57 -0400 Subject: [PATCH 13/23] Make Peep.Persistent.ids_to_metrics a tuple --- lib/peep.ex | 10 ++++------ lib/peep/persistent.ex | 2 +- lib/peep/storage/ets.ex | 6 +++--- lib/peep/storage/striped.ex | 2 +- test/peep_test.exs | 8 ++++---- test/shared/test_helpers.ex | 14 ++++++++++---- test/support/custom_storage.ex | 4 ++-- 7 files changed, 25 insertions(+), 21 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index e06291c..ba83a0f 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -201,15 +201,15 @@ defmodule Peep do assign_metric_ids( Enum.reverse(filtered_metrics), %{}, - %{}, - length(filtered_metrics) + [], + length(filtered_metrics) - 1 ) end defp assign_metric_ids([], events_to_metrics, ids_to_metrics, _counter) do %{ events_to_metrics: events_to_metrics, - ids_to_metrics: ids_to_metrics + ids_to_metrics: List.to_tuple(ids_to_metrics) } end @@ -225,9 +225,7 @@ defmodule Peep do Map.put(etm, event_name, [{metric, counter}]) end - itm = Map.put(itm, counter, metric) - - assign_metric_ids(rest, etm, itm, counter - 1) + assign_metric_ids(rest, etm, [metric | itm], counter - 1) end # callbacks diff --git a/lib/peep/persistent.ex b/lib/peep/persistent.ex index 1f889fa..9e92e1e 100644 --- a/lib/peep/persistent.ex +++ b/lib/peep/persistent.ex @@ -19,7 +19,7 @@ defmodule Peep.Persistent do @typep events_to_metrics() :: %{ :telemetry.event_name() => [{Telemetry.Metrics.t(), non_neg_integer()}] } - @type ids_to_metrics() :: %{Peep.metric_id() => Telemetry.Metrics.t()} + @type ids_to_metrics() :: tuple() @type t() :: record(:persistent, name: name(), diff --git a/lib/peep/storage/ets.ex b/lib/peep/storage/ets.ex index 54d6941..e48e025 100644 --- a/lib/peep/storage/ets.ex +++ b/lib/peep/storage/ets.ex @@ -128,17 +128,17 @@ defmodule Peep.Storage.ETS do end defp group_metric({{id, tags, _}, value}, itm, acc) do - %{^id => metric} = itm + metric = elem(itm, id) update_in(acc, [Access.key(metric, %{}), Access.key(tags, 0)], &(&1 + value)) end defp group_metric({{id, tags}, Storage.Atomics.atomic() = atomics}, itm, acc) do - %{^id => metric} = itm + metric = elem(itm, id) put_in(acc, [Access.key(metric, %{}), Access.key(tags)], Storage.Atomics.values(atomics)) end defp group_metric({{id, tags}, value}, itm, acc) do - %{^id => metric} = itm + metric = elem(itm, id) put_in(acc, [Access.key(metric, %{}), Access.key(tags)], value) end end diff --git a/lib/peep/storage/striped.ex b/lib/peep/storage/striped.ex index 976f156..9d941d0 100644 --- a/lib/peep/storage/striped.ex +++ b/lib/peep/storage/striped.ex @@ -136,7 +136,7 @@ defmodule Peep.Storage.Striped do end defp add_metric({{id, _tags}, _value} = kv, itm, acc) do - %{^id => metric} = itm + metric = elem(itm, id) add_metric2(kv, metric, acc) end diff --git a/test/peep_test.exs b/test/peep_test.exs index 31bf11a..db79a14 100644 --- a/test/peep_test.exs +++ b/test/peep_test.exs @@ -167,12 +167,12 @@ defmodule PeepTest do ] expected_by_event = %{ - [:one] => [{c, 1}, {s, 2}], - [:three] => [{d, 3}], - [:five] => [{l, 4}] + [:one] => [{c, 0}, {s, 1}], + [:three] => [{d, 2}], + [:five] => [{l, 3}] } - expected_by_id = %{1 => c, 2 => s, 3 => d, 4 => l} + expected_by_id = {c, s, d, l} %{ events_to_metrics: actual_by_event, diff --git a/test/shared/test_helpers.ex b/test/shared/test_helpers.ex index 53bc491..412e955 100644 --- a/test/shared/test_helpers.ex +++ b/test/shared/test_helpers.ex @@ -27,10 +27,16 @@ defmodule Peep.Test do def insert_metric(_name, _metric, _value, _tags), do: nil defp find_metric_id(ids_to_metrics, metric) do - Enum.find_value(ids_to_metrics, :error, fn - {id, ^metric} -> {:ok, id} - _ -> nil - end) + find_metric_id(ids_to_metrics, metric, tuple_size(ids_to_metrics), 0) + end + + defp find_metric_id(_tuple, _metric, size, size), do: :error + + defp find_metric_id(tuple, metric, size, idx) do + case elem(tuple, idx) do + ^metric -> {:ok, idx} + _ -> find_metric_id(tuple, metric, size, idx + 1) + end end def fresh_id do diff --git a/test/support/custom_storage.ex b/test/support/custom_storage.ex index b131329..0781a43 100644 --- a/test/support/custom_storage.ex +++ b/test/support/custom_storage.ex @@ -153,7 +153,7 @@ defmodule CustomStorage do end defp group_metric({{id, tags}, Storage.Atomics.atomic() = atomics}, itm, acc) do - %{^id => metric} = itm + metric = elem(itm, id) values = Storage.Atomics.values(atomics) update_in(acc, [Access.key(metric, %{}), Access.key(tags, %{})], fn m1 -> @@ -162,7 +162,7 @@ defmodule CustomStorage do end defp group_metric({{id, tags}, value}, itm, acc) do - %{^id => metric} = itm + metric = elem(itm, id) case metric do %Metrics.Counter{} -> From 07db85bf8ffd3ae26a2c25c99e0619943bce41a9 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 12:47:31 -0400 Subject: [PATCH 14/23] Simplify Peep.assign_metric_ids --- lib/peep.ex | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index ba83a0f..d93a338 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -196,38 +196,25 @@ defmodule Peep do end def assign_metric_ids(metrics) do - filtered_metrics = Enum.filter(metrics, &allow_metric?/1) - - assign_metric_ids( - Enum.reverse(filtered_metrics), - %{}, - [], - length(filtered_metrics) - 1 - ) - end + indexed = + metrics + |> Enum.filter(&allow_metric?/1) + |> Enum.with_index() + + events_to_metrics = + Enum.group_by(indexed, fn {metric, _id} -> metric.event_name end) + + ids_to_metrics = + indexed + |> Enum.map(fn {metric, _id} -> metric end) + |> List.to_tuple() - defp assign_metric_ids([], events_to_metrics, ids_to_metrics, _counter) do %{ events_to_metrics: events_to_metrics, - ids_to_metrics: List.to_tuple(ids_to_metrics) + ids_to_metrics: ids_to_metrics } end - defp assign_metric_ids([metric | rest], etm, itm, counter) do - %{event_name: event_name} = metric - - etm = - case etm do - %{^event_name => metrics} -> - %{etm | event_name => [{metric, counter} | metrics]} - - _ -> - Map.put(etm, event_name, [{metric, counter}]) - end - - assign_metric_ids(rest, etm, [metric | itm], counter - 1) - end - # callbacks @impl true From d592e0f9af4f82405e106f6658503c3d9314a5ab Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 12:55:53 -0400 Subject: [PATCH 15/23] Make Peep.assign_metric_ids private --- lib/peep.ex | 20 -------------------- lib/peep/persistent.ex | 22 ++++++++++++++++++---- test/peep_test.exs | 26 -------------------------- 3 files changed, 18 insertions(+), 50 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index d93a338..50f68cc 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -195,26 +195,6 @@ defmodule Peep do true end - def assign_metric_ids(metrics) do - indexed = - metrics - |> Enum.filter(&allow_metric?/1) - |> Enum.with_index() - - events_to_metrics = - Enum.group_by(indexed, fn {metric, _id} -> metric.event_name end) - - ids_to_metrics = - indexed - |> Enum.map(fn {metric, _id} -> metric end) - |> List.to_tuple() - - %{ - events_to_metrics: events_to_metrics, - ids_to_metrics: ids_to_metrics - } - end - # callbacks @impl true diff --git a/lib/peep/persistent.ex b/lib/peep/persistent.ex index 9e92e1e..da932b1 100644 --- a/lib/peep/persistent.ex +++ b/lib/peep/persistent.ex @@ -50,10 +50,7 @@ defmodule Peep.Persistent do {mod, mod.new(opts)} end - %{ - events_to_metrics: events_to_metrics, - ids_to_metrics: ids_to_metrics - } = Peep.assign_metric_ids(metrics) + {events_to_metrics, ids_to_metrics} = assign_metric_ids(metrics) persistent( name: name, @@ -101,6 +98,23 @@ defmodule Peep.Persistent do end end + defp assign_metric_ids(metrics) do + indexed = + metrics + |> Enum.filter(&Peep.allow_metric?/1) + |> Enum.with_index() + + events_to_metrics = + Enum.group_by(indexed, fn {metric, _id} -> metric.event_name end) + + ids_to_metrics = + indexed + |> Enum.map(fn {metric, _id} -> metric end) + |> List.to_tuple() + + {events_to_metrics, ids_to_metrics} + end + defp key(name) when is_atom(name) do {Peep, name} end diff --git a/test/peep_test.exs b/test/peep_test.exs index db79a14..eb3a779 100644 --- a/test/peep_test.exs +++ b/test/peep_test.exs @@ -157,32 +157,6 @@ defmodule PeepTest do assert Peep.Persistent.fetch(name) == nil end - test "assign_ids" do - metrics = - [c, s, d, l] = [ - Metrics.counter("one.two"), - Metrics.sum("one.two"), - Metrics.distribution("three.four"), - Metrics.last_value("five.six") - ] - - expected_by_event = %{ - [:one] => [{c, 0}, {s, 1}], - [:three] => [{d, 2}], - [:five] => [{l, 3}] - } - - expected_by_id = {c, s, d, l} - - %{ - events_to_metrics: actual_by_event, - ids_to_metrics: actual_by_id - } = Peep.assign_metric_ids(metrics) - - assert actual_by_event == expected_by_event - assert actual_by_id == expected_by_id - end - test "Non-numeric values are dropped" do name = :"#{__MODULE__}_non_numeric_values" From bb719743f3b3b261db408bbe70a7af804a7fb148 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 13:04:17 -0400 Subject: [PATCH 16/23] Make Peep.EventHandler.attach/1 pure --- lib/peep.ex | 7 +++---- lib/peep/event_handler.ex | 13 +++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index 50f68cc..f8bb239 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -202,11 +202,10 @@ defmodule Peep do Process.flag(:trap_exit, true) name = options.name - :ok = - Peep.Persistent.new(options) - |> Peep.Persistent.store() + peep_persistent = Peep.Persistent.new(options) + :ok = Peep.Persistent.store(peep_persistent) - handler_ids = EventHandler.attach(name) + handler_ids = EventHandler.attach(peep_persistent) :telemetry.persist() statsd_opts = options.statsd diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index b1b49af..ee515fc 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -11,12 +11,13 @@ defmodule Peep.EventHandler do alias Peep.Handler.Config alias Peep.Handler.Metric - def attach(name) do - persistent( - events_to_metrics: metrics_by_event, - storage: {storage_mod, storage} - ) = Peep.Persistent.fetch(name) - + def attach( + persistent( + name: name, + events_to_metrics: metrics_by_event, + storage: {storage_mod, storage} + ) + ) do for {event_name, metrics} <- metrics_by_event do handler_id = handler_id(event_name, name) From 889f9dccf87936d8623096855ec26c08008f5e23 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 13:09:37 -0400 Subject: [PATCH 17/23] Remove :lists.reverse call in compute_tags/4 --- lib/peep/event_handler.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index ee515fc..c2cb760 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -54,16 +54,16 @@ defmodule Peep.EventHandler do ) ) do resolved = storage_mod.resolve(storage) - tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns), 0, []) + tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns) - 1, []) store_metrics(metrics, measurements, metadata, resolved, tag_results) end - defp compute_tags(_tag_fns, _metadata, size, size, acc) do - acc |> :lists.reverse() |> List.to_tuple() + defp compute_tags(_tag_fns, _metadata, -1, acc) do + List.to_tuple(acc) end - defp compute_tags(tag_fns, metadata, size, idx, acc) do - compute_tags(tag_fns, metadata, size, idx + 1, [elem(tag_fns, idx).(metadata) | acc]) + defp compute_tags(tag_fns, metadata, idx, acc) do + compute_tags(tag_fns, metadata, idx - 1, [elem(tag_fns, idx).(metadata) | acc]) end defp store_metrics([], _measurements, _metadata, _data, _tag_results), do: :ok From 1c8e26524fe4227d1cae2534dd0f7968857608e5 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 14:20:09 -0400 Subject: [PATCH 18/23] Remove Peep.Persistent.fast_fetch/1 --- lib/peep/persistent.ex | 6 ------ 1 file changed, 6 deletions(-) diff --git a/lib/peep/persistent.ex b/lib/peep/persistent.ex index da932b1..a1bef6f 100644 --- a/lib/peep/persistent.ex +++ b/lib/peep/persistent.ex @@ -92,12 +92,6 @@ defmodule Peep.Persistent do @spec ids_to_metrics(t()) :: ids_to_metrics() def ids_to_metrics(persistent(ids_to_metrics: itm)), do: itm - defmacro fast_fetch(name) when is_atom(name) do - quote do - :persistent_term.get(unquote(key(name)), nil) - end - end - defp assign_metric_ids(metrics) do indexed = metrics From 81d259fee1bc31a15361c51e0abc54cb6cbdcc69 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Fri, 27 Mar 2026 15:07:05 -0400 Subject: [PATCH 19/23] Remove unused function head/branch --- lib/peep/event_handler.ex | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index c2cb760..4f2bb77 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -169,10 +169,6 @@ defmodule Peep.EventHandler do defp keep?(keep, metadata, _measurement) when is_function(keep, 1), do: keep.(metadata) defp keep?(_keep, _metadata, _measurement), do: true - defp fetch_measurement(%Telemetry.Metrics.Counter{}, _measurements, _metadata) do - 1 - end - defp fetch_measurement(measurement, measurements, metadata) do case measurement do nil -> From 02e474ca22868eba02922290f2b977f718483d05 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Mon, 30 Mar 2026 10:16:49 -0400 Subject: [PATCH 20/23] Fix erroneous type specs --- lib/peep.ex | 2 +- lib/peep/persistent.ex | 2 +- lib/peep/storage.ex | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index f8bb239..763e4f6 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -85,7 +85,7 @@ defmodule Peep do statsd_state: nil end - @type metric_id() :: pos_integer() + @type metric_id() :: non_neg_integer() def child_spec(options) do %{id: peep_name!(options), start: {__MODULE__, :start_link, [options]}} diff --git a/lib/peep/persistent.ex b/lib/peep/persistent.ex index a1bef6f..c5ae79d 100644 --- a/lib/peep/persistent.ex +++ b/lib/peep/persistent.ex @@ -7,7 +7,7 @@ defmodule Peep.Persistent do :name, :storage, events_to_metrics: %{}, - ids_to_metrics: %{}, + ids_to_metrics: {}, global_tags: %{} ]) diff --git a/lib/peep/storage.ex b/lib/peep/storage.ex index d189ee5..0bfb8c0 100644 --- a/lib/peep/storage.ex +++ b/lib/peep/storage.ex @@ -31,7 +31,7 @@ defmodule Peep.Storage do This is intended to improve situations where Peep emits metrics whose tags have high cardinality. """ - @callback prune_tags(Enumerable.t(%{Metrics.tag() => term()}), map()) :: :ok + @callback prune_tags(term(), [%{Metrics.tag() => term()}]) :: :ok @doc """ Resolves storage to its per-scheduler form for the current scheduler. From 30bdaaf2fe9a94863fbe3e4be9894e9663ea0558 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Mon, 30 Mar 2026 10:16:59 -0400 Subject: [PATCH 21/23] Simplify statsd option checking --- lib/peep.ex | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/peep.ex b/lib/peep.ex index 763e4f6..60e2673 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -209,14 +209,10 @@ defmodule Peep do :telemetry.persist() statsd_opts = options.statsd - statsd_flush_interval = statsd_opts[:flush_interval_ms] - - if statsd_flush_interval != nil do - set_statsd_timer(statsd_flush_interval) - end statsd_state = - if options.statsd do + if statsd_opts do + set_statsd_timer(statsd_opts.flush_interval_ms) Statsd.make_state(statsd_opts) else nil From 2a312ea40d9d23737012da96b661b985dff965ab Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Mon, 30 Mar 2026 10:22:24 -0400 Subject: [PATCH 22/23] Fix bug in StatsD reporting and add regression test --- lib/peep/statsd/cache.ex | 2 +- test/statsd_test.exs | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/peep/statsd/cache.ex b/lib/peep/statsd/cache.ex index 9773a7b..f697506 100644 --- a/lib/peep/statsd/cache.ex +++ b/lib/peep/statsd/cache.ex @@ -88,7 +88,7 @@ defmodule Peep.Statsd.Cache do formatted_tags = format_tags(tags) to_add = - for {bucket, count} <- buckets, bucket != :sum, bucket != "+Inf", into: %{} do + for {bucket, count} <- buckets, bucket != :sum, bucket != :infinity, into: %{} do formatted_bucket = to_string(bucket) {{:dist, formatted_name, formatted_tags, formatted_bucket}, count} end diff --git a/test/statsd_test.exs b/test/statsd_test.exs index b3e1c00..cb9697c 100644 --- a/test/statsd_test.exs +++ b/test/statsd_test.exs @@ -214,6 +214,23 @@ defmodule StatsdTest do assert parse_packets(packets) == parse_packets(expected) end + test "#{impl} - above-max distribution samples are excluded from output" do + dist = Metrics.distribution("statsd.test.dist", reporter_options: [max_value: 100]) + name = Peep.Test.start_peep!(metrics: [dist], storage: unquote(impl)) + + # Insert one value within range, two above max_value + Peep.Test.insert_metric(name, dist, 50, %{}) + Peep.Test.insert_metric(name, dist, 500, %{}) + Peep.Test.insert_metric(name, dist, 5000, %{}) + + packets = get_statsd_packets(name, %{formatter: :standard}) + parsed = parse_packets(packets) + + # Only the in-range sample should produce a bucket line + assert MapSet.size(parsed) == 1 + assert [%{name: "statsd.test.dist", type: :dist}] = MapSet.to_list(parsed) + end + test "#{impl} - metrics are batched according to mtu option" do sum = fn i -> Metrics.sum("statsd.test.sum.#{i}") end last_value = fn i -> Metrics.last_value("statsd.test.gauge.#{i}") end From f6c866363839ee7eb689a7b387c8f0c2e8e39b34 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Tue, 31 Mar 2026 11:20:40 -0400 Subject: [PATCH 23/23] Make Peep.Atomics.insert/2 a macro --- lib/peep/storage/atomics.ex | 38 ++++++++++++++++++------------------- lib/peep/storage/striped.ex | 2 ++ 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/lib/peep/storage/atomics.ex b/lib/peep/storage/atomics.ex index 8f1b54d..9c693c1 100644 --- a/lib/peep/storage/atomics.ex +++ b/lib/peep/storage/atomics.ex @@ -27,29 +27,27 @@ defmodule Peep.Storage.Atomics do ) end - def insert( - atomic( - bucket_calculator: {module, config}, - buckets: buckets, - sum: sum, - num_buckets: num_buckets, - above_max: above_max - ), - value - ) do - # :atomics indexes are 1-based. - # 1 is added for when calculate_bucket/2 returns 0 - bucket_idx = module.bucket_for(value, config) + 1 + defmacro insert(atomics_expr, value_expr) do + quote do + {:atomic, num_buckets, buckets, sum, above_max, {module, config}} + = unquote(atomics_expr) - case bucket_idx > num_buckets do - true -> - :atomics.add(above_max, 1, 1) + value = unquote(value_expr) - false -> - :atomics.add(buckets, bucket_idx, 1) - end + # :atomics indexes are 1-based. + # 1 is added for when calculate_bucket/2 returns 0 + bucket_idx = module.bucket_for(value, config) + 1 + + case bucket_idx > num_buckets do + true -> + :atomics.add(above_max, 1, 1) - :atomics.add(sum, 1, round(value)) + false -> + :atomics.add(buckets, bucket_idx, 1) + end + + :atomics.add(sum, 1, round(value)) + end end def values( diff --git a/lib/peep/storage/striped.ex b/lib/peep/storage/striped.ex index 9d941d0..5164150 100644 --- a/lib/peep/storage/striped.ex +++ b/lib/peep/storage/striped.ex @@ -6,6 +6,8 @@ defmodule Peep.Storage.Striped do memory usage. Recommended when executing thousands of metrics per second. """ + require Peep.Storage.Atomics + alias Telemetry.Metrics alias Peep.Storage