Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,17 @@ shackle_pool:start(shackle_pool:name(), client(), client_options(), pool_options
```

## Telemetry
Shackle integrates with the backend-agnostic [telemetry](https://hexdocs.pm/telemetry/) library. See `shackle_telemetry` for the list of telemetry events that shackle can emit.

It is straightforward to integrate Shackle with the backend-agnostic [telemetry](https://hexdocs.pm/telemetry/) library.
Because the calling convention in `shackle_hooks` closely resembles `telemetry:execute`, it is possible configure Shackle like so:

```
{shackle, [
{hooks, [
{events, {telemetry, execute}}
]}
]}
```

## Tests

Expand Down
2 changes: 1 addition & 1 deletion include/shackle.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pid :: undefined | pid(),
request_id :: shackle:request_id(),
timeout :: timeout(),
timestamp :: erlang:timestamp()
timestamp :: integer()
}).

-record(reconnect_state, {
Expand Down
3 changes: 1 addition & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
{deps, [
{foil, "0.1.3"},
{granderl, {git, "https://github.com/tokenrove/granderl.git", {ref, "baafd1bc825cb1fc022760eae913f774fa6af91b"}}},
{metal, "0.1.1"},
{telemetry, "1.2.1"}
{metal, "0.1.1"}
]}.

{dialyzer, [{plt_extra_apps, [public_key]}]}.
Expand Down
3 changes: 1 addition & 2 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
{git,"https://github.com/tokenrove/granderl.git",
{ref,"baafd1bc825cb1fc022760eae913f774fa6af91b"}},
0},
{<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0},
{<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.2.1">>},0}]}.
{<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}]}.
[
{pkg_hash,[
{<<"foil">>, <<"415835CA94A8D0A55AB3D334FE2D1A1DCF36E7A0F69789050765770B6BF5E6E9">>},
Expand Down
2 changes: 1 addition & 1 deletion src/shackle.app.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{application, shackle, [
{applications, [kernel, stdlib, granderl, metal, foil, ssl, telemetry]},
{applications, [kernel, stdlib, granderl, metal, foil, ssl]},
{description, "High-Performance Erlang Network Client Framework"},
{env, []},
{licenses, ["MIT"]},
Expand Down
2 changes: 1 addition & 1 deletion src/shackle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ cast(PoolName, Request, Pid) ->
{ok, request_id()} | {error, atom()}.

cast(PoolName, Request, Pid, Timeout) ->
Timestamp = os:timestamp(),
Timestamp = erlang:monotonic_time(),
Ref = make_ref(),
case shackle_pool:server(PoolName) of
{ok, Client, Server} ->
Expand Down
148 changes: 148 additions & 0 deletions src/shackle_events.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
-module(shackle_events).

-compile(inline).
-compile({inline_size, 512}).

-export([
backlog_full/1,
connected/3,
connection_error/3,
disabled/1,
found/1,
handle_timeout/1,
no_server/1,
not_found/1,
queued_time/2,
recv/2,
replies/1,
reply/4,
send/2,
timeout/2
]).

-define(EVENT(Block),
case shackle_hooks:handler() of
{M, F} ->
{EventName, Measurements, Metadata} = Block,
M:F(EventName, Measurements, Metadata);
_ ->
ok
end
).

-spec backlog_full(shackle:client()) -> ok.
backlog_full(Client) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client},
{[shackle, backlog_full], Measurements, Metadata}
end).

-spec connected(shackle:client(), shackle_pool:name(), integer()) -> ok.
connected(Client, PoolName, StartTime) ->
?EVENT(begin
Measurements = #{count => 1, duration => duration_since(StartTime)},
Metadata = #{client => Client, pool_name => PoolName},
{[shackle, connected], Measurements, Metadata}
end).

-spec connection_error(shackle:client(), shackle_pool:name(), any()) -> ok.
connection_error(Client, PoolName, Reason) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client, pool_name => PoolName, reason => Reason},
{[shackle, connection_error], Measurements, Metadata}
end).

-spec disabled(shackle:client()) -> ok.
disabled(Client) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client},
{[shackle, disabled], Measurements, Metadata}
end).

-spec found(shackle:client()) -> ok.
found(Client) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client},
{[shackle, found], Measurements, Metadata}
end).

-spec handle_timeout(shackle:client()) -> ok.
handle_timeout(Client) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client},
{[shackle, handle_timeout], Measurements, Metadata}
end).

-spec no_server(shackle:client()) -> ok.
no_server(Client) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client},
{[shackle, no_server], Measurements, Metadata}
end).

-spec not_found(shackle:client()) -> ok.
not_found(Client) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client},
{[shackle, not_found], Measurements, Metadata}
end).

-spec queued_time(shackle:client(), non_neg_integer()) -> ok.
queued_time(Client, StartTime) ->
?EVENT(begin
Measurements = #{duration => duration_since(StartTime)},
Metadata = #{client => Client},
{[shackle, queued_time], Measurements, Metadata}
end).

-spec recv(shackle:client(), iodata()) -> ok.
recv(Client, Data) ->
?EVENT(begin
Measurements = #{count => 1, bytes => iolist_size(Data)},
Metadata = #{client => Client},
{[shackle, recv], Measurements, Metadata}
end).

-spec replies(shackle:client()) -> ok.
replies(Client) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client},
{[shackle, replies], Measurements, Metadata}
end).

-spec reply(shackle:client(), term(), term(), integer()) -> ok.
reply(Client, Request, Response, Timestamp) ->
?EVENT(begin
Measurements = #{duration => duration_since(Timestamp)},
Metadata = #{client => Client, request => Request, response => Response},
{[shackle, reply], Measurements, Metadata}
end).

-spec send(shackle:client(), iodata()) -> ok.
send(Client, Data) ->
?EVENT(begin
Measurements = #{count => 1, bytes => iolist_size(Data)},
Metadata = #{client => Client},
{[shackle, send], Measurements, Metadata}
end).

-spec timeout(shackle:client(), term()) -> ok.
timeout(Client, Request) ->
?EVENT(begin
Measurements = #{count => 1},
Metadata = #{client => Client, request => Request},
{[shackle, timeout], Measurements, Metadata}
end).

%% private

duration_since(Timestamp) ->
erlang:monotonic_time() - Timestamp.
44 changes: 44 additions & 0 deletions src/shackle_hooks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-module(shackle_hooks).
-include("shackle_internal.hrl").

-dialyzer({nowarn_function, handler/0}).
-ignore_xref([
{shackle_hooks_foil, lookup, 1}
]).

-compile(inline).
-compile({inline_size, 512}).

-export([
init/0,
handler/0
]).

%% callbacks
-optional_callbacks([event/3]).

-type event_name() :: [atom(), ...].
-type event_measurements() :: map().
-type event_metadata() :: map().

-callback event(event_name(), event_measurements(), event_metadata()) ->
ok.

%% public
-spec init() ->
ok.

init() ->
Hooks = ?GET_ENV(hooks, []),
Events = ?LOOKUP(events, Hooks, undefined),
foil:new(?MODULE),
foil:insert(?MODULE, events, Events),
foil:load(?MODULE),
ok.

-spec handler() -> {atom(), atom()} | undefined.
handler() ->
case shackle_hooks_foil:lookup(events) of
{ok, {M, F}} -> {M, F};
_ -> undefined
end.
6 changes: 3 additions & 3 deletions src/shackle_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ server(_Name, #pool_options {
client = Client
}, 0) ->

shackle_telemetry:no_server(Client),
shackle_events:no_server(Client),
{error, no_server};
server(Name, #pool_options {
backlog_size = BacklogSize,
Expand All @@ -183,11 +183,11 @@ server(Name, #pool_options {
true ->
{ok, Client, ServerName};
false ->
shackle_telemetry:backlog_full(Client),
shackle_events:backlog_full(Client),
server(Name, Options, N - 1)
end;
false ->
shackle_telemetry:disabled(Client),
shackle_events:disabled(Client),
server(Name, Options, N - 1)
end.

Expand Down
18 changes: 9 additions & 9 deletions src/shackle_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

%% internal
-export([
add/5,
add/6,
clear/2,
delete/1,
new/1,
Expand All @@ -14,24 +14,24 @@
]).

%% internal
-spec add(shackle:table(), shackle_server:id(), shackle:external_request_id(), shackle:cast(), reference()) ->
-spec add(shackle:table(), shackle_server:id(), shackle:external_request_id(), term(), shackle:cast(), reference()) ->
ok.

add(Table, ServerId, ExtRequestId, Cast, TimerRef) ->
Object = {{ServerId, ExtRequestId}, {Cast, TimerRef}},
add(Table, ServerId, ExtRequestId, Request, Cast, TimerRef) ->
Object = {{ServerId, ExtRequestId}, {Cast, Request, TimerRef}},
ets:insert(Table, Object),
ok.

-spec clear(shackle:table(), shackle_server:id()) ->
[{shackle:cast(), reference()}].
[{shackle:cast(), term(), reference()}].

clear(Table, ServerId) ->
Match = {{ServerId, '_'}, '_'},
case ets_match_take(Table, Match) of
[] ->
[];
Objects ->
[{Cast, TimerRef} || {_, {Cast, TimerRef}} <- Objects]
[{Cast, Request, TimerRef} || {_, {Cast, Request, TimerRef}} <- Objects]
end.

-spec delete(shackle_pool:name()) ->
Expand All @@ -50,14 +50,14 @@ new(PoolName) ->
ok.

-spec remove(shackle:table(), shackle_server:id(), shackle:external_request_id()) ->
{ok, shackle:cast(), reference()} | {error, not_found}.
{ok, shackle:cast(), term(), reference()} | {error, not_found}.

remove(Table, ServerId, ExtRequestId) ->
case ets_take(Table, {ServerId, ExtRequestId}) of
[] ->
{error, not_found};
[{_, {Cast, TimerRef}}] ->
{ok, Cast, TimerRef}
[{_, {Cast, Request, TimerRef}}] ->
{ok, Cast, Request, TimerRef}
end.

%% private
Expand Down
Loading