Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions bench/event_handler_bench.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Telemetry.Metrics

metrics = [
counter("bench.event.count", tags: [:tag_a]),
counter("bench.event.count2", tags: [:tag_a, :tag_b]),
sum("bench.event.payload_size", tags: [:tag_a]),
last_value("bench.event.queue_depth", tags: [:tag_a]),
distribution("bench.event.duration",
tags: [:tag_a],
reporter_options: [max_value: 1_000_000, bucket_variability: 0.3]
),
distribution("bench.event.latency",
tags: [:tag_a, :tag_b],
reporter_options: [max_value: 65536, bucket_variability: 0.3]
)
]

{:ok, _pid} = Peep.start_link(name: :bench, metrics: metrics, storage: :striped)

measurements = %{
count: 1,
payload_size: 512,
queue_depth: 42,
duration: 1500,
latency: 350
}

metadata = %{tag_a: "exchange_1", tag_b: "channel_2"}
event = [:bench, :event]

parallel = String.to_integer(System.get_env("BENCH_PARALLEL", "1"))
IO.puts("Running with parallel: #{parallel}")

Benchee.run(
%{
"telemetry_execute" => fn ->
:telemetry.execute(event, measurements, metadata)
end
},
warmup: 2,
time: 5,
memory_time: 2,
parallel: parallel
)

GenServer.stop(:bench)
78 changes: 11 additions & 67 deletions lib/peep.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ defmodule Peep do
statsd_state: nil
end

@type metric_id() :: pos_integer()
@type metric_id() :: non_neg_integer()

def child_spec(options) do
%{id: peep_name!(options), start: {__MODULE__, :start_link, [options]}}
Expand All @@ -101,21 +101,6 @@ defmodule Peep do
end
end

def insert_metric(name, metric, value, tags) when is_number(value) do
case Peep.Persistent.fetch(name) do
Peep.Persistent.persistent(
storage: {storage_mod, storage},
metrics_to_ids: %{^metric => id}
) ->
storage_mod.insert_metric(storage, id, metric, value, tags)

_ ->
nil
end
end

def insert_metric(_name, _metric, _value, _tags), do: nil

