Skip to content
46 changes: 46 additions & 0 deletions bench/event_handler_bench.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Telemetry.Metrics

metrics = [
counter("bench.event.count", tags: [:tag_a]),
counter("bench.event.count2", tags: [:tag_a, :tag_b]),
sum("bench.event.payload_size", tags: [:tag_a]),
last_value("bench.event.queue_depth", tags: [:tag_a]),
distribution("bench.event.duration",
tags: [:tag_a],
reporter_options: [max_value: 1_000_000, bucket_variability: 0.3]
),
distribution("bench.event.latency",
tags: [:tag_a, :tag_b],
reporter_options: [max_value: 65536, bucket_variability: 0.3]
)
]

{:ok, _pid} = Peep.start_link(name: :bench, metrics: metrics, storage: :striped)

measurements = %{
count: 1,
payload_size: 512,
queue_depth: 42,
duration: 1500,
latency: 350
}

metadata = %{tag_a: "exchange_1", tag_b: "channel_2"}
event = [:bench, :event]

parallel = String.to_integer(System.get_env("BENCH_PARALLEL", "1"))
IO.puts("Running with parallel: #{parallel}")

Benchee.run(
%{
"telemetry_execute" => fn ->
:telemetry.execute(event, measurements, metadata)
end
},
warmup: 2,
time: 5,
memory_time: 2,
parallel: parallel
)

GenServer.stop(:bench)
11 changes: 7 additions & 4 deletions lib/peep.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ defmodule Peep do
defstruct name: nil,
interval: nil,
handler_ids: nil,
event_keys: nil,
statsd_opts: nil,
statsd_state: nil
end
Expand All @@ -107,7 +108,8 @@ defmodule Peep do
storage: {storage_mod, storage},
metrics_to_ids: %{^metric => id}
) ->
storage_mod.insert_metric(storage, id, metric, value, tags)
resolved = storage_mod.resolve(storage)
storage_mod.insert_metric(resolved, id, metric, value, tags)

_ ->
nil
Expand Down Expand Up @@ -258,7 +260,7 @@ defmodule Peep do
Peep.Persistent.new(options)
|> Peep.Persistent.store()

handler_ids = EventHandler.attach(name)
{handler_ids, event_keys} = EventHandler.attach(name)

statsd_opts = options.statsd
statsd_flush_interval = statsd_opts[:flush_interval_ms]
Expand All @@ -277,6 +279,7 @@ defmodule Peep do
state = %State{
name: name,
handler_ids: handler_ids,
event_keys: event_keys,
statsd_opts: statsd_opts,
statsd_state: statsd_state
}
Expand Down Expand Up @@ -312,9 +315,9 @@ defmodule Peep do
end

@impl true
def terminate(_reason, %{name: name, handler_ids: handler_ids}) do
def terminate(_reason, %{name: name, handler_ids: handler_ids, event_keys: event_keys}) do
Peep.Persistent.erase(name)
EventHandler.detach(handler_ids)
EventHandler.detach(handler_ids, event_keys)
end

# private
Expand Down
178 changes: 133 additions & 45 deletions lib/peep/event_handler.ex
Original file line number Diff line number Diff line change
@@ -1,74 +1,171 @@
defmodule Peep.EventHandler do
@moduledoc false

@compile {:inline, keep?: 3, meta: 3, fetch_measurement: 3}
@compile {:inline, keep?: 3, fetch_measurement: 3}

import Peep.Persistent, only: [persistent: 1]

def attach(name) do
persistent(events_to_metrics: metrics_by_event) = Peep.Persistent.fetch(name)
persistent(
events_to_metrics: metrics_by_event,
storage: storage
) = Peep.Persistent.fetch(name)

for {event_name, _metrics} <- metrics_by_event do
handler_id = handler_id(event_name, name)
pairs =
for {event_name, metrics} <- metrics_by_event do
event_key = :erlang.unique_integer([:positive, :monotonic])
:persistent_term.put(event_key, {storage, metrics})
handler_id = handler_id(event_name, name)

:ok =
:telemetry.attach(
handler_id,
event_name,
&__MODULE__.handle_event/4,
event_key
)

:ok =
:telemetry.attach(
handler_id,
event_name,
&__MODULE__.handle_event/4,
name
)
{handler_id, event_key}
end

handler_id
end
Enum.unzip(pairs)
end

def detach(handler_ids) do
def detach(handler_ids, event_keys) do
for id <- handler_ids, do: :telemetry.detach(id)
for key <- event_keys, do: :persistent_term.erase(key)
:ok
end

defp handler_id(event_name, peep_name) do
{__MODULE__, peep_name, event_name}
end

def handle_event(event, measurements, metadata, name) do
persistent(
events_to_metrics: %{^event => metrics},
storage: {storage_mod, storage}
) = Peep.Persistent.fetch(name)
def precompute_metrics(metrics, {storage_mod, _}) do
{tag_map, _} =
Enum.reduce(metrics, {%{}, 0}, fn {metric, _id}, {map, next_idx} ->
key = {metric.tag_values, metric.tags}

case map do
%{^key => _} -> {map, next_idx}
_ -> {Map.put(map, key, next_idx), next_idx + 1}
end
end)

tag_fns =
tag_map
|> Enum.sort_by(fn {_key, idx} -> idx end)
|> Enum.map(fn {{tag_values, tags}, _idx} -> compile_tag_fn(tag_values, tags) end)
|> List.to_tuple()

