diff --git a/bench/event_handler_bench.exs b/bench/event_handler_bench.exs new file mode 100644 index 0000000..9ebb930 --- /dev/null +++ b/bench/event_handler_bench.exs @@ -0,0 +1,46 @@ +import Telemetry.Metrics + +metrics = [ + counter("bench.event.count", tags: [:tag_a]), + counter("bench.event.count2", tags: [:tag_a, :tag_b]), + sum("bench.event.payload_size", tags: [:tag_a]), + last_value("bench.event.queue_depth", tags: [:tag_a]), + distribution("bench.event.duration", + tags: [:tag_a], + reporter_options: [max_value: 1_000_000, bucket_variability: 0.3] + ), + distribution("bench.event.latency", + tags: [:tag_a, :tag_b], + reporter_options: [max_value: 65536, bucket_variability: 0.3] + ) +] + +{:ok, _pid} = Peep.start_link(name: :bench, metrics: metrics, storage: :striped) + +measurements = %{ + count: 1, + payload_size: 512, + queue_depth: 42, + duration: 1500, + latency: 350 +} + +metadata = %{tag_a: "exchange_1", tag_b: "channel_2"} +event = [:bench, :event] + +parallel = String.to_integer(System.get_env("BENCH_PARALLEL", "1")) +IO.puts("Running with parallel: #{parallel}") + +Benchee.run( + %{ + "telemetry_execute" => fn -> + :telemetry.execute(event, measurements, metadata) + end + }, + warmup: 2, + time: 5, + memory_time: 2, + parallel: parallel +) + +GenServer.stop(:bench) diff --git a/lib/peep.ex b/lib/peep.ex index 67126a4..9cab108 100644 --- a/lib/peep.ex +++ b/lib/peep.ex @@ -81,6 +81,7 @@ defmodule Peep do defstruct name: nil, interval: nil, handler_ids: nil, + event_keys: nil, statsd_opts: nil, statsd_state: nil end @@ -107,7 +108,8 @@ defmodule Peep do storage: {storage_mod, storage}, metrics_to_ids: %{^metric => id} ) -> - storage_mod.insert_metric(storage, id, metric, value, tags) + resolved = storage_mod.resolve(storage) + storage_mod.insert_metric(resolved, id, metric, value, tags) _ -> nil @@ -258,7 +260,7 @@ defmodule Peep do Peep.Persistent.new(options) |> Peep.Persistent.store() - handler_ids = EventHandler.attach(name) + {handler_ids, event_keys} = EventHandler.attach(name) statsd_opts = options.statsd statsd_flush_interval = statsd_opts[:flush_interval_ms] @@ -277,6 +279,7 @@ defmodule Peep do state = %State{ name: name, handler_ids: handler_ids, + event_keys: event_keys, statsd_opts: statsd_opts, statsd_state: statsd_state } @@ -312,9 +315,9 @@ defmodule Peep do end @impl true - def terminate(_reason, %{name: name, handler_ids: handler_ids}) do + def terminate(_reason, %{name: name, handler_ids: handler_ids, event_keys: event_keys}) do Peep.Persistent.erase(name) - EventHandler.detach(handler_ids) + EventHandler.detach(handler_ids, event_keys) end # private diff --git a/lib/peep/event_handler.ex b/lib/peep/event_handler.ex index 36c9fdc..0e5b197 100644 --- a/lib/peep/event_handler.ex +++ b/lib/peep/event_handler.ex @@ -1,30 +1,39 @@ defmodule Peep.EventHandler do @moduledoc false - @compile {:inline, keep?: 3, meta: 3, fetch_measurement: 3} + @compile {:inline, keep?: 3, fetch_measurement: 3} import Peep.Persistent, only: [persistent: 1] def attach(name) do - persistent(events_to_metrics: metrics_by_event) = Peep.Persistent.fetch(name) + persistent( + events_to_metrics: metrics_by_event, + storage: storage + ) = Peep.Persistent.fetch(name) - for {event_name, _metrics} <- metrics_by_event do - handler_id = handler_id(event_name, name) + pairs = + for {event_name, metrics} <- metrics_by_event do + event_key = :erlang.unique_integer([:positive, :monotonic]) + :persistent_term.put(event_key, {storage, metrics}) + handler_id = handler_id(event_name, name) + + :ok = + :telemetry.attach( + handler_id, + event_name, + &__MODULE__.handle_event/4, + event_key + ) - :ok = - :telemetry.attach( - handler_id, - event_name, - &__MODULE__.handle_event/4, - name - ) + {handler_id, event_key} + end - handler_id - end + Enum.unzip(pairs) end - def detach(handler_ids) do + def detach(handler_ids, event_keys) do for id <- handler_ids, do: :telemetry.detach(id) + for key <- event_keys, do: :persistent_term.erase(key) :ok end @@ -32,43 +41,131 @@ defmodule Peep.EventHandler do {__MODULE__, peep_name, event_name} end - def handle_event(event, measurements, metadata, name) do - persistent( - events_to_metrics: %{^event => metrics}, - storage: {storage_mod, storage} - ) = Peep.Persistent.fetch(name) + def precompute_metrics(metrics, {storage_mod, _}) do + {tag_map, _} = + Enum.reduce(metrics, {%{}, 0}, fn {metric, _id}, {map, next_idx} -> + key = {metric.tag_values, metric.tags} + + case map do + %{^key => _} -> {map, next_idx} + _ -> {Map.put(map, key, next_idx), next_idx + 1} + end + end) + + tag_fns = + tag_map + |> Enum.sort_by(fn {_key, idx} -> idx end) + |> Enum.map(fn {{tag_values, tags}, _idx} -> compile_tag_fn(tag_values, tags) end) + |> List.to_tuple() + + metrics_list = + Enum.map(metrics, fn {metric, id} -> + %{ + measurement: measurement, + tag_values: tag_values, + tags: tags, + keep: keep + } = metric + + tag_idx = Map.fetch!(tag_map, {tag_values, tags}) + keep_val = if is_nil(keep), do: :no_keep, else: keep + + insert_fn = fn data, value, tags -> + storage_mod.insert_metric(data, id, metric, value, tags) + end + + {metric_type(metric), insert_fn, keep_val, measurement, tag_idx} + end) + + {tag_fns, metrics_list} + end + + defp metric_type(%Telemetry.Metrics.Counter{}), do: :counter + defp metric_type(_), do: :other + + defp compile_tag_fn(_tag_values, []), do: fn _metadata -> %{} end + defp compile_tag_fn(_tag_values, tags) when is_function(tags, 1), do: tags + defp compile_tag_fn(tag_values, keys), do: fn metadata -> Map.take(tag_values.(metadata), keys) end + + def handle_event(_event, measurements, metadata, event_key) do + {{storage_mod, storage}, {tag_fns, metrics}} = :persistent_term.get(event_key) + resolved = storage_mod.resolve(storage) + tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns), 0, []) + store_metrics(metrics, measurements, metadata, resolved, tag_results) + end + + defp compute_tags(_tag_fns, _metadata, size, size, acc) do + acc |> :lists.reverse() |> List.to_tuple() + end + + defp compute_tags(tag_fns, metadata, size, idx, acc) do + compute_tags(tag_fns, metadata, size, idx + 1, [elem(tag_fns, idx).(metadata) | acc]) + end + + defp store_metrics([], _measurements, _metadata, _data, _tag_results), do: :ok + + defp store_metrics( + [{:counter, insert_fn, :no_keep, _measurement, tag_idx} | rest], + measurements, + metadata, + data, + tag_results + ) do + insert_fn.(data, 1, elem(tag_results, tag_idx)) + store_metrics(rest, measurements, metadata, data, tag_results) + end + + defp store_metrics( + [{_type, insert_fn, :no_keep, measurement, tag_idx} | rest], + measurements, + metadata, + data, + tag_results + ) do + case fetch_measurement(measurement, measurements, metadata) do + value when is_number(value) -> + insert_fn.(data, value, elem(tag_results, tag_idx)) + + _ -> + nil + end - store_metrics(metrics, measurements, metadata, storage_mod, storage) + store_metrics(rest, measurements, metadata, data, tag_results) end - defp store_metrics([], _measurements, _metadata, _mod, _data), do: :ok + defp store_metrics( + [{:counter, insert_fn, keep, _measurement, tag_idx} | rest], + measurements, + metadata, + data, + tag_results + ) do + if keep?(keep, metadata, nil) do + insert_fn.(data, 1, elem(tag_results, tag_idx)) + end - defp store_metrics([{metric, id} | rest], measurements, metadata, mod, data) do - %{ - measurement: measurement, - tag_values: tag_values, - tags: tags, - keep: keep - } = metric + store_metrics(rest, measurements, metadata, data, tag_results) + end + defp store_metrics( + [{_type, insert_fn, keep, measurement, tag_idx} | rest], + measurements, + metadata, + data, + tag_results + ) do if keep?(keep, metadata, measurement) do # credo:disable-for-next-line Credo.Check.Refactor.Nesting case fetch_measurement(measurement, measurements, metadata) do value when is_number(value) -> - mod.insert_metric( - data, - id, - metric, - value, - meta(metadata, tag_values, tags) - ) + insert_fn.(data, value, elem(tag_results, tag_idx)) _ -> nil end end - store_metrics(rest, measurements, metadata, mod, data) + store_metrics(rest, measurements, metadata, data, tag_results) end defp keep?(keep, metadata, measurement) when is_function(keep, 2), @@ -77,15 +174,6 @@ defmodule Peep.EventHandler do defp keep?(keep, metadata, _measurement) when is_function(keep, 1), do: keep.(metadata) defp keep?(_keep, _metadata, _measurement), do: true - # When selected list is empty, just return empty map - defp meta(_tags, _map, []), do: %{} - defp meta(meta, _map, tags) when is_function(tags, 1), do: tags.(meta) - defp meta(tags, map, keys), do: Map.take(map.(tags), keys) - - defp fetch_measurement(%Telemetry.Metrics.Counter{}, _measurements, _metadata) do - 1 - end - defp fetch_measurement(measurement, measurements, metadata) do case measurement do nil -> diff --git a/lib/peep/persistent.ex b/lib/peep/persistent.ex index ae6af0a..5f8a132 100644 --- a/lib/peep/persistent.ex +++ b/lib/peep/persistent.ex @@ -60,10 +60,15 @@ defmodule Peep.Persistent do metrics_to_ids: metrics_to_ids } = Peep.assign_metric_ids(metrics) + precomputed_events = + Map.new(events_to_metrics, fn {event, metric_list} -> + {event, Peep.EventHandler.precompute_metrics(metric_list, storage)} + end) + persistent( name: name, storage: storage, - events_to_metrics: events_to_metrics, + events_to_metrics: precomputed_events, ids_to_metrics: ids_to_metrics, metrics_to_ids: metrics_to_ids, global_tags: global_tags diff --git a/lib/peep/storage.ex b/lib/peep/storage.ex index af5b27e..5990cbc 100644 --- a/lib/peep/storage.ex +++ b/lib/peep/storage.ex @@ -16,6 +16,12 @@ defmodule Peep.Storage do """ @callback storage_size(term()) :: %{size: non_neg_integer(), memory: non_neg_integer()} + @doc """ + Resolves storage to its per-scheduler form for the current scheduler. + Called once per event to avoid redundant scheduler lookups across metrics. + """ + @callback resolve(term()) :: term() + @doc """ Stores a sample metric """ diff --git a/lib/peep/storage/ets.ex b/lib/peep/storage/ets.ex index 64b27db..54d6941 100644 --- a/lib/peep/storage/ets.ex +++ b/lib/peep/storage/ets.ex @@ -31,6 +31,9 @@ defmodule Peep.Storage.ETS do :ets.new(__MODULE__, opts) end + @impl true + def resolve(tid), do: tid + @impl true def storage_size(tid) do %{ diff --git a/lib/peep/storage/striped.ex b/lib/peep/storage/striped.ex index 5f28b0a..976f156 100644 --- a/lib/peep/storage/striped.ex +++ b/lib/peep/storage/striped.ex @@ -45,27 +45,29 @@ defmodule Peep.Storage.Striped do end @impl true - def insert_metric(tids, id, %Metrics.Counter{}, _value, %{} = tags) do - tid = get_tid(tids) + def resolve(tids) do + scheduler_id = :erlang.system_info(:scheduler_id) + elem(tids, scheduler_id - 1) + end + + @impl true + def insert_metric(tid, id, %Metrics.Counter{}, _value, %{} = tags) do key = {id, tags} :ets.update_counter(tid, key, {2, 1}, {key, 0}) end - def insert_metric(tids, id, %Metrics.Sum{}, value, %{} = tags) do - tid = get_tid(tids) + def insert_metric(tid, id, %Metrics.Sum{}, value, %{} = tags) do key = {id, tags} :ets.update_counter(tid, key, {2, value}, {key, 0}) end - def insert_metric(tids, id, %Metrics.LastValue{}, value, %{} = tags) do - tid = get_tid(tids) + def insert_metric(tid, id, %Metrics.LastValue{}, value, %{} = tags) do now = System.monotonic_time() key = {id, tags} :ets.insert(tid, {key, {now, value}}) end - def insert_metric(tids, id, %Metrics.Distribution{} = metric, value, %{} = tags) do - tid = get_tid(tids) + def insert_metric(tid, id, %Metrics.Distribution{} = metric, value, %{} = tags) do key = {id, tags} atomics = @@ -93,11 +95,6 @@ defmodule Peep.Storage.Striped do Storage.Atomics.insert(atomics, value) end - defp get_tid(tids) do - scheduler_id = :erlang.system_info(:scheduler_id) - elem(tids, scheduler_id - 1) - end - @impl true def prune_tags(tids, patterns) do match_spec = diff --git a/mix.exs b/mix.exs index 9d26128..957da1b 100644 --- a/mix.exs +++ b/mix.exs @@ -44,6 +44,7 @@ defmodule Peep.MixProject do {:inch_ex, "~> 2.0", only: [:dev, :test], runtime: false}, {:ex_doc, "~> 0.34", only: [:dev], runtime: false}, {:nimble_parsec, "~> 1.4", only: [:dev, :test], runtime: false}, + {:benchee, "~> 1.3", only: [:dev], runtime: false}, {:plug_cowboy, "~> 2.7", only: [:test]}, # Optional dependencies diff --git a/mix.lock b/mix.lock index 6cce936..ff925cc 100644 --- a/mix.lock +++ b/mix.lock @@ -1,10 +1,12 @@ %{ "bandit": {:hex, :bandit, "1.7.0", "d1564f30553c97d3e25f9623144bb8df11f3787a26733f00b21699a128105c0c", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "3e2f7a98c7a11f48d9d8c037f7177cd39778e74d55c7af06fe6227c742a8168a"}, + "benchee": {:hex, :benchee, "1.5.0", "4d812c31d54b0ec0167e91278e7de3f596324a78a096fd3d0bea68bb0c513b10", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.1", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "5b075393aea81b8ae74eadd1c28b1d87e8a63696c649d8293db7c4df3eb67535"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "cowboy": {:hex, :cowboy, "2.13.0", "09d770dd5f6a22cc60c071f432cd7cb87776164527f205c5a6b0f24ff6b38990", [:make, :rebar3], [{:cowlib, ">= 2.14.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "e724d3a70995025d654c1992c7b11dbfea95205c047d86ff9bf1cda92ddc5614"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.15.0", "3c97a318a933962d1c12b96ab7c1d728267d2c523c25a5b57b0f93392b6e9e25", [:make, :rebar3], [], "hexpm", "4f00c879a64b4fe7c8fcb42a4281925e9ffdb928820b03c3ad325a617e857532"}, "credo": {:hex, :credo, "1.7.12", "9e3c20463de4b5f3f23721527fcaf16722ec815e70ff6c60b86412c695d426c1", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8493d45c656c5427d9c729235b99d498bd133421f3e0a683e5c1b561471291e5"}, + "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, @@ -23,6 +25,7 @@ "plug_cowboy": {:hex, :plug_cowboy, "2.7.4", "729c752d17cf364e2b8da5bdb34fb5804f56251e88bb602aff48ae0bd8673d11", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "9b85632bd7012615bae0a5d70084deb1b25d2bcbb32cab82d1e9a1e023168aa3"}, "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, "ranch": {:hex, :ranch, "2.2.0", "25528f82bc8d7c6152c57666ca99ec716510fe0925cb188172f41ce93117b1b0", [:make, :rebar3], [], "hexpm", "fa0b99a1780c80218a4197a59ea8d3bdae32fbff7e88527d7d8a4787eff4f8e7"}, + "statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, "thousand_island": {:hex, :thousand_island, "1.3.14", "ad45ebed2577b5437582bcc79c5eccd1e2a8c326abf6a3464ab6c06e2055a34a", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d0d24a929d31cdd1d7903a4fe7f2409afeedff092d277be604966cd6aa4307ef"}, diff --git a/test/support/custom_storage.ex b/test/support/custom_storage.ex index 3a82d7b..8ba2485 100644 --- a/test/support/custom_storage.ex +++ b/test/support/custom_storage.ex @@ -13,6 +13,9 @@ defmodule CustomStorage do alias Telemetry.Metrics alias Peep.Storage + @impl true + def resolve(agents), do: agents + @impl true @spec new(non_neg_integer) :: tuple def new(n_agents) do