@doc """
Returns measurements about the size of a running Peep's storage, in number of
ETS elements and in bytes of memory.
Expand Down Expand Up @@ -154,9 +139,10 @@ defmodule Peep do

defp extend_with(metrics, global_tags) do
Map.new(metrics, fn {metric, measurements} ->
updated = Map.new(measurements, fn {tags, val} ->
{Map.merge(global_tags, tags), val}
end)
updated =
Map.new(measurements, fn {tags, val} ->
{Map.merge(global_tags, tags), val}
end)

{metric, updated}
end)
Expand Down Expand Up @@ -209,66 +195,24 @@ defmodule Peep do
true
end

def assign_metric_ids(metrics) do
filtered_metrics = Enum.filter(metrics, &allow_metric?/1)

assign_metric_ids(
Enum.reverse(filtered_metrics),
%{},
%{},
%{},
length(filtered_metrics)
)
end

defp assign_metric_ids([], events_to_metrics, ids_to_metrics, metrics_to_ids, _counter) do
%{
events_to_metrics: events_to_metrics,
ids_to_metrics: ids_to_metrics,
metrics_to_ids: metrics_to_ids
}
end

defp assign_metric_ids([metric | rest], etm, itm, mti, counter) do
%{event_name: event_name} = metric

etm =
case etm do
%{^event_name => metrics} ->
%{etm | event_name => [{metric, counter} | metrics]}

_ ->
Map.put(etm, event_name, [{metric, counter}])
end

itm = Map.put(itm, counter, metric)
mti = Map.put(mti, metric, counter)

assign_metric_ids(rest, etm, itm, mti, counter - 1)
end

# callbacks

@impl true
def init(options) do
Process.flag(:trap_exit, true)
name = options.name

:ok =
Peep.Persistent.new(options)
|> Peep.Persistent.store()
peep_persistent = Peep.Persistent.new(options)
:ok = Peep.Persistent.store(peep_persistent)

handler_ids = EventHandler.attach(name)
handler_ids = EventHandler.attach(peep_persistent)
:telemetry.persist()

statsd_opts = options.statsd
statsd_flush_interval = statsd_opts[:flush_interval_ms]

if statsd_flush_interval != nil do
set_statsd_timer(statsd_flush_interval)
end

statsd_state =
if options.statsd do
if statsd_opts do
set_statsd_timer(statsd_opts.flush_interval_ms)
Statsd.make_state(statsd_opts)
else
nil
Expand Down
159 changes: 121 additions & 38 deletions lib/peep/event_handler.ex
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
defmodule Peep.EventHandler do
@moduledoc false

@compile {:inline, keep?: 3, meta: 3, fetch_measurement: 3}
@compile {:inline, keep?: 3, fetch_measurement: 3}

import Peep.Persistent, only: [persistent: 1]

def attach(name) do
persistent(events_to_metrics: metrics_by_event) = Peep.Persistent.fetch(name)
require Peep.Handler.Config
require Peep.Handler.Metric

for {event_name, _metrics} <- metrics_by_event do
alias Peep.Handler.Config
alias Peep.Handler.Metric

def attach(
persistent(
name: name,
events_to_metrics: metrics_by_event,
storage: {storage_mod, storage}
)
) do
for {event_name, metrics} <- metrics_by_event do
handler_id = handler_id(event_name, name)

:ok =
:telemetry.attach(
handler_id,
event_name,
&__MODULE__.handle_event/4,
name
Config.new(metrics, storage_mod, storage)
)

handler_id
Expand All @@ -32,43 +42,125 @@ defmodule Peep.EventHandler do
{__MODULE__, peep_name, event_name}
end

def handle_event(event, measurements, metadata, name) do
persistent(
events_to_metrics: %{^event => metrics},
storage: {storage_mod, storage}
) = Peep.Persistent.fetch(name)
def handle_event(
_event,
measurements,
metadata,
Config.handler_config(
metrics: metrics,
storage_mod: storage_mod,
storage: storage,
tag_fns: tag_fns
)
) do
resolved = storage_mod.resolve(storage)
tag_results = compute_tags(tag_fns, metadata, tuple_size(tag_fns) - 1, [])
store_metrics(metrics, measurements, metadata, resolved, tag_results)
end

defp compute_tags(_tag_fns, _metadata, -1, acc) do
List.to_tuple(acc)
end

defp compute_tags(tag_fns, metadata, idx, acc) do
compute_tags(tag_fns, metadata, idx - 1, [elem(tag_fns, idx).(metadata) | acc])
end

defp store_metrics([], _measurements, _metadata, _data, _tag_results), do: :ok

defp store_metrics(
[
Metric.handler_metric(
type: :counter,
insert_fn: insert_fn,
keep: :no_keep,
tag_idx: tag_idx
)
| rest
],
measurements,
metadata,
data,
tag_results
) do
insert_fn.(data, 1, elem(tag_results, tag_idx))
store_metrics(rest, measurements, metadata, data, tag_results)
end

store_metrics(metrics, measurements, metadata, storage_mod, storage)
defp store_metrics(
[
Metric.handler_metric(
insert_fn: insert_fn,
keep: :no_keep,
measurement: measurement,
tag_idx: tag_idx
)
| rest
],
measurements,
metadata,
data,
tag_results
) do
case fetch_measurement(measurement, measurements, metadata) do
value when is_number(value) ->
insert_fn.(data, value, elem(tag_results, tag_idx))

_ ->
nil
end

store_metrics(rest, measurements, metadata, data, tag_results)
end

defp store_metrics([], _measurements, _metadata, _mod, _data), do: :ok
defp store_metrics(
[
Metric.handler_metric(
type: :counter,
insert_fn: insert_fn,
keep: keep,
tag_idx: tag_idx
)
| rest
],
measurements,
metadata,
data,
tag_results
) do
if keep?(keep, metadata, nil) do
insert_fn.(data, 1, elem(tag_results, tag_idx))
end

defp store_metrics([{metric, id} | rest], measurements, metadata, mod, data) do
%{
measurement: measurement,
tag_values: tag_values,
tags: tags,
keep: keep
} = metric
store_metrics(rest, measurements, metadata, data, tag_results)
end

if keep?(keep, metadata, measurement) do
# credo:disable-for-next-line Credo.Check.Refactor.Nesting
defp store_metrics(
[
Metric.handler_metric(
insert_fn: insert_fn,
keep: keep,
measurement: measurement,
tag_idx: tag_idx
)
| rest
],
measurements,
metadata,
data,
tag_results
) do
if keep?(keep, metadata, nil) do
case fetch_measurement(measurement, measurements, metadata) do
value when is_number(value) ->
mod.insert_metric(
data,
id,
metric,
value,
meta(metadata, tag_values, tags)
)
insert_fn.(data, value, elem(tag_results, tag_idx))

_ ->
nil
end
end

store_metrics(rest, measurements, metadata, mod, data)
store_metrics(rest, measurements, metadata, data, tag_results)
end

defp keep?(keep, metadata, measurement) when is_function(keep, 2),
Expand All @@ -77,15 +169,6 @@ defmodule Peep.EventHandler do
defp keep?(keep, metadata, _measurement) when is_function(keep, 1), do: keep.(metadata)
defp keep?(_keep, _metadata, _measurement), do: true

# When selected list is empty, just return empty map
defp meta(_tags, _map, []), do: %{}
defp meta(meta, _map, tags) when is_function(tags, 1), do: tags.(meta)
defp meta(tags, map, keys), do: Map.take(map.(tags), keys)

defp fetch_measurement(%Telemetry.Metrics.Counter{}, _measurements, _metadata) do
1
end

defp fetch_measurement(measurement, measurements, metadata) do
case measurement do
nil ->
Expand Down
Loading
Loading