From 951ed13d0a681eadd5f3a67572dd02baac43254b Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Tue, 1 Aug 2023 14:32:54 -0400 Subject: [PATCH 01/11] Add shackle_telemetry:connection_error/3 --- src/shackle_server.erl | 10 +++++----- src/shackle_telemetry.erl | 7 +++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/shackle_server.erl b/src/shackle_server.erl index cab2ef3..9fe5ded 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -167,7 +167,7 @@ handle_msg(?MSG_CONNECT, {#state { socket_options = SocketOptions } = State, ClientState}) -> - case connect(Protocol, Address, Port, SocketOptions, PoolName) of + case connect(Client, Protocol, Address, Port, SocketOptions, PoolName) of {ok, Socket} -> case client(Client, PoolName, Init, Protocol, Socket) of {ok, ClientState2} -> @@ -306,7 +306,7 @@ close(#state {id = Id} = State, ClientState) -> reply_all({error, socket_closed}, State), reconnect(State, ClientState). -connect(Protocol, Address, Port, SocketOptions, PoolName) -> +connect(Client, Protocol, Address, Port, SocketOptions, PoolName) -> case inet:getaddrs(Address, inet) of {ok, Ips} -> Ip = shackle_utils:random_element(Ips), @@ -314,7 +314,7 @@ connect(Protocol, Address, Port, SocketOptions, PoolName) -> {ok, Socket} -> {ok, Socket}; {error, Reason} -> - ?WARN(PoolName, "connect error: ~p", [Reason]), + shackle_telemetry:connection_error(Client, PoolName, Reason), {error, Reason} end; {error, Reason} -> @@ -359,12 +359,12 @@ handle_msg_data(_Socket, _Data, State, ClientState) -> {ok, {State, ClientState}}. handle_msg_error(Socket, Reason, #state { + client = Client, socket = Socket, pool_name = PoolName, protocol = Protocol } = State, ClientState) -> - - ?WARN(PoolName, "connection error: ~p", [Reason]), + shackle_telemetry:connection_error(Client, PoolName, Reason), Protocol:close(Socket), close(State, ClientState); handle_msg_error(_Socket, _Reason, State, ClientState) -> diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl index eee1ffc..227efb5 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_telemetry.erl @@ -5,6 +5,7 @@ -export([ backlog_full/1, + connection_error/3, disabled/1, found/1, handle_timeout/1, @@ -23,6 +24,12 @@ backlog_full(Client) -> Metadata = #{client => Client}, telemetry:execute([shackle, backlog_full], Measurements, Metadata). +-spec connection_error(shackle:client(), shackle_pool:name(), any()) -> ok. +connection_error(Client, PoolName, Reason) -> + Measurements = #{count => 1}, + Metadata = #{client => Client, pool_name => PoolName, reason => Reason}, + telemetry:execute([shackle, connection_error], Measurements, Metadata). + -spec disabled(shackle:client()) -> ok. disabled(Client) -> Measurements = #{count => 1}, From c6b1fd216ed777789d55817b5321ff059bb3dc7e Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Tue, 1 Aug 2023 14:44:14 -0400 Subject: [PATCH 02/11] Add shackle_telemetry:connected/2 --- src/shackle_server.erl | 3 +++ src/shackle_telemetry.erl | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/src/shackle_server.erl b/src/shackle_server.erl index 9fe5ded..3d1d332 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -307,11 +307,14 @@ close(#state {id = Id} = State, ClientState) -> reconnect(State, ClientState). connect(Client, Protocol, Address, Port, SocketOptions, PoolName) -> + StartTime = os:timestamp(), case inet:getaddrs(Address, inet) of {ok, Ips} -> Ip = shackle_utils:random_element(Ips), case Protocol:connect(Ip, Port, SocketOptions) of {ok, Socket} -> + Diff = timer:now_diff(os:timestamp(), StartTime), + shackle_telemetry:connected(Client, PoolName, Diff), {ok, Socket}; {error, Reason} -> shackle_telemetry:connection_error(Client, PoolName, Reason), diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl index 227efb5..ec811b6 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_telemetry.erl @@ -5,6 +5,7 @@ -export([ backlog_full/1, + connected/3, connection_error/3, disabled/1, found/1, @@ -24,6 +25,12 @@ backlog_full(Client) -> Metadata = #{client => Client}, telemetry:execute([shackle, backlog_full], Measurements, Metadata). +-spec connected(shackle:client(), shackle_pool:name(), non_neg_integer()) -> ok. +connected(Client, PoolName, Microseconds) -> + Measurements = #{count => 1, duration => Microseconds}, + Metadata = #{client => Client, pool_name => PoolName}, + telemetry:execute([shackle, connected], Measurements, Metadata). + -spec connection_error(shackle:client(), shackle_pool:name(), any()) -> ok. connection_error(Client, PoolName, Reason) -> Measurements = #{count => 1}, From 3d663676e4e8e87e414a07c291a37464e506623d Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Wed, 2 Aug 2023 13:43:12 -0400 Subject: [PATCH 03/11] Store shackle_client request data in shackle_queue --- src/shackle_queue.erl | 18 +++++++++--------- src/shackle_server.erl | 12 ++++++------ src/shackle_telemetry.erl | 16 ++++++++-------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/shackle_queue.erl b/src/shackle_queue.erl index 32038ac..c5b1276 100644 --- a/src/shackle_queue.erl +++ b/src/shackle_queue.erl @@ -5,7 +5,7 @@ %% internal -export([ - add/5, + add/6, clear/2, delete/1, new/1, @@ -14,16 +14,16 @@ ]). %% internal --spec add(shackle:table(), shackle_server:id(), shackle:external_request_id(), shackle:cast(), reference()) -> +-spec add(shackle:table(), shackle_server:id(), shackle:external_request_id(), term(), shackle:cast(), reference()) -> ok. -add(Table, ServerId, ExtRequestId, Cast, TimerRef) -> - Object = {{ServerId, ExtRequestId}, {Cast, TimerRef}}, +add(Table, ServerId, ExtRequestId, Request, Cast, TimerRef) -> + Object = {{ServerId, ExtRequestId}, {Cast, Request, TimerRef}}, ets:insert(Table, Object), ok. -spec clear(shackle:table(), shackle_server:id()) -> - [{shackle:cast(), reference()}]. + [{shackle:cast(), term(), reference()}]. clear(Table, ServerId) -> Match = {{ServerId, '_'}, '_'}, @@ -31,7 +31,7 @@ clear(Table, ServerId) -> [] -> []; Objects -> - [{Cast, TimerRef} || {_, {Cast, TimerRef}} <- Objects] + [{Cast, Request, TimerRef} || {_, {Cast, Request, TimerRef}} <- Objects] end. -spec delete(shackle_pool:name()) -> @@ -50,14 +50,14 @@ new(PoolName) -> ok. -spec remove(shackle:table(), shackle_server:id(), shackle:external_request_id()) -> - {ok, shackle:cast(), reference()} | {error, not_found}. + {ok, shackle:cast(), term(), reference()} | {error, not_found}. remove(Table, ServerId, ExtRequestId) -> case ets_take(Table, {ServerId, ExtRequestId}) of [] -> {error, not_found}; - [{_, {Cast, TimerRef}}] -> - {ok, Cast, TimerRef} + [{_, {Cast, Request, TimerRef}}] -> + {ok, Cast, Request, TimerRef} end. %% private diff --git a/src/shackle_server.erl b/src/shackle_server.erl index 3d1d332..f6d4098 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -122,7 +122,7 @@ handle_msg({Request, #cast { _ -> Msg = {timeout, ExtRequestId}, TimerRef = erlang:send_after(Timeout, self(), Msg), - shackle_queue:add(Queue, Id, ExtRequestId, Cast, + shackle_queue:add(Queue, Id, ExtRequestId, Request, Cast, TimerRef) end, {ok, {State, ClientState2}}; @@ -214,8 +214,8 @@ handle_msg({timeout, ExtRequestId}, {#state { end; false -> case shackle_queue:remove(Queue, Id, ExtRequestId) of - {ok, Cast, _TimerRef} -> - shackle_telemetry:timeout(Client), + {ok, Cast, Request, _TimerRef} -> + shackle_telemetry:timeout(Client, Request), reply({error, timeout}, Cast, State); {error, not_found} -> ok @@ -383,10 +383,10 @@ process_responses([{ExtRequestId, Reply} | T], #state { shackle_telemetry:replies(Client), case shackle_queue:remove(Queue, Id, ExtRequestId) of - {ok, #cast {timestamp = Timestamp} = Cast, TimerRef} -> + {ok, #cast {timestamp = Timestamp} = Cast, Request, TimerRef} -> shackle_telemetry:found(Client), Diff = timer:now_diff(os:timestamp(), Timestamp), - shackle_telemetry:reply(Client, Diff), + shackle_telemetry:reply(Client, Request, Diff), erlang:cancel_timer(TimerRef), reply(Reply, Cast, State); {error, not_found} -> @@ -481,7 +481,7 @@ reply_all(Reply, #state { reply_all(_Reply, [], _State) -> ok; -reply_all(Reply, [{Cast, TimerRef} | T], State) -> +reply_all(Reply, [{Cast, _Request, TimerRef} | T], State) -> erlang:cancel_timer(TimerRef), reply(Reply, Cast, State), reply_all(Reply, T, State). diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl index ec811b6..ae41974 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_telemetry.erl @@ -14,9 +14,9 @@ not_found/1, recv/2, replies/1, - reply/2, + reply/3, send/2, - timeout/1 + timeout/2 ]). -spec backlog_full(shackle:client()) -> ok. @@ -79,10 +79,10 @@ replies(Client) -> Metadata = #{client => Client}, telemetry:execute([shackle, replies], Measurements, Metadata). --spec reply(shackle:client(), non_neg_integer()) -> ok. -reply(Client, Microseconds) -> +-spec reply(shackle:client(), term(), non_neg_integer()) -> ok. +reply(Client, Request, Microseconds) -> Measurements = #{duration => Microseconds}, - Metadata = #{client => Client}, + Metadata = #{client => Client, request => Request}, telemetry:execute([shackle, reply], Measurements, Metadata). -spec send(shackle:client(), non_neg_integer()) -> ok. @@ -91,8 +91,8 @@ send(Client, NBytes) -> Metadata = #{client => Client}, telemetry:execute([shackle, send], Measurements, Metadata). --spec timeout(shackle:client()) -> ok. -timeout(Client) -> +-spec timeout(shackle:client(), term()) -> ok. +timeout(Client, Request) -> Measurements = #{count => 1}, - Metadata = #{client => Client}, + Metadata = #{client => Client, request => Request}, telemetry:execute([shackle, timeout], Measurements, Metadata). From c979b3d92fdd4d81affaa751c43a23cd3ae7fc1b Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Wed, 2 Aug 2023 13:58:47 -0400 Subject: [PATCH 04/11] Add shackle_client response to shackle_telemetry:reply --- src/shackle_server.erl | 2 +- src/shackle_telemetry.erl | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/shackle_server.erl b/src/shackle_server.erl index f6d4098..3b56c29 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -386,7 +386,7 @@ process_responses([{ExtRequestId, Reply} | T], #state { {ok, #cast {timestamp = Timestamp} = Cast, Request, TimerRef} -> shackle_telemetry:found(Client), Diff = timer:now_diff(os:timestamp(), Timestamp), - shackle_telemetry:reply(Client, Request, Diff), + shackle_telemetry:reply(Client, Request, Reply, Diff), erlang:cancel_timer(TimerRef), reply(Reply, Cast, State); {error, not_found} -> diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl index ae41974..0a05d39 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_telemetry.erl @@ -14,7 +14,7 @@ not_found/1, recv/2, replies/1, - reply/3, + reply/4, send/2, timeout/2 ]). @@ -79,10 +79,10 @@ replies(Client) -> Metadata = #{client => Client}, telemetry:execute([shackle, replies], Measurements, Metadata). --spec reply(shackle:client(), term(), non_neg_integer()) -> ok. -reply(Client, Request, Microseconds) -> +-spec reply(shackle:client(), term(), term(), non_neg_integer()) -> ok. +reply(Client, Request, Response, Microseconds) -> Measurements = #{duration => Microseconds}, - Metadata = #{client => Client, request => Request}, + Metadata = #{client => Client, request => Request, response => Response}, telemetry:execute([shackle, reply], Measurements, Metadata). -spec send(shackle:client(), non_neg_integer()) -> ok. From 5118a50b39f519f2206c21aa3dbaeb076b166d0c Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Wed, 2 Aug 2023 16:47:09 -0400 Subject: [PATCH 05/11] Add shackle_telemetry:queued_time/2 --- src/shackle_server.erl | 19 ++++++++++++------- src/shackle_telemetry.erl | 7 +++++++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/shackle_server.erl b/src/shackle_server.erl index 3b56c29..7c02d74 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -94,14 +94,16 @@ init(Name, Parent, Opts) -> -spec handle_msg(term(), {state(), client_state()}) -> {ok, term()}. -handle_msg({_, #cast {} = Cast}, {#state { - socket = undefined +handle_msg({_, #cast {timestamp = Timestamp} = Cast}, {#state { + socket = undefined, + client = Client } = State, ClientState}) -> - + shackle_telemetry:queued_time(Client, microseconds_since(Timestamp)), reply({error, no_socket}, Cast, State), {ok, {State, ClientState}}; handle_msg({Request, #cast { - timeout = Timeout + timeout = Timeout, + timestamp = Timestamp } = Cast}, {#state { client = Client, id = Id, @@ -110,7 +112,7 @@ handle_msg({Request, #cast { queue = Queue, socket = Socket } = State, ClientState}) -> - + shackle_telemetry:queued_time(Client, microseconds_since(Timestamp)), try Client:handle_request(Request, ClientState) of {ok, ExtRequestId, Data, ClientState2} -> case Protocol:send(Socket, Data) of @@ -313,7 +315,7 @@ connect(Client, Protocol, Address, Port, SocketOptions, PoolName) -> Ip = shackle_utils:random_element(Ips), case Protocol:connect(Ip, Port, SocketOptions) of {ok, Socket} -> - Diff = timer:now_diff(os:timestamp(), StartTime), + Diff = microseconds_since(StartTime), shackle_telemetry:connected(Client, PoolName, Diff), {ok, Socket}; {error, Reason} -> @@ -385,7 +387,7 @@ process_responses([{ExtRequestId, Reply} | T], #state { case shackle_queue:remove(Queue, Id, ExtRequestId) of {ok, #cast {timestamp = Timestamp} = Cast, Request, TimerRef} -> shackle_telemetry:found(Client), - Diff = timer:now_diff(os:timestamp(), Timestamp), + Diff = microseconds_since(Timestamp), shackle_telemetry:reply(Client, Request, Reply, Diff), erlang:cancel_timer(TimerRef), reply(Reply, Cast, State); @@ -485,3 +487,6 @@ reply_all(Reply, [{Cast, _Request, TimerRef} | T], State) -> erlang:cancel_timer(TimerRef), reply(Reply, Cast, State), reply_all(Reply, T, State). + +microseconds_since(Timestamp) -> + timer:now_diff(os:timestamp(), Timestamp). diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl index 0a05d39..a4c1390 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_telemetry.erl @@ -12,6 +12,7 @@ handle_timeout/1, no_server/1, not_found/1, + queued_time/2, recv/2, replies/1, reply/4, @@ -67,6 +68,12 @@ not_found(Client) -> Metadata = #{client => Client}, telemetry:execute([shackle, not_found], Measurements, Metadata). +-spec queued_time(shackle:client(), non_neg_integer()) -> ok. +queued_time(Client, Microseconds) -> + Measurements = #{duration => Microseconds}, + Metadata = #{client => Client}, + telemetry:execute([shackle, queued_time], Measurements, Metadata). + -spec recv(shackle:client(), non_neg_integer()) -> ok. recv(Client, NBytes) -> Measurements = #{count => 1, bytes => NBytes}, From 9d3dee8467139b423059ec5c99e80c7d31281f91 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Wed, 2 Aug 2023 17:24:08 -0400 Subject: [PATCH 06/11] Replace usage of os:timestamp/0 with erlang:monotonic_time/0 As of OTP 18, the recommended way to measure elapsed time is not to use timer:now_diff/2, but to take two timestamps with erlang:monotonic_time/{0,1} and subtract them. --- include/shackle.hrl | 2 +- src/shackle.erl | 2 +- src/shackle_server.erl | 14 +++++++------- src/shackle_telemetry.erl | 12 ++++++------ 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/include/shackle.hrl b/include/shackle.hrl index 0c16205..2f8a14f 100644 --- a/include/shackle.hrl +++ b/include/shackle.hrl @@ -4,7 +4,7 @@ pid :: undefined | pid(), request_id :: shackle:request_id(), timeout :: timeout(), - timestamp :: erlang:timestamp() + timestamp :: integer() }). -record(reconnect_state, { diff --git a/src/shackle.erl b/src/shackle.erl index 278d536..5a37819 100644 --- a/src/shackle.erl +++ b/src/shackle.erl @@ -78,7 +78,7 @@ cast(PoolName, Request, Pid) -> {ok, request_id()} | {error, atom()}. cast(PoolName, Request, Pid, Timeout) -> - Timestamp = os:timestamp(), + Timestamp = erlang:monotonic_time(), Ref = make_ref(), case shackle_pool:server(PoolName) of {ok, Client, Server} -> diff --git a/src/shackle_server.erl b/src/shackle_server.erl index 7c02d74..6e3abaf 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -98,7 +98,7 @@ handle_msg({_, #cast {timestamp = Timestamp} = Cast}, {#state { socket = undefined, client = Client } = State, ClientState}) -> - shackle_telemetry:queued_time(Client, microseconds_since(Timestamp)), + shackle_telemetry:queued_time(Client, duration_since(Timestamp)), reply({error, no_socket}, Cast, State), {ok, {State, ClientState}}; handle_msg({Request, #cast { @@ -112,7 +112,7 @@ handle_msg({Request, #cast { queue = Queue, socket = Socket } = State, ClientState}) -> - shackle_telemetry:queued_time(Client, microseconds_since(Timestamp)), + shackle_telemetry:queued_time(Client, duration_since(Timestamp)), try Client:handle_request(Request, ClientState) of {ok, ExtRequestId, Data, ClientState2} -> case Protocol:send(Socket, Data) of @@ -309,13 +309,13 @@ close(#state {id = Id} = State, ClientState) -> reconnect(State, ClientState). connect(Client, Protocol, Address, Port, SocketOptions, PoolName) -> - StartTime = os:timestamp(), + StartTime = erlang:monotonic_time(), case inet:getaddrs(Address, inet) of {ok, Ips} -> Ip = shackle_utils:random_element(Ips), case Protocol:connect(Ip, Port, SocketOptions) of {ok, Socket} -> - Diff = microseconds_since(StartTime), + Diff = duration_since(StartTime), shackle_telemetry:connected(Client, PoolName, Diff), {ok, Socket}; {error, Reason} -> @@ -387,7 +387,7 @@ process_responses([{ExtRequestId, Reply} | T], #state { case shackle_queue:remove(Queue, Id, ExtRequestId) of {ok, #cast {timestamp = Timestamp} = Cast, Request, TimerRef} -> shackle_telemetry:found(Client), - Diff = microseconds_since(Timestamp), + Diff = duration_since(Timestamp), shackle_telemetry:reply(Client, Request, Reply, Diff), erlang:cancel_timer(TimerRef), reply(Reply, Cast, State); @@ -488,5 +488,5 @@ reply_all(Reply, [{Cast, _Request, TimerRef} | T], State) -> reply(Reply, Cast, State), reply_all(Reply, T, State). -microseconds_since(Timestamp) -> - timer:now_diff(os:timestamp(), Timestamp). +duration_since(Timestamp) -> + erlang:monotonic_time() - Timestamp. diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl index a4c1390..b8d7739 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_telemetry.erl @@ -27,8 +27,8 @@ backlog_full(Client) -> telemetry:execute([shackle, backlog_full], Measurements, Metadata). -spec connected(shackle:client(), shackle_pool:name(), non_neg_integer()) -> ok. -connected(Client, PoolName, Microseconds) -> - Measurements = #{count => 1, duration => Microseconds}, +connected(Client, PoolName, Duration) -> + Measurements = #{count => 1, duration => Duration}, Metadata = #{client => Client, pool_name => PoolName}, telemetry:execute([shackle, connected], Measurements, Metadata). @@ -69,8 +69,8 @@ not_found(Client) -> telemetry:execute([shackle, not_found], Measurements, Metadata). -spec queued_time(shackle:client(), non_neg_integer()) -> ok. -queued_time(Client, Microseconds) -> - Measurements = #{duration => Microseconds}, +queued_time(Client, Duration) -> + Measurements = #{duration => Duration}, Metadata = #{client => Client}, telemetry:execute([shackle, queued_time], Measurements, Metadata). @@ -87,8 +87,8 @@ replies(Client) -> telemetry:execute([shackle, replies], Measurements, Metadata). -spec reply(shackle:client(), term(), term(), non_neg_integer()) -> ok. -reply(Client, Request, Response, Microseconds) -> - Measurements = #{duration => Microseconds}, +reply(Client, Request, Response, Duration) -> + Measurements = #{duration => Duration}, Metadata = #{client => Client, request => Request, response => Response}, telemetry:execute([shackle, reply], Measurements, Metadata). From e3d8a347ee9be986a5cc2ea5adf5807f0f7519ae Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Mon, 14 Aug 2023 13:52:27 -0400 Subject: [PATCH 07/11] Remove telemetry, re-add shackle_hooks --- rebar.config | 3 +-- rebar.lock | 3 +-- src/shackle.app.src | 2 +- src/shackle_hooks.erl | 48 +++++++++++++++++++++++++++++++++++++++ src/shackle_sup.erl | 1 + src/shackle_telemetry.erl | 28 +++++++++++------------ 6 files changed, 66 insertions(+), 19 deletions(-) create mode 100644 src/shackle_hooks.erl diff --git a/rebar.config b/rebar.config index 5527149..24d3d27 100644 --- a/rebar.config +++ b/rebar.config @@ -7,8 +7,7 @@ {deps, [ {foil, "0.1.3"}, {granderl, {git, "https://github.com/tokenrove/granderl.git", {ref, "baafd1bc825cb1fc022760eae913f774fa6af91b"}}}, - {metal, "0.1.1"}, - {telemetry, "1.2.1"} + {metal, "0.1.1"} ]}. {dialyzer, [{plt_extra_apps, [public_key]}]}. diff --git a/rebar.lock b/rebar.lock index a106a08..560f745 100644 --- a/rebar.lock +++ b/rebar.lock @@ -4,8 +4,7 @@ {git,"https://github.com/tokenrove/granderl.git", {ref,"baafd1bc825cb1fc022760eae913f774fa6af91b"}}, 0}, - {<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}, - {<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.2.1">>},0}]}. + {<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}]}. [ {pkg_hash,[ {<<"foil">>, <<"415835CA94A8D0A55AB3D334FE2D1A1DCF36E7A0F69789050765770B6BF5E6E9">>}, diff --git a/src/shackle.app.src b/src/shackle.app.src index 3eaf998..d34d446 100644 --- a/src/shackle.app.src +++ b/src/shackle.app.src @@ -1,5 +1,5 @@ {application, shackle, [ - {applications, [kernel, stdlib, granderl, metal, foil, ssl, telemetry]}, + {applications, [kernel, stdlib, granderl, metal, foil, ssl]}, {description, "High-Performance Erlang Network Client Framework"}, {env, []}, {licenses, ["MIT"]}, diff --git a/src/shackle_hooks.erl b/src/shackle_hooks.erl new file mode 100644 index 0000000..61aed6d --- /dev/null +++ b/src/shackle_hooks.erl @@ -0,0 +1,48 @@ +-module(shackle_hooks). +-include("shackle_internal.hrl"). + +-dialyzer({nowarn_function, event/3}). +-ignore_xref([ + {shackle_hooks_foil, lookup, 1} +]). + +-compile(inline). +-compile({inline_size, 512}). + +-export([ + init/0, + event/3 +]). + +%% callbacks +-optional_callbacks([event/3]). + +-type event_name() :: [atom(), ...]. +-type event_measurements() :: map(). +-type event_metadata() :: map(). + +-callback event(event_name(), event_measurements(), event_metadata()) -> + ok. + +%% public +-spec init() -> + ok. + +init() -> + Hooks = ?GET_ENV(hooks, []), + Metrics = ?LOOKUP(metrics, Hooks, undefined), + foil:new(?MODULE), + foil:insert(?MODULE, metrics, Metrics), + foil:load(?MODULE), + ok. + +-spec event(event_name(), event_measurements(), event_metadata()) -> + ok. + +event(EventName, Measurements, Metadata) -> + case shackle_hooks_foil:lookup(metrics) of + {ok, {M, F}} -> + M:F(EventName, Measurements, Metadata); + _ -> + ok + end. diff --git a/src/shackle_sup.erl b/src/shackle_sup.erl index ab75057..ac20384 100644 --- a/src/shackle_sup.erl +++ b/src/shackle_sup.erl @@ -23,6 +23,7 @@ start_link() -> {ok, {{one_for_one, 5, 10}, [supervisor:child_spec()]}}. init([]) -> + shackle_hooks:init(), shackle_pool:init(), shackle_status:init(), diff --git a/src/shackle_telemetry.erl b/src/shackle_telemetry.erl index b8d7739..1517222 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_telemetry.erl @@ -24,82 +24,82 @@ backlog_full(Client) -> Measurements = #{count => 1}, Metadata = #{client => Client}, - telemetry:execute([shackle, backlog_full], Measurements, Metadata). + shackle_hooks:event([shackle, backlog_full], Measurements, Metadata). -spec connected(shackle:client(), shackle_pool:name(), non_neg_integer()) -> ok. connected(Client, PoolName, Duration) -> Measurements = #{count => 1, duration => Duration}, Metadata = #{client => Client, pool_name => PoolName}, - telemetry:execute([shackle, connected], Measurements, Metadata). + shackle_hooks:event([shackle, connected], Measurements, Metadata). -spec connection_error(shackle:client(), shackle_pool:name(), any()) -> ok. connection_error(Client, PoolName, Reason) -> Measurements = #{count => 1}, Metadata = #{client => Client, pool_name => PoolName, reason => Reason}, - telemetry:execute([shackle, connection_error], Measurements, Metadata). + shackle_hooks:event([shackle, connection_error], Measurements, Metadata). -spec disabled(shackle:client()) -> ok. disabled(Client) -> Measurements = #{count => 1}, Metadata = #{client => Client}, - telemetry:execute([shackle, disabled], Measurements, Metadata). + shackle_hooks:event([shackle, disabled], Measurements, Metadata). -spec found(shackle:client()) -> ok. found(Client) -> Measurements = #{count => 1}, Metadata = #{client => Client}, - telemetry:execute([shackle, found], Measurements, Metadata). + shackle_hooks:event([shackle, found], Measurements, Metadata). -spec handle_timeout(shackle:client()) -> ok. handle_timeout(Client) -> Measurements = #{count => 1}, Metadata = #{client => Client}, - telemetry:execute([shackle, handle_timeout], Measurements, Metadata). + shackle_hooks:event([shackle, handle_timeout], Measurements, Metadata). -spec no_server(shackle:client()) -> ok. no_server(Client) -> Measurements = #{count => 1}, Metadata = #{client => Client}, - telemetry:execute([shackle, no_server], Measurements, Metadata). + shackle_hooks:event([shackle, no_server], Measurements, Metadata). -spec not_found(shackle:client()) -> ok. not_found(Client) -> Measurements = #{count => 1}, Metadata = #{client => Client}, - telemetry:execute([shackle, not_found], Measurements, Metadata). + shackle_hooks:event([shackle, not_found], Measurements, Metadata). -spec queued_time(shackle:client(), non_neg_integer()) -> ok. queued_time(Client, Duration) -> Measurements = #{duration => Duration}, Metadata = #{client => Client}, - telemetry:execute([shackle, queued_time], Measurements, Metadata). + shackle_hooks:event([shackle, queued_time], Measurements, Metadata). -spec recv(shackle:client(), non_neg_integer()) -> ok. recv(Client, NBytes) -> Measurements = #{count => 1, bytes => NBytes}, Metadata = #{client => Client}, - telemetry:execute([shackle, recv], Measurements, Metadata). + shackle_hooks:event([shackle, recv], Measurements, Metadata). -spec replies(shackle:client()) -> ok. replies(Client) -> Measurements = #{count => 1}, Metadata = #{client => Client}, - telemetry:execute([shackle, replies], Measurements, Metadata). + shackle_hooks:event([shackle, replies], Measurements, Metadata). -spec reply(shackle:client(), term(), term(), non_neg_integer()) -> ok. reply(Client, Request, Response, Duration) -> Measurements = #{duration => Duration}, Metadata = #{client => Client, request => Request, response => Response}, - telemetry:execute([shackle, reply], Measurements, Metadata). + shackle_hooks:event([shackle, reply], Measurements, Metadata). -spec send(shackle:client(), non_neg_integer()) -> ok. send(Client, NBytes) -> Measurements = #{count => 1, bytes => NBytes}, Metadata = #{client => Client}, - telemetry:execute([shackle, send], Measurements, Metadata). + shackle_hooks:event([shackle, send], Measurements, Metadata). -spec timeout(shackle:client(), term()) -> ok. timeout(Client, Request) -> Measurements = #{count => 1}, Metadata = #{client => Client, request => Request}, - telemetry:execute([shackle, timeout], Measurements, Metadata). + shackle_hooks:event([shackle, timeout], Measurements, Metadata). From 6090a5ed694c0d806291f8918c30bd7d7b8f6ee1 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Tue, 15 Aug 2023 10:37:36 -0400 Subject: [PATCH 08/11] Rename shackle_telemetry to shackle_events --- ...ackle_telemetry.erl => shackle_events.erl} | 2 +- src/shackle_pool.erl | 6 ++--- src/shackle_server.erl | 26 +++++++++---------- 3 files changed, 17 insertions(+), 17 deletions(-) rename src/{shackle_telemetry.erl => shackle_events.erl} (99%) diff --git a/src/shackle_telemetry.erl b/src/shackle_events.erl similarity index 99% rename from src/shackle_telemetry.erl rename to src/shackle_events.erl index 1517222..9862a73 100644 --- a/src/shackle_telemetry.erl +++ b/src/shackle_events.erl @@ -1,4 +1,4 @@ --module(shackle_telemetry). +-module(shackle_events). -compile(inline). -compile({inline_size, 512}). diff --git a/src/shackle_pool.erl b/src/shackle_pool.erl index c4c864b..6ec3f70 100644 --- a/src/shackle_pool.erl +++ b/src/shackle_pool.erl @@ -165,7 +165,7 @@ server(_Name, #pool_options { client = Client }, 0) -> - shackle_telemetry:no_server(Client), + shackle_events:no_server(Client), {error, no_server}; server(Name, #pool_options { backlog_size = BacklogSize, @@ -183,11 +183,11 @@ server(Name, #pool_options { true -> {ok, Client, ServerName}; false -> - shackle_telemetry:backlog_full(Client), + shackle_events:backlog_full(Client), server(Name, Options, N - 1) end; false -> - shackle_telemetry:disabled(Client), + shackle_events:disabled(Client), server(Name, Options, N - 1) end. diff --git a/src/shackle_server.erl b/src/shackle_server.erl index 6e3abaf..0f84762 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -98,7 +98,7 @@ handle_msg({_, #cast {timestamp = Timestamp} = Cast}, {#state { socket = undefined, client = Client } = State, ClientState}) -> - shackle_telemetry:queued_time(Client, duration_since(Timestamp)), + shackle_events:queued_time(Client, duration_since(Timestamp)), reply({error, no_socket}, Cast, State), {ok, {State, ClientState}}; handle_msg({Request, #cast { @@ -112,12 +112,12 @@ handle_msg({Request, #cast { queue = Queue, socket = Socket } = State, ClientState}) -> - shackle_telemetry:queued_time(Client, duration_since(Timestamp)), + shackle_events:queued_time(Client, duration_since(Timestamp)), try Client:handle_request(Request, ClientState) of {ok, ExtRequestId, Data, ClientState2} -> case Protocol:send(Socket, Data) of ok -> - shackle_telemetry:send(Client, iolist_size(Data)), + shackle_events:send(Client, iolist_size(Data)), case ExtRequestId of undefined -> reply(ok, Cast, State); @@ -200,7 +200,7 @@ handle_msg({timeout, ExtRequestId}, {#state { true -> try Client:handle_timeout(ExtRequestId, ClientState) of {ok, Reply, ClientState2} -> - shackle_telemetry:handle_timeout(Client), + shackle_events:handle_timeout(Client), process_responses([Reply], State), {ok, {State, ClientState2}}; {error, Reason, ClientState2} -> @@ -217,7 +217,7 @@ handle_msg({timeout, ExtRequestId}, {#state { false -> case shackle_queue:remove(Queue, Id, ExtRequestId) of {ok, Cast, Request, _TimerRef} -> - shackle_telemetry:timeout(Client, Request), + shackle_events:timeout(Client, Request), reply({error, timeout}, Cast, State); {error, not_found} -> ok @@ -316,10 +316,10 @@ connect(Client, Protocol, Address, Port, SocketOptions, PoolName) -> case Protocol:connect(Ip, Port, SocketOptions) of {ok, Socket} -> Diff = duration_since(StartTime), - shackle_telemetry:connected(Client, PoolName, Diff), + shackle_events:connected(Client, PoolName, Diff), {ok, Socket}; {error, Reason} -> - shackle_telemetry:connection_error(Client, PoolName, Reason), + shackle_events:connection_error(Client, PoolName, Reason), {error, Reason} end; {error, Reason} -> @@ -344,7 +344,7 @@ handle_msg_data(Socket, Data, #state { socket = Socket } = State, ClientState) -> - shackle_telemetry:recv(Client, size(Data)), + shackle_events:recv(Client, size(Data)), try Client:handle_data(Data, ClientState) of {ok, Replies, ClientState2} -> process_responses(Replies, State), @@ -369,7 +369,7 @@ handle_msg_error(Socket, Reason, #state { pool_name = PoolName, protocol = Protocol } = State, ClientState) -> - shackle_telemetry:connection_error(Client, PoolName, Reason), + shackle_events:connection_error(Client, PoolName, Reason), Protocol:close(Socket), close(State, ClientState); handle_msg_error(_Socket, _Reason, State, ClientState) -> @@ -383,16 +383,16 @@ process_responses([{ExtRequestId, Reply} | T], #state { queue = Queue } = State) -> - shackle_telemetry:replies(Client), + shackle_events:replies(Client), case shackle_queue:remove(Queue, Id, ExtRequestId) of {ok, #cast {timestamp = Timestamp} = Cast, Request, TimerRef} -> - shackle_telemetry:found(Client), + shackle_events:found(Client), Diff = duration_since(Timestamp), - shackle_telemetry:reply(Client, Request, Reply, Diff), + shackle_events:reply(Client, Request, Reply, Diff), erlang:cancel_timer(TimerRef), reply(Reply, Cast, State); {error, not_found} -> - shackle_telemetry:not_found(Client), + shackle_events:not_found(Client), ok end, process_responses(T, State). From 3565ea074e723171ada70115bcd5fa39b053110d Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Tue, 15 Aug 2023 10:43:34 -0400 Subject: [PATCH 09/11] Add instructions for instrumenting with telemetry --- README.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ffde1d4..79eff74 100644 --- a/README.md +++ b/README.md @@ -213,7 +213,17 @@ shackle_pool:start(shackle_pool:name(), client(), client_options(), pool_options ``` ## Telemetry -Shackle integrates with the backend-agnostic [telemetry](https://hexdocs.pm/telemetry/) library. See `shackle_telemetry` for the list of telemetry events that shackle can emit. + +It is straightforward to integrate Shackle with the backend-agnostic [telemetry](https://hexdocs.pm/telemetry/) library. +Because the calling convention in `shackle_hooks` closely resembles `telemetry:execute`, it is possible configure Shackle like so: + +``` +{shackle, [ + {hooks, [ + {metrics, {telemetry, execute}} + ]} +]} +``` ## Tests From e02935901f5afe6393ae82835b200e1e33001393 Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Tue, 15 Aug 2023 10:53:16 -0400 Subject: [PATCH 10/11] s/metrics/events --- README.md | 2 +- src/shackle_hooks.erl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 79eff74..897b5c2 100644 --- a/README.md +++ b/README.md @@ -220,7 +220,7 @@ Because the calling convention in `shackle_hooks` closely resembles `telemetry:e ``` {shackle, [ {hooks, [ - {metrics, {telemetry, execute}} + {events, {telemetry, execute}} ]} ]} ``` diff --git a/src/shackle_hooks.erl b/src/shackle_hooks.erl index 61aed6d..d815d28 100644 --- a/src/shackle_hooks.erl +++ b/src/shackle_hooks.erl @@ -30,9 +30,9 @@ init() -> Hooks = ?GET_ENV(hooks, []), - Metrics = ?LOOKUP(metrics, Hooks, undefined), + Events = ?LOOKUP(events, Hooks, undefined), foil:new(?MODULE), - foil:insert(?MODULE, metrics, Metrics), + foil:insert(?MODULE, events, Events), foil:load(?MODULE), ok. @@ -40,7 +40,7 @@ init() -> ok. event(EventName, Measurements, Metadata) -> - case shackle_hooks_foil:lookup(metrics) of + case shackle_hooks_foil:lookup(events) of {ok, {M, F}} -> M:F(EventName, Measurements, Metadata); _ -> From ecda745fd7f9b68ca10ce4f9436b01d737045dce Mon Sep 17 00:00:00 2001 From: Richard Kallos Date: Wed, 16 Aug 2023 13:46:49 -0400 Subject: [PATCH 11/11] Add macro to avoid computing shackle_events data --- src/shackle_events.erl | 99 ++++++++++++++++++++++++++++++------------ src/shackle_hooks.erl | 16 +++---- src/shackle_server.erl | 17 +++----- 3 files changed, 83 insertions(+), 49 deletions(-) diff --git a/src/shackle_events.erl b/src/shackle_events.erl index 9862a73..77af75e 100644 --- a/src/shackle_events.erl +++ b/src/shackle_events.erl @@ -20,86 +20,129 @@ timeout/2 ]). +-define(EVENT(Block), + case shackle_hooks:handler() of + {M, F} -> + {EventName, Measurements, Metadata} = Block, + M:F(EventName, Measurements, Metadata); + _ -> + ok + end +). + -spec backlog_full(shackle:client()) -> ok. backlog_full(Client) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, backlog_full], Measurements, Metadata). + {[shackle, backlog_full], Measurements, Metadata} +end). --spec connected(shackle:client(), shackle_pool:name(), non_neg_integer()) -> ok. -connected(Client, PoolName, Duration) -> - Measurements = #{count => 1, duration => Duration}, +-spec connected(shackle:client(), shackle_pool:name(), integer()) -> ok. +connected(Client, PoolName, StartTime) -> +?EVENT(begin + Measurements = #{count => 1, duration => duration_since(StartTime)}, Metadata = #{client => Client, pool_name => PoolName}, - shackle_hooks:event([shackle, connected], Measurements, Metadata). + {[shackle, connected], Measurements, Metadata} +end). -spec connection_error(shackle:client(), shackle_pool:name(), any()) -> ok. connection_error(Client, PoolName, Reason) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client, pool_name => PoolName, reason => Reason}, - shackle_hooks:event([shackle, connection_error], Measurements, Metadata). + {[shackle, connection_error], Measurements, Metadata} +end). -spec disabled(shackle:client()) -> ok. disabled(Client) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, disabled], Measurements, Metadata). + {[shackle, disabled], Measurements, Metadata} +end). -spec found(shackle:client()) -> ok. found(Client) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, found], Measurements, Metadata). + {[shackle, found], Measurements, Metadata} +end). -spec handle_timeout(shackle:client()) -> ok. handle_timeout(Client) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, handle_timeout], Measurements, Metadata). + {[shackle, handle_timeout], Measurements, Metadata} +end). -spec no_server(shackle:client()) -> ok. no_server(Client) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, no_server], Measurements, Metadata). + {[shackle, no_server], Measurements, Metadata} +end). -spec not_found(shackle:client()) -> ok. not_found(Client) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, not_found], Measurements, Metadata). + {[shackle, not_found], Measurements, Metadata} +end). -spec queued_time(shackle:client(), non_neg_integer()) -> ok. -queued_time(Client, Duration) -> - Measurements = #{duration => Duration}, +queued_time(Client, StartTime) -> +?EVENT(begin + Measurements = #{duration => duration_since(StartTime)}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, queued_time], Measurements, Metadata). + {[shackle, queued_time], Measurements, Metadata} +end). --spec recv(shackle:client(), non_neg_integer()) -> ok. -recv(Client, NBytes) -> - Measurements = #{count => 1, bytes => NBytes}, +-spec recv(shackle:client(), iodata()) -> ok. +recv(Client, Data) -> +?EVENT(begin + Measurements = #{count => 1, bytes => iolist_size(Data)}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, recv], Measurements, Metadata). + {[shackle, recv], Measurements, Metadata} +end). -spec replies(shackle:client()) -> ok. replies(Client) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, replies], Measurements, Metadata). + {[shackle, replies], Measurements, Metadata} +end). --spec reply(shackle:client(), term(), term(), non_neg_integer()) -> ok. -reply(Client, Request, Response, Duration) -> - Measurements = #{duration => Duration}, +-spec reply(shackle:client(), term(), term(), integer()) -> ok. +reply(Client, Request, Response, Timestamp) -> +?EVENT(begin + Measurements = #{duration => duration_since(Timestamp)}, Metadata = #{client => Client, request => Request, response => Response}, - shackle_hooks:event([shackle, reply], Measurements, Metadata). + {[shackle, reply], Measurements, Metadata} +end). --spec send(shackle:client(), non_neg_integer()) -> ok. -send(Client, NBytes) -> - Measurements = #{count => 1, bytes => NBytes}, +-spec send(shackle:client(), iodata()) -> ok. +send(Client, Data) -> +?EVENT(begin + Measurements = #{count => 1, bytes => iolist_size(Data)}, Metadata = #{client => Client}, - shackle_hooks:event([shackle, send], Measurements, Metadata). + {[shackle, send], Measurements, Metadata} +end). -spec timeout(shackle:client(), term()) -> ok. timeout(Client, Request) -> +?EVENT(begin Measurements = #{count => 1}, Metadata = #{client => Client, request => Request}, - shackle_hooks:event([shackle, timeout], Measurements, Metadata). + {[shackle, timeout], Measurements, Metadata} +end). + +%% private + +duration_since(Timestamp) -> + erlang:monotonic_time() - Timestamp. diff --git a/src/shackle_hooks.erl b/src/shackle_hooks.erl index d815d28..b8bb892 100644 --- a/src/shackle_hooks.erl +++ b/src/shackle_hooks.erl @@ -1,7 +1,7 @@ -module(shackle_hooks). -include("shackle_internal.hrl"). --dialyzer({nowarn_function, event/3}). +-dialyzer({nowarn_function, handler/0}). -ignore_xref([ {shackle_hooks_foil, lookup, 1} ]). @@ -11,7 +11,7 @@ -export([ init/0, - event/3 + handler/0 ]). %% callbacks @@ -36,13 +36,9 @@ init() -> foil:load(?MODULE), ok. --spec event(event_name(), event_measurements(), event_metadata()) -> - ok. - -event(EventName, Measurements, Metadata) -> +-spec handler() -> {atom(), atom()} | undefined. +handler() -> case shackle_hooks_foil:lookup(events) of - {ok, {M, F}} -> - M:F(EventName, Measurements, Metadata); - _ -> - ok + {ok, {M, F}} -> {M, F}; + _ -> undefined end. diff --git a/src/shackle_server.erl b/src/shackle_server.erl index 0f84762..f06c1e6 100644 --- a/src/shackle_server.erl +++ b/src/shackle_server.erl @@ -98,7 +98,7 @@ handle_msg({_, #cast {timestamp = Timestamp} = Cast}, {#state { socket = undefined, client = Client } = State, ClientState}) -> - shackle_events:queued_time(Client, duration_since(Timestamp)), + shackle_events:queued_time(Client, Timestamp), reply({error, no_socket}, Cast, State), {ok, {State, ClientState}}; handle_msg({Request, #cast { @@ -112,12 +112,12 @@ handle_msg({Request, #cast { queue = Queue, socket = Socket } = State, ClientState}) -> - shackle_events:queued_time(Client, duration_since(Timestamp)), + shackle_events:queued_time(Client, Timestamp), try Client:handle_request(Request, ClientState) of {ok, ExtRequestId, Data, ClientState2} -> case Protocol:send(Socket, Data) of ok -> - shackle_events:send(Client, iolist_size(Data)), + shackle_events:send(Client, Data), case ExtRequestId of undefined -> reply(ok, Cast, State); @@ -315,8 +315,7 @@ connect(Client, Protocol, Address, Port, SocketOptions, PoolName) -> Ip = shackle_utils:random_element(Ips), case Protocol:connect(Ip, Port, SocketOptions) of {ok, Socket} -> - Diff = duration_since(StartTime), - shackle_events:connected(Client, PoolName, Diff), + shackle_events:connected(Client, PoolName, StartTime), {ok, Socket}; {error, Reason} -> shackle_events:connection_error(Client, PoolName, Reason), @@ -344,7 +343,7 @@ handle_msg_data(Socket, Data, #state { socket = Socket } = State, ClientState) -> - shackle_events:recv(Client, size(Data)), + shackle_events:recv(Client, Data), try Client:handle_data(Data, ClientState) of {ok, Replies, ClientState2} -> process_responses(Replies, State), @@ -387,8 +386,7 @@ process_responses([{ExtRequestId, Reply} | T], #state { case shackle_queue:remove(Queue, Id, ExtRequestId) of {ok, #cast {timestamp = Timestamp} = Cast, Request, TimerRef} -> shackle_events:found(Client), - Diff = duration_since(Timestamp), - shackle_events:reply(Client, Request, Reply, Diff), + shackle_events:reply(Client, Request, Reply, Timestamp), erlang:cancel_timer(TimerRef), reply(Reply, Cast, State); {error, not_found} -> @@ -487,6 +485,3 @@ reply_all(Reply, [{Cast, _Request, TimerRef} | T], State) -> erlang:cancel_timer(TimerRef), reply(Reply, Cast, State), reply_all(Reply, T, State). - -duration_since(Timestamp) -> - erlang:monotonic_time() - Timestamp.