diff --git a/README.md b/README.md index ffde1d4..897b5c2 100644 --- a/README.md +++ b/README.md @@ -213,7 +213,17 @@ shackle_pool:start(shackle_pool:name(), client(), client_options(), pool_options ``` ## Telemetry -Shackle integrates with the backend-agnostic [telemetry](https://hexdocs.pm/telemetry/) library. See `shackle_telemetry` for the list of telemetry events that shackle can emit. + +It is straightforward to integrate Shackle with the backend-agnostic [telemetry](https://hexdocs.pm/telemetry/) library. +Because the calling convention in `shackle_hooks` closely resembles `telemetry:execute`, it is possible configure Shackle like so: + +``` +{shackle, [ + {hooks, [ + {events, {telemetry, execute}} + ]} +]} +``` ## Tests diff --git a/include/shackle.hrl b/include/shackle.hrl index 0c16205..2f8a14f 100644 --- a/include/shackle.hrl +++ b/include/shackle.hrl @@ -4,7 +4,7 @@ pid :: undefined | pid(), request_id :: shackle:request_id(), timeout :: timeout(), - timestamp :: erlang:timestamp() + timestamp :: integer() }). -record(reconnect_state, { diff --git a/rebar.config b/rebar.config index 5527149..24d3d27 100644 --- a/rebar.config +++ b/rebar.config @@ -7,8 +7,7 @@ {deps, [ {foil, "0.1.3"}, {granderl, {git, "https://github.com/tokenrove/granderl.git", {ref, "baafd1bc825cb1fc022760eae913f774fa6af91b"}}}, - {metal, "0.1.1"}, - {telemetry, "1.2.1"} + {metal, "0.1.1"} ]}. {dialyzer, [{plt_extra_apps, [public_key]}]}. diff --git a/rebar.lock b/rebar.lock index a106a08..560f745 100644 --- a/rebar.lock +++ b/rebar.lock @@ -4,8 +4,7 @@ {git,"https://github.com/tokenrove/granderl.git", {ref,"baafd1bc825cb1fc022760eae913f774fa6af91b"}}, 0}, - {<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}, - {<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.2.1">>},0}]}. + {<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}]}. [ {pkg_hash,[ {<<"foil">>, <<"415835CA94A8D0A55AB3D334FE2D1A1DCF36E7A0F69789050765770B6BF5E6E9">>}, diff --git a/src/shackle.app.src b/src/shackle.app.src index 3eaf998..d34d446 100644 --- a/src/shackle.app.src +++ b/src/shackle.app.src @@ -1,5 +1,5 @@ {application, shackle, [ - {applications, [kernel, stdlib, granderl, metal, foil, ssl, telemetry]}, + {applications, [kernel, stdlib, granderl, metal, foil, ssl]}, {description, "High-Performance Erlang Network Client Framework"}, {env, []}, {licenses, ["MIT"]}, diff --git a/src/shackle.erl b/src/shackle.erl index 278d536..5a37819 100644 --- a/src/shackle.erl +++ b/src/shackle.erl @@ -78,7 +78,7 @@ cast(PoolName, Request, Pid) -> {ok, request_id()} | {error, atom()}. cast(PoolName, Request, Pid, Timeout) -> - Timestamp = os:timestamp(), + Timestamp = erlang:monotonic_time(), Ref = make_ref(), case shackle_pool:server(PoolName) of {ok, Client, Server} -> diff --git a/src/shackle_events.erl b/src/shackle_events.erl new file mode 100644 index 0000000..77af75e --- /dev/null +++ b/src/shackle_events.erl @@ -0,0 +1,148 @@ +-module(shackle_events). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + backlog_full/1, + connected/3, + connection_error/3, + disabled/1, + found/1, + handle_timeout/1, + no_server/1, + not_found/1, + queued_time/2, + recv/2, + replies/1, + reply/4, + send/2, + timeout/2 +]). + +-define(EVENT(Block), + case shackle_hooks:handler() of + {M, F} -> + {EventName, Measurements, Metadata} = Block, + M:F(EventName, Measurements, Metadata); + _ -> + ok + end +). + +-spec backlog_full(shackle:client()) -> ok. +backlog_full(Client) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client}, + {[shackle, backlog_full], Measurements, Metadata} +end). + +-spec connected(shackle:client(), shackle_pool:name(), integer()) -> ok. +connected(Client, PoolName, StartTime) -> +?EVENT(begin + Measurements = #{count => 1, duration => duration_since(StartTime)}, + Metadata = #{client => Client, pool_name => PoolName}, + {[shackle, connected], Measurements, Metadata} +end). + +-spec connection_error(shackle:client(), shackle_pool:name(), any()) -> ok. +connection_error(Client, PoolName, Reason) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client, pool_name => PoolName, reason => Reason}, + {[shackle, connection_error], Measurements, Metadata} +end). + +-spec disabled(shackle:client()) -> ok. +disabled(Client) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client}, + {[shackle, disabled], Measurements, Metadata} +end). + +-spec found(shackle:client()) -> ok. +found(Client) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client}, + {[shackle, found], Measurements, Metadata} +end). + +-spec handle_timeout(shackle:client()) -> ok. +handle_timeout(Client) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client}, + {[shackle, handle_timeout], Measurements, Metadata} +end). + +-spec no_server(shackle:client()) -> ok. +no_server(Client) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client}, + {[shackle, no_server], Measurements, Metadata} +end). + +-spec not_found(shackle:client()) -> ok. +not_found(Client) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client}, + {[shackle, not_found], Measurements, Metadata} +end). + +-spec queued_time(shackle:client(), non_neg_integer()) -> ok. +queued_time(Client, StartTime) -> +?EVENT(begin + Measurements = #{duration => duration_since(StartTime)}, + Metadata = #{client => Client}, + {[shackle, queued_time], Measurements, Metadata} +end). + +-spec recv(shackle:client(), iodata()) -> ok. +recv(Client, Data) -> +?EVENT(begin + Measurements = #{count => 1, bytes => iolist_size(Data)}, + Metadata = #{client => Client}, + {[shackle, recv], Measurements, Metadata} +end). + +-spec replies(shackle:client()) -> ok. +replies(Client) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client}, + {[shackle, replies], Measurements, Metadata} +end). + +-spec reply(shackle:client(), term(), term(), integer()) -> ok. +reply(Client, Request, Response, Timestamp) -> +?EVENT(begin + Measurements = #{duration => duration_since(Timestamp)}, + Metadata = #{client => Client, request => Request, response => Response}, + {[shackle, reply], Measurements, Metadata} +end). + +-spec send(shackle:client(), iodata()) -> ok. +send(Client, Data) -> +?EVENT(begin + Measurements = #{count => 1, bytes => iolist_size(Data)}, + Metadata = #{client => Client}, + {[shackle, send], Measurements, Metadata} +end). + +-spec timeout(shackle:client(), term()) -> ok. +timeout(Client, Request) -> +?EVENT(begin + Measurements = #{count => 1}, + Metadata = #{client => Client, request => Request}, + {[shackle, timeout], Measurements, Metadata} +end). + +%% private + +duration_since(Timestamp) -> + erlang:monotonic_time() - Timestamp. diff --git a/src/shackle_hooks.erl b/src/shackle_hooks.erl new file mode 100644 index 0000000..b8bb892 --- /dev/null +++ b/src/shackle_hooks.erl @@ -0,0 +1,44 @@ +-module(shackle_hooks). +-include("shackle_internal.hrl"). + +-dialyzer({nowarn_function, handler/0}). +-ignore_xref([ + {shackle_hooks_foil, lookup, 1} +]). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + init/0, + handler/0 +]). + +%% callbacks +-optional_callbacks([event/3]). + +-type event_name() :: [atom(), ...]. +-type event_measurements() :: map(). +-type event_metadata() :: map(). + +-callback event(event_name(), event_measurements(), event_metadata()) -> + ok. + +%% public +-spec init() -> + ok. + +init() -> + Hooks = ?GET_ENV(hooks, []), + Events = ?LOOKUP(events, Hooks, undefined), + foil:new(?MODULE), + foil:insert(?MODULE, events, Events), + foil:load(?MODULE), + ok. + +-spec handler() -> {atom(), atom()} | undefined. +handler() -> + case shackle_hooks_foil:lookup(events) of + {ok, {M, F}} -> {M, F}; + _ -> undefined + end. diff --git a/src/shackle_pool.erl b/src/shackle_pool.erl index c4c864b..6ec3f70 100644 --- a/src/shackle_pool.erl +++ b/src/shackle_pool.erl @@ -165,7 +165,7 @@ server(_Name, #pool_options { client = Client }, 0) -> - shackle_telemetry:no_server(Client), + shackle_events:no_server(Client), {error, no_server}; server(Name, #pool_options { backlog_size = BacklogSize, @@ -183,11 +183,11 @@ server(Name, #pool_options { true -> {ok, Client, ServerName}; false -> - shackle_telemetry:backlog_full(Client), + shackle_events:backlog_full(Client), server(Name, Options, N - 1) end; false -> - shackle_telemetry:disabled(Client), + shackle_events:disabled(Client), server(Name, Options, N - 1) end. diff --git a/src/shackle_queue.erl b/src/shackle_queue.erl index 32038ac..c5b1276 100644 --- a/src/shackle_queue.erl +++ b/src/shackle_queue.erl @@ -5,7 +5,7 @@ %% internal -export([ - add/5, + add/6, clear/2, delete/1, new/1, @@ -14,16 +14,16 @@ ]). %% internal --spec add(shackle:table(), shackle_server:id(), shackle:external_request_id(), shackle:cast(), reference()) -> +-spec add(shackle:table(), shackle_server:id(), shackle:external_request_id(), term(), shackle:cast(), reference()) -> ok. -add(Table, ServerId, ExtRequestId, Cast, TimerRef) -> - Object = {{ServerId, ExtRequestId}, {Cast, TimerRef}}, +add(Table, ServerId, ExtRequestId, Request, Cast, TimerRef) -> + Object = {{ServerId, ExtRequestId}, {Cast, Request, TimerRef}}, ets:insert(Table, Object), ok. -spec clear(shackle:table(), shackle_server:id()) -> - [{shackle:cast(), reference()}]. + [{shackle:cast(), term(), reference()}]. clear(Table, ServerId) -> Match = {{ServerId, '_'}, '_'}, @@ -31,7 +31,7 @@ clear(Table, ServerId) -> [] -> []; Objects -> - [{Cast, TimerRef} || {_, {Cast, TimerRef}} <- Objects] + [{Cast, Request, TimerRef} || {_, {Cast, Request, TimerRef}} <- Objects] end. -spec delete(shackle_pool:name()) -> @@ -50,14 +50,14 @@ new(PoolName) -> ok. -spec remove(shackle:table(), shackle_server:id(), shackle:external_request_id()) -> - {ok, shackle:cast(), reference()} | {error, not_found}. + {ok, shackle:cast(), term(), reference()} | {error, not_found}. remove(Table, ServerId, ExtRequestId) -> case ets_take(Table, {ServerId, ExtRequestId}) of [] -> {error, not_found}; - [{_, {Cast, TimerRef}}] -> - {ok, Cast, TimerRef} + [{_, {Cast, Request, TimerRef}}] -> + {ok, Cast, Request, TimerRef} end. %% private diff --git a/src/shackle_server.erl b/src/shackle_server.erl index cab2ef3..f06c1e6 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -94,14 +94,16 @@ init(Name, Parent, Opts) -> -spec handle_msg(term(), {state(), client_state()}) -> {ok, term()}. -handle_msg({_, #cast {} = Cast}, {#state { - socket = undefined +handle_msg({_, #cast {timestamp = Timestamp} = Cast}, {#state { + socket = undefined, + client = Client } = State, ClientState}) -> - + shackle_events:queued_time(Client, Timestamp), reply({error, no_socket}, Cast, State), {ok, {State, ClientState}}; handle_msg({Request, #cast { - timeout = Timeout + timeout = Timeout, + timestamp = Timestamp } = Cast}, {#state { client = Client, id = Id, @@ -110,19 +112,19 @@ handle_msg({Request, #cast { queue = Queue, socket = Socket } = State, ClientState}) -> - + shackle_events:queued_time(Client, Timestamp), try Client:handle_request(Request, ClientState) of {ok, ExtRequestId, Data, ClientState2} -> case Protocol:send(Socket, Data) of ok -> - shackle_telemetry:send(Client, iolist_size(Data)), + shackle_events:send(Client, Data), case ExtRequestId of undefined -> reply(ok, Cast, State); _ -> Msg = {timeout, ExtRequestId}, TimerRef = erlang:send_after(Timeout, self(), Msg), - shackle_queue:add(Queue, Id, ExtRequestId, Cast, + shackle_queue:add(Queue, Id, ExtRequestId, Request, Cast, TimerRef) end, {ok, {State, ClientState2}}; @@ -167,7 +169,7 @@ handle_msg(?MSG_CONNECT, {#state { socket_options = SocketOptions } = State, ClientState}) -> - case connect(Protocol, Address, Port, SocketOptions, PoolName) of + case connect(Client, Protocol, Address, Port, SocketOptions, PoolName) of {ok, Socket} -> case client(Client, PoolName, Init, Protocol, Socket) of {ok, ClientState2} -> @@ -198,7 +200,7 @@ handle_msg({timeout, ExtRequestId}, {#state { true -> try Client:handle_timeout(ExtRequestId, ClientState) of {ok, Reply, ClientState2} -> - shackle_telemetry:handle_timeout(Client), + shackle_events:handle_timeout(Client), process_responses([Reply], State), {ok, {State, ClientState2}}; {error, Reason, ClientState2} -> @@ -214,8 +216,8 @@ handle_msg({timeout, ExtRequestId}, {#state { end; false -> case shackle_queue:remove(Queue, Id, ExtRequestId) of - {ok, Cast, _TimerRef} -> - shackle_telemetry:timeout(Client), + {ok, Cast, Request, _TimerRef} -> + shackle_events:timeout(Client, Request), reply({error, timeout}, Cast, State); {error, not_found} -> ok @@ -306,15 +308,17 @@ close(#state {id = Id} = State, ClientState) -> reply_all({error, socket_closed}, State), reconnect(State, ClientState). -connect(Protocol, Address, Port, SocketOptions, PoolName) -> +connect(Client, Protocol, Address, Port, SocketOptions, PoolName) -> + StartTime = erlang:monotonic_time(), case inet:getaddrs(Address, inet) of {ok, Ips} -> Ip = shackle_utils:random_element(Ips), case Protocol:connect(Ip, Port, SocketOptions) of {ok, Socket} -> + shackle_events:connected(Client, PoolName, StartTime), {ok, Socket}; {error, Reason} -> - ?WARN(PoolName, "connect error: ~p", [Reason]), + shackle_events:connection_error(Client, PoolName, Reason), {error, Reason} end; {error, Reason} -> @@ -339,7 +343,7 @@ handle_msg_data(Socket, Data, #state { socket = Socket } = State, ClientState) -> - shackle_telemetry:recv(Client, size(Data)), + shackle_events:recv(Client, Data), try Client:handle_data(Data, ClientState) of {ok, Replies, ClientState2} -> process_responses(Replies, State), @@ -359,12 +363,12 @@ handle_msg_data(_Socket, _Data, State, ClientState) -> {ok, {State, ClientState}}. handle_msg_error(Socket, Reason, #state { + client = Client, socket = Socket, pool_name = PoolName, protocol = Protocol } = State, ClientState) -> - - ?WARN(PoolName, "connection error: ~p", [Reason]), + shackle_events:connection_error(Client, PoolName, Reason), Protocol:close(Socket), close(State, ClientState); handle_msg_error(_Socket, _Reason, State, ClientState) -> @@ -378,16 +382,15 @@ process_responses([{ExtRequestId, Reply} | T], #state { queue = Queue } = State) -> - shackle_telemetry:replies(Client), + shackle_events:replies(Client), case shackle_queue:remove(Queue, Id, ExtRequestId) of - {ok, #cast {timestamp = Timestamp} = Cast, TimerRef} -> - shackle_telemetry:found(Client), - Diff = timer:now_diff(os:timestamp(), Timestamp), - shackle_telemetry:reply(Client, Diff), + {ok, #cast {timestamp = Timestamp} = Cast, Request, TimerRef} -> + shackle_events:found(Client), + shackle_events:reply(Client, Request, Reply, Timestamp), erlang:cancel_timer(TimerRef), reply(Reply, Cast, State); {error, not_found} -> - shackle_telemetry:not_found(Client), + shackle_events:not_found(Client), ok end, process_responses(T, State). @@ -478,7 +481,7 @@ reply_all(Reply, #state { reply_all(_Reply, [], _State) -> ok; -reply_all(Reply, [{Cast, TimerRef} | T], State) -> +reply_all(Reply, [{Cast, _Request, TimerRef} | T], State) -> erlang:cancel_timer(TimerRef), reply(Reply, Cast, State), reply_all(Reply, T, State). diff --git a/src/shackle_sup.erl b/src/shackle_sup.erl index ab75057..ac20384 100644 --- a/src/shackle_sup.erl +++ b/src/shackle_sup.erl @@ -23,6 +23,7 @@ start_link() -> {ok, {{one_for_one, 5, 10}, [supervisor:child_spec()]}}. init([]) -> + shackle_hooks:init(), shackle_pool:init(), shackle_status:init(), diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl deleted file mode 100644 index eee1ffc..0000000 --- a/src/shackle_telemetry.erl +++ /dev/null @@ -1,84 +0,0 @@ --module(shackle_telemetry). - --compile(inline). --compile({inline_size, 512}). - --export([ - backlog_full/1, - disabled/1, - found/1, - handle_timeout/1, - no_server/1, - not_found/1, - recv/2, - replies/1, - reply/2, - send/2, - timeout/1 -]). - --spec backlog_full(shackle:client()) -> ok. -backlog_full(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, backlog_full], Measurements, Metadata). - --spec disabled(shackle:client()) -> ok. -disabled(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, disabled], Measurements, Metadata). - --spec found(shackle:client()) -> ok. -found(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, found], Measurements, Metadata). - --spec handle_timeout(shackle:client()) -> ok. -handle_timeout(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, handle_timeout], Measurements, Metadata). - --spec no_server(shackle:client()) -> ok. -no_server(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, no_server], Measurements, Metadata). - --spec not_found(shackle:client()) -> ok. -not_found(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, not_found], Measurements, Metadata). - --spec recv(shackle:client(), non_neg_integer()) -> ok. -recv(Client, NBytes) -> - Measurements = #{count => 1, bytes => NBytes}, - Metadata = #{client => Client}, - telemetry:execute([shackle, recv], Measurements, Metadata). - --spec replies(shackle:client()) -> ok. -replies(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, replies], Measurements, Metadata). - --spec reply(shackle:client(), non_neg_integer()) -> ok. -reply(Client, Microseconds) -> - Measurements = #{duration => Microseconds}, - Metadata = #{client => Client}, - telemetry:execute([shackle, reply], Measurements, Metadata). - --spec send(shackle:client(), non_neg_integer()) -> ok. -send(Client, NBytes) -> - Measurements = #{count => 1, bytes => NBytes}, - Metadata = #{client => Client}, - telemetry:execute([shackle, send], Measurements, Metadata). - --spec timeout(shackle:client()) -> ok. -timeout(Client) -> - Measurements = #{count => 1}, - Metadata = #{client => Client}, - telemetry:execute([shackle, timeout], Measurements, Metadata).