metrics_list =
Enum.map(metrics, fn {metric, id} ->
%{
measurement: measurement,
tag_values: tag_values,
tags: tags,
keep: keep
} = metric

tag_idx = Map.fetch!(tag_map, {tag_values, tags})
keep_val = if is_nil(keep), do: :no_keep, else: keep

insert_fn = fn data, value, tags ->
storage_mod.insert_metric(data, id, metric, value, tags)
end

{metric_type(metric), insert_fn, keep_val, measurement, tag_idx}
end)

{tag_fns, metrics_list}
end

defp metric_type(%Telemetry.Metrics.Counter{}), do: :counter
defp metric_type(_), do: :other

defp compile_tag_fn(_tag_values, []), do: fn _metadata -> %{} end
defp compile_tag_fn(_tag_values, tags) when is_function(tags, 1), do: tags
defp compile_tag_fn(tag_values, keys), do: fn metadata -> Map.take(tag_values.(metadata), keys) end

def handle_event(_event, measurements, metadata, event_key) do
{{storage_mod, storage}, {tag_fns, metrics}} = :persistent_term.get(event_key)
resolved = storage_mod.resolve(storage)
tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns), 0, [])
store_metrics(metrics, measurements, metadata, resolved, tag_results)
end

defp compute_tags(_tag_fns, _metadata, size, size, acc) do
acc |> :lists.reverse() |> List.to_tuple()
end

defp compute_tags(tag_fns, metadata, size, idx, acc) do
compute_tags(tag_fns, metadata, size, idx + 1, [elem(tag_fns, idx).(metadata) | acc])
end

defp store_metrics([], _measurements, _metadata, _data, _tag_results), do: :ok

defp store_metrics(
[{:counter, insert_fn, :no_keep, _measurement, tag_idx} | rest],
measurements,
metadata,
data,
tag_results
) do
insert_fn.(data, 1, elem(tag_results, tag_idx))
store_metrics(rest, measurements, metadata, data, tag_results)
end

defp store_metrics(
[{_type, insert_fn, :no_keep, measurement, tag_idx} | rest],
measurements,
metadata,
data,
tag_results
) do
case fetch_measurement(measurement, measurements, metadata) do
value when is_number(value) ->
insert_fn.(data, value, elem(tag_results, tag_idx))

_ ->
nil
end

store_metrics(metrics, measurements, metadata, storage_mod, storage)
store_metrics(rest, measurements, metadata, data, tag_results)
end

defp store_metrics([], _measurements, _metadata, _mod, _data), do: :ok
defp store_metrics(
[{:counter, insert_fn, keep, _measurement, tag_idx} | rest],
measurements,
metadata,
data,
tag_results
) do
if keep?(keep, metadata, nil) do
insert_fn.(data, 1, elem(tag_results, tag_idx))
end

defp store_metrics([{metric, id} | rest], measurements, metadata, mod, data) do
%{
measurement: measurement,
tag_values: tag_values,
tags: tags,
keep: keep
} = metric
store_metrics(rest, measurements, metadata, data, tag_results)
end

defp store_metrics(
[{_type, insert_fn, keep, measurement, tag_idx} | rest],
measurements,
metadata,
data,
tag_results
) do
if keep?(keep, metadata, measurement) do
# credo:disable-for-next-line Credo.Check.Refactor.Nesting
case fetch_measurement(measurement, measurements, metadata) do
value when is_number(value) ->
mod.insert_metric(
data,
id,
metric,
value,
meta(metadata, tag_values, tags)
)
insert_fn.(data, value, elem(tag_results, tag_idx))

_ ->
nil
end
end

store_metrics(rest, measurements, metadata, mod, data)
store_metrics(rest, measurements, metadata, data, tag_results)
end

defp keep?(keep, metadata, measurement) when is_function(keep, 2),
Expand All @@ -77,15 +174,6 @@ defmodule Peep.EventHandler do
defp keep?(keep, metadata, _measurement) when is_function(keep, 1), do: keep.(metadata)
defp keep?(_keep, _metadata, _measurement), do: true

# When selected list is empty, just return empty map
defp meta(_tags, _map, []), do: %{}
defp meta(meta, _map, tags) when is_function(tags, 1), do: tags.(meta)
defp meta(tags, map, keys), do: Map.take(map.(tags), keys)

defp fetch_measurement(%Telemetry.Metrics.Counter{}, _measurements, _metadata) do
1
end

defp fetch_measurement(measurement, measurements, metadata) do
case measurement do
nil ->
Expand Down
7 changes: 6 additions & 1 deletion lib/peep/persistent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,15 @@ defmodule Peep.Persistent do
metrics_to_ids: metrics_to_ids
} = Peep.assign_metric_ids(metrics)

precomputed_events =
Map.new(events_to_metrics, fn {event, metric_list} ->
{event, Peep.EventHandler.precompute_metrics(metric_list, storage)}
end)

persistent(
name: name,
storage: storage,
events_to_metrics: events_to_metrics,
events_to_metrics: precomputed_events,
ids_to_metrics: ids_to_metrics,
metrics_to_ids: metrics_to_ids,
global_tags: global_tags
Expand Down
6 changes: 6 additions & 0 deletions lib/peep/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ defmodule Peep.Storage do
"""
@callback storage_size(term()) :: %{size: non_neg_integer(), memory: non_neg_integer()}

@doc """
Resolves storage to its per-scheduler form for the current scheduler.
Called once per event to avoid redundant scheduler lookups across metrics.
"""
@callback resolve(term()) :: term()

@doc """
Stores a sample metric
"""
Expand Down
3 changes: 3 additions & 0 deletions lib/peep/storage/ets.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ defmodule Peep.Storage.ETS do
:ets.new(__MODULE__, opts)
end

@impl true
def resolve(tid), do: tid

@impl true
def storage_size(tid) do
%{
Expand Down
Loading
Loading