Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions lib/peep.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,13 @@ defmodule Peep do
values are already precomputed, for example presummed socket stats.
"""
use GenServer

require Logger
alias Peep.{EventHandler, Options, Statsd}
require Peep.Persistent

alias Peep.EventHandler
alias Peep.Options
alias Peep.Statsd

defmodule State do
@moduledoc false
Expand Down Expand Up @@ -98,10 +103,10 @@ defmodule Peep do

def insert_metric(name, metric, value, tags) when is_number(value) do
case Peep.Persistent.fetch(name) do
%Peep.Persistent{
Peep.Persistent.persistent(
storage: {storage_mod, storage},
metrics_to_ids: %{^metric => id}
} ->
) ->
storage_mod.insert_metric(storage, id, metric, value, tags)

_ ->
Expand Down Expand Up @@ -136,7 +141,7 @@ defmodule Peep do
"""
def get_all_metrics(name) do
case Peep.Persistent.fetch(name) do
%Peep.Persistent{storage: {storage_mod, storage}} = p ->
Peep.Persistent.persistent(storage: {storage_mod, storage}) = p ->
storage_mod.get_all_metrics(storage, p)

_ ->
Expand All @@ -153,10 +158,10 @@ defmodule Peep do

def get_metric(name, metric, tags) do
case Peep.Persistent.fetch(name) do
%Peep.Persistent{
Peep.Persistent.persistent(
storage: {storage_mod, storage},
metrics_to_ids: %{^metric => id}
} ->
) ->
storage_mod.get_metric(storage, id, metric, tags)

_ ->
Expand Down
7 changes: 3 additions & 4 deletions lib/peep/codegen.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule Peep.Codegen do
@moduledoc false

alias Peep.Options
alias Peep.Persistent

def module(peep_name) do
:"Peep.Codegen.#{peep_name}"
Expand All @@ -18,7 +17,7 @@ defmodule Peep.Codegen do
module_ast =
quote do
defmodule unquote(module_name) do
require Persistent
import Peep.Persistent, only: [fast_fetch: 1, persistent: 1]

@compile {:inline, global_tags: 0}

Expand All @@ -43,10 +42,10 @@ defmodule Peep.Codegen do
def handle_event(event, measurements, metadata, _) do
global_tags = global_tags()

%Persistent{
persistent(
events_to_metrics: %{^event => metrics},
storage: {storage_mod, storage}
} = Persistent.fast_fetch(unquote(peep_name))
) = fast_fetch(unquote(peep_name))

:lists.foreach(
fn {metric, id} ->
Expand Down
4 changes: 3 additions & 1 deletion lib/peep/event_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ defmodule Peep.EventHandler do

@compile :inline

import Peep.Persistent, only: [persistent: 1]

def attach(name) do
%Peep.Persistent{events_to_metrics: metrics_by_event} = Peep.Persistent.fetch(name)
persistent(events_to_metrics: metrics_by_event) = Peep.Persistent.fetch(name)
module = Peep.Codegen.module(name)

for {event_name, _metrics} <- metrics_by_event do
Expand Down
45 changes: 29 additions & 16 deletions lib/peep/persistent.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
defmodule Peep.Persistent do
@moduledoc false
defstruct [:name, :storage, :events_to_metrics, :ids_to_metrics, :metrics_to_ids]

require Record

Record.defrecord(:persistent, [
:name,
:storage,
events_to_metrics: %{},
ids_to_metrics: %{},
metrics_to_ids: %{}
])

@compile {:inline, key: 1, fetch: 1}

Expand All @@ -10,16 +19,17 @@ defmodule Peep.Persistent do
@typep events_to_metrics() :: %{
:telemetry.event_name() => [{Telemetry.Metrics.t(), non_neg_integer()}]
}
@typep ids_to_metrics :: %{Peep.metric_id() => Telemetry.Metrics.t()}
@typep metrics_to_ids :: %{Telemetry.Metrics.t() => Peep.metric_id()}

@type t() :: %__MODULE__{
name: name(),
storage: storage(),
events_to_metrics: events_to_metrics(),
ids_to_metrics: ids_to_metrics(),
metrics_to_ids: metrics_to_ids()
}
@typep metrics_to_ids() :: %{Telemetry.Metrics.t() => Peep.metric_id()}

@type ids_to_metrics() :: %{Peep.metric_id() => Telemetry.Metrics.t()}
@type t() ::
record(:persistent,
name: name(),
storage: storage(),
events_to_metrics: events_to_metrics(),
ids_to_metrics: ids_to_metrics(),
metrics_to_ids: metrics_to_ids()
)

@spec new(Peep.Options.t()) :: t()
def new(%Peep.Options{} = options) do
Expand All @@ -43,18 +53,18 @@ defmodule Peep.Persistent do
metrics_to_ids: metrics_to_ids
} = Peep.assign_metric_ids(metrics)

%__MODULE__{
persistent(
name: name,
storage: storage,
events_to_metrics: events_to_metrics,
ids_to_metrics: ids_to_metrics,
metrics_to_ids: metrics_to_ids
}
)
end

@spec store(t()) :: :ok
def store(%__MODULE__{} = term) do
%__MODULE__{name: name} = term
def store(persistent() = term) do
persistent(name: name) = term
:persistent_term.put(key(name), term)
end

Expand All @@ -72,14 +82,17 @@ defmodule Peep.Persistent do
@spec storage(name()) :: {module(), term()} | nil
def storage(name) when is_atom(name) do
case fetch(name) do
%__MODULE__{storage: s} ->
persistent(storage: s) ->
s

_ ->
nil
end
end

@spec ids_to_metrics(t()) :: ids_to_metrics()
def ids_to_metrics(persistent(ids_to_metrics: itm)), do: itm

defmacro fast_fetch(name) when is_atom(name) do
quote do
:persistent_term.get(unquote(key(name)), nil)
Expand Down
30 changes: 17 additions & 13 deletions lib/peep/storage/atomics.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
defmodule Peep.Storage.Atomics do
@moduledoc false

defstruct [
require Record

Record.defrecord(:atomic, [
:num_buckets,
:buckets,
:sum,
:above_max,
:bucket_calculator
]
])

def new(%Telemetry.Metrics.Distribution{} = metric) do
{bucket_calculator, config} = Peep.Buckets.config(metric)
Expand All @@ -16,23 +18,23 @@ defmodule Peep.Storage.Atomics do
sum = :atomics.new(1, signed: true)
above_max = :atomics.new(1, signed: false)

%__MODULE__{
atomic(
num_buckets: num_buckets,
buckets: buckets,
sum: sum,
above_max: above_max,
bucket_calculator: {bucket_calculator, config}
}
)
end

def insert(
%__MODULE__{
atomic(
bucket_calculator: {module, config},
buckets: buckets,
sum: sum,
num_buckets: num_buckets,
above_max: above_max
},
),
value
) do
# :atomics indexes are 1-based.
Expand All @@ -50,13 +52,15 @@ defmodule Peep.Storage.Atomics do
:atomics.add(sum, 1, round(value))
end

def values(%__MODULE__{
bucket_calculator: {module, config},
buckets: buckets,
sum: sum,
above_max: above_max,
num_buckets: num_buckets
}) do
def values(
atomic(
bucket_calculator: {module, config},
buckets: buckets,
sum: sum,
above_max: above_max,
num_buckets: num_buckets
)
) do
map =
for idx <- 1..num_buckets, into: %{} do
{module.upper_bound(idx - 1, config), :atomics.get(buckets, idx)}
Expand Down
9 changes: 6 additions & 3 deletions lib/peep/storage/ets.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ defmodule Peep.Storage.ETS do
consider switching to `Peep.Storage.Striped`, which reduces lock contention
at the cost of higher memory usage.
"""

require Peep.Storage.Atomics

alias Peep.Storage
alias Telemetry.Metrics

Expand Down Expand Up @@ -81,9 +84,9 @@ defmodule Peep.Storage.ETS do
end

@impl true
def get_all_metrics(tid, %Peep.Persistent{ids_to_metrics: itm}) do
def get_all_metrics(tid, persistent) do
:ets.tab2list(tid)
|> group_metrics(itm, %{})
|> group_metrics(Peep.Persistent.ids_to_metrics(persistent), %{})
end

@impl true
Expand Down Expand Up @@ -153,7 +156,7 @@ defmodule Peep.Storage.ETS do
update_in(acc, [Access.key(metric, %{}), Access.key(tags, 0)], &(&1 + value))
end

defp group_metric({{id, tags}, %Storage.Atomics{} = atomics}, itm, acc) do
defp group_metric({{id, tags}, Storage.Atomics.atomic() = atomics}, itm, acc) do
%{^id => metric} = itm
put_in(acc, [Access.key(metric, %{}), Access.key(tags)], Storage.Atomics.values(atomics))
end
Expand Down
4 changes: 3 additions & 1 deletion lib/peep/storage/striped.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Peep.Storage.Striped do
Offers less lock contention than `Peep.Storage.ETS`, at the cost of higher
memory usage. Recommended when executing thousands of metrics per second.
"""

alias Telemetry.Metrics
alias Peep.Storage

Expand Down Expand Up @@ -187,7 +188,8 @@ defmodule Peep.Storage.Striped do
end

@impl true
def get_all_metrics(tids, %Peep.Persistent{ids_to_metrics: itm}) do
def get_all_metrics(tids, persistent) do
itm = Peep.Persistent.ids_to_metrics(persistent)
acc = get_all_metrics2(Tuple.to_list(tids), itm, %{})
remove_timestamps_from_last_values(acc)
end
Expand Down
8 changes: 6 additions & 2 deletions test/support/custom_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ defmodule CustomStorage do
"""
@behaviour Peep.Storage

require Peep.Persistent
require Peep.Storage.Atomics

alias Telemetry.Metrics
alias Peep.Storage


@impl true
@spec new(non_neg_integer) :: tuple
def new(n_agents) do
Expand Down Expand Up @@ -85,7 +89,7 @@ defmodule CustomStorage do
end

@impl true
def get_all_metrics(agents, %Peep.Persistent{ids_to_metrics: itm}) do
def get_all_metrics(agents, Peep.Persistent.persistent(ids_to_metrics: itm)) do
agents
|> Tuple.to_list()
|> Enum.flat_map(fn agent ->
Expand Down Expand Up @@ -205,7 +209,7 @@ defmodule CustomStorage do
group_metrics(rest, itm, acc2)
end

defp group_metric({{id, tags}, %Storage.Atomics{} = atomics}, itm, acc) do
defp group_metric({{id, tags}, Storage.Atomics.atomic() = atomics}, itm, acc) do
%{^id => metric} = itm
put_in(acc, [Access.key(metric, %{}), Access.key(tags)], Storage.Atomics.values(atomics))
end
Expand Down
Loading