Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions src/hello_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@

%% Behaviour callbacks
-callback init_transport(#ex_uri{}, trans_opts()) ->
{ok, ClientState :: term()} | {error, Reason :: term()}.
{ok | browse, ClientState :: term()} | {error, Reason :: term()}.

-callback send_request(Reqquest :: binary(), signature(), ClientState :: term()) ->
{ok, NewClietnState :: term()} | {error, Reason :: term(), ClietnState :: term()}.

-callback handle_info(Msg :: term(), ClientState :: term()) ->
{?INCOMING_MSG, Message :: term} | {noreply, NewClietnState :: term()}.

-callback update_service({dnssd:name(), dnssd:type(), dnssd:domain()}, ClientState :: term()) ->
{ok, NewClietnState :: term()}.

-callback terminate_transport(Reason :: term(), ClientState :: term()) -> ok.


Expand Down Expand Up @@ -119,6 +125,7 @@ timeout_call(Client, Call, Timeout) ->
%% ----------------------------------------------------------------------------------------------------
%% -- gen_server callbacks
-record(client_state, {
uri = #ex_uri{},
transport_mod :: module(),
transport_state :: term(),
protocol_mod :: atom(),
Expand All @@ -127,7 +134,8 @@ timeout_call(Client, Call, Timeout) ->
async_request_map = gb_trees:empty() :: gb_trees:tree(),
keep_alive_interval :: number(),
keep_alive_ref :: term(),
notification_sink :: pid() | function()
notification_sink :: pid() | function(),
services = [] :: list({binary(), binary(), binary()})
}).

%% @hidden
Expand Down Expand Up @@ -172,13 +180,33 @@ handle_info(?PING, State = #client_state{transport_mod=TransportModule, transpor
{ok, NewTimerRef} = timer:send_after(KeepAliveInterval, self(), ?PING),
{noreply, State#client_state{transport_state = NewTransportState, keep_alive_ref = NewTimerRef}};

handle_info({dnssd, _Ref, {browse, _, _} = Msg},
#client_state{transport_mod = TransportModule, transport_state = TransportState,
uri = URI, services = Services} = State) ->
?LOG_DEBUG("dnssd Msg: ~p", [Msg]),
NewServices = update_services(Msg, Services),
?LOG_DEBUG("Services: ~p for ~p", [NewServices, ex_uri:encode(URI)]),
case NewServices of
[] ->
?LOG_WARNING("service ~p not available", [ex_uri:encode(URI)]),
NewTransportState = TransportState;
[{Name, Type, Domain} | _] ->
{ok, Service} = dnssd:resolve_sync(Name, Type, Domain),
?LOG_INFO("Update service ~p for ~p", [Service, ex_uri:encode(URI)]),
{ok, NewTransportState} = TransportModule:update_service(clean_host(Service), TransportState)
end,
{noreply, State#client_state{transport_state = NewTransportState, services = NewServices}};

handle_info(Info, State = #client_state{transport_mod=TransportModule, transport_state=TransportState}) ->
case TransportModule:handle_info(Info, TransportState) of
{?INCOMING_MSG, Message} ->
incoming_message(Message, State);
{noreply, NewTransportState} ->
{noreply, State#client_state{transport_state = NewTransportState}}
end.
end;

handle_info(Info, _State) ->
?LOG_INFO("unhandeld message ~p in handle_info", [Info]).

%% @hidden
terminate(Reason, #client_state{transport_mod = TransportModule, transport_state = TransportState, keep_alive_ref = TimerRef}) ->
Expand All @@ -197,13 +225,15 @@ code_change(_FromVsn, _ToVsn, State) ->
%% -- Helper functions
init_transport(TransportModule, URIRec, TransportOpts, ProtocolOpts, ClientOpts) ->
case TransportModule:init_transport(URIRec, TransportOpts) of
{ok, TransportState} ->
{OkOrBrowse, TransportState} when OkOrBrowse =:= ok; OkOrBrowse =:= browse ->
NotificationSink = proplists:get_value(notification_sink, ProtocolOpts, undefined),
ProtocolMod = proplists:get_value(protocol, ProtocolOpts, hello_proto_jsonrpc),
OkOrBrowse =:= browse andalso browse(URIRec),
case hello_proto:init_client(ProtocolMod, ProtocolOpts) of
{ok, ProtocolState} ->
hello_metrics:client(1),
State = #client_state{transport_mod = TransportModule,
State = #client_state{uri = URIRec,
transport_mod = TransportModule,
transport_state = TransportState,
protocol_mod = ProtocolMod,
protocol_opts = ProtocolOpts,
Expand All @@ -225,6 +255,17 @@ evaluate_client_options(ClientOpts, State) ->
{ok, State#client_state{keep_alive_interval = KeepAliveInterval, keep_alive_ref = TimerRef}}
end.

clean_host({Host, Port, Txt}) ->
HostSize = erlang:byte_size(Host),
CleanedHost = case binary:match(Host, <<".local.">>) of
{M, L} when HostSize == (M + L) ->
<<HostCuted:M/binary, _/binary>> = Host,
HostCuted;
_ ->
Host
end,
{binary_to_list(CleanedHost), Port, Txt}.

incoming_message({error, _Reason, NewTransportState}, State) -> %%will be logged later
{noreply, State#client_state{transport_state = NewTransportState}};
incoming_message({ok, Signature, BinResponse, NewTransportState},
Expand Down Expand Up @@ -267,13 +308,20 @@ outgoing_message(Request, From, State = #client_state{protocol_mod = ProtocolMod
ignore -> {ok, State}
end.

update_services({browse, add, Service}, Services) -> Services ++ [Service];
update_services({browse, remove, Service}, Services) -> Services -- [Service];
update_services(_, Services) -> Services.

update_map(Requests, From, AsyncMap0) when is_list(Requests) ->
lists:foldl(fun(Request, AsyncMap) ->
update_map(Request, From, AsyncMap)
end, AsyncMap0, Requests);
update_map(#request{id = undefined}, _From, AsyncMap) -> AsyncMap;
update_map(#request{id = RequestId} = Request, From, AsyncMap) -> gb_trees:enter(RequestId, {From, Request}, AsyncMap).

browse(#ex_uri{scheme = Scheme, authority = #ex_uri_authority{host = Host}, path = [$/|Path]}) ->
dnssd:browse(hello_lib:dnssd_service_type(Scheme, Host, Path)).

handle_internal(?PONG, State = #client_state{keep_alive_interval = KeepAliveInterval, keep_alive_ref = TimerRef}) ->
timer:cancel(TimerRef),
{ok, NewTimerRef} = timer:send_after(KeepAliveInterval, self(), ?PING),
Expand Down
15 changes: 14 additions & 1 deletion src/hello_lib.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-module(hello_lib).
-export([get_in/2, to_binary/1, wrap/1, get/2, get/3]).
-export([get_in/2, to_binary/1, wrap/1, get/2, get/3, dnssd_service_type/2, dnssd_service_type/3]).

get_in(Map, Path) when is_list(Path) == false -> get_in(Map, Path);
get_in(Value, []) -> Value;
Expand All @@ -23,3 +23,16 @@ to_binary(Term) -> unicode:characters_to_binary(io_lib:format("~p", [Term])).

wrap(Requests) when is_list(Requests) -> Requests;
wrap(Request) -> [Request].

-spec dnssd_service_type(Scheme :: string(), App :: binary()) -> ServiceName :: binary().
dnssd_service_type(Scheme, App) ->
SchemeBin = hello_lib:to_binary(Scheme),
App1 = binary:replace(App, <<"/">>, <<"_">>, [global]),
<<"_", SchemeBin/binary, "_", App1/binary, "._tcp.">>.

-spec dnssd_service_type(Scheme :: string(), Host :: string(), Path :: string()) -> ServiceName :: binary().
dnssd_service_type(Scheme, Host, Path) ->
SchemeBin = hello_lib:to_binary(Scheme),
HostBin = hello_lib:to_binary(Host),
PathBin = binary:replace(hello_lib:to_binary(Path), <<"/">>, <<"_">>, [global]),
<<"_", SchemeBin/binary, "_", HostBin/binary, "_", PathBin/binary, "._tcp.">>.
30 changes: 16 additions & 14 deletions src/hello_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,9 @@ monitor_(Table, Pid) ->
[true]}]) == 0
andalso erlang:monitor(process, Pid).

bind({binding, {_Url, _RouterKey}} = Key, Pid, {Data, Port}, Table) ->
% TODO: That is very dangarous, as single error on service, will simple crash the whole registry
[App, Name] = string:tokens(binary_to_list(Data), "/"),
Ref = dnss_register(App, Name, Port),
bind({binding, {#ex_uri{scheme = Scheme} = _Url, _RouterKey}} = Key, Pid, {Data, Port}, Table) ->
%% replace / to _ for backwards compatibility
Ref = dnssd_register(Scheme, Data, Port),
ets:insert(Table, {Key, Pid, Data, Ref});
bind(Key, Pid, Data, Table) -> ets:insert(Table, {Key, Pid, Data, undefined}).

Expand All @@ -176,17 +175,20 @@ update_metric({listener, _}, Value) -> hello_metrics:listener(Value);
update_metric(Key, _) ->
?LOG_INFO("unknown key ~p for register_metric", [Key]).

do_dnss_register(App, Name, Port) ->
?LOG_INFO("dnss register ~p/~p on port ~p", [App, Name, Port]),
case dnssd:register(Name, <<"_", App/binary, "._tcp">>, Port) of
{ok, Ref} -> Ref;
dnssd_register(Scheme, App, Port) when is_binary(Scheme); is_binary(App); is_integer(Port) ->
?LOG_INFO("dnssd register ~p ~p on port ~p", [Scheme, App, Port]),
{ok, Hostname} = inet:gethostname(),
HostnameBin = hello_lib:to_binary(Hostname),
PortBin = integer_to_binary(Port),
Name = <<(list_to_binary(Scheme))/binary, App/binary, HostnameBin/binary, PortBin/binary,
(base64:encode(crypto:strong_rand_bytes(2)))/binary>>,
case dnssd:register(Name, hello_lib:dnssd_service_type(Scheme, App), Port) of
{ok, Ref} ->
?LOG_INFO("dnss register ref ~p", [Ref]),
Ref;
_ -> ok
end.

dnss_register(App, Name, Port)
when is_list(App), is_list(Name), is_integer(Port) ->
do_dnss_register(list_to_binary(App), list_to_binary(Name), Port);
dnss_register(_, _, _) -> ok.
end;
dnssd_register(_Scheme, _App, _Port) -> ok.

dnssd_clean(Ref) when is_reference(Ref) -> dnssd:stop(Ref);
dnssd_clean(_) -> ok.
33 changes: 7 additions & 26 deletions src/transports/hello_http_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-module(hello_http_client).

-behaviour(hello_client).
-export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2]).
-export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2, update_service/2]).
-export([http_send/4]).

-include_lib("ex_uri/include/ex_uri.hrl").
Expand All @@ -43,8 +43,7 @@
init_transport(URL, Options) ->
case validate_options(Options) of
{ok, ValOpts} ->
http_connect_url(URL),
{ok, #http_state{url = ex_uri:encode(URL), scheme = URL#ex_uri.scheme, path = URL#ex_uri.path, options = ValOpts}};
{browse, #http_state{url = ex_uri:encode(URL), scheme = URL#ex_uri.scheme, path = URL#ex_uri.path, options = ValOpts}};
{error, Reason} ->
?LOG_ERROR("Invalid options for init http client, reason: ~p", [Reason]),
{error, Reason}
Expand All @@ -59,29 +58,17 @@ send_request(_, _, State) ->
terminate_transport(_Reason, _State) ->
ok.

handle_info({dnssd, _Ref, {resolve,{Host, Port, _Txt}}}, State = #http_state{scheme = Scheme, path = Path}) ->
?LOG_INFO("dnssd Service: ~p:~w", [Host, Port]),
{noreply, State#http_state{url = build_url(Scheme, Host, Path, Port)}};
handle_info({dnssd, _Ref, Msg}, State) ->
?LOG_INFO("dnssd Msg: ~p", [Msg]),
update_service({Host, Port, _Txt}, State = #http_state{scheme = Scheme, path = Path}) ->
{ok, State#http_state{url = build_url(Scheme, Host, Path, Port)}}.

handle_info(_Msg, State) ->
{noreply, State}.

build_url(Scheme, Host, Path, Port) ->
ex_uri:encode(#ex_uri{scheme = Scheme,
authority = #ex_uri_authority{host = clean_host(Host), port = Port},
authority = #ex_uri_authority{host = Host, port = Port},
path = Path}).

clean_host(Host) ->
HostSize = erlang:byte_size(Host),
CleanedHost = case binary:match(Host, <<".local.">>) of
{M, L} when HostSize == (M + L) ->
<<HostCuted:M/binary, _/binary>> = Host,
HostCuted;
_ ->
Host
end,
binary_to_list(CleanedHost).

content_type(Signarute) ->
Json = hello_json:signature(),
MsgPack = hello_msgpack:signature(),
Expand Down Expand Up @@ -142,9 +129,3 @@ validate_options([_ | R], Opts) ->
validate_options(R, Opts);
validate_options([], Opts) ->
{ok, Opts}.

http_connect_url(#ex_uri{authority = #ex_uri_authority{host = Host}, path = [$/|Path]}) ->
dnssd:resolve(list_to_binary(Path), <<"_", (list_to_binary(Host))/binary, "._tcp.">>, <<"local.">>),
ok;
http_connect_url(URI) ->
URI.
63 changes: 13 additions & 50 deletions src/transports/hello_zmq_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-module(hello_zmq_client).

-behaviour(hello_client).
-export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2]).
-export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2, update_service/2]).

-include_lib("ex_uri/include/ex_uri.hrl").
-include("hello.hrl").
Expand All @@ -37,8 +37,8 @@
init_transport(URI, Options) ->
SockType = proplists:get_value(socket_type, Options, dealer), %% for tests
{ok, Socket} = ezmq:socket([{type, SockType}, {active, true}]),
ok = ezmq_connect_url(Socket, URI),
{ok, #zmq_state{socket = Socket, uri = URI}}.
Res = ezmq_connect_url(Socket, URI),
{Res, #zmq_state{socket = Socket, uri = URI}}.

send_request(Message, Signature, State = #zmq_state{socket = Socket}) ->
ezmq:send(Socket, [Signature, Message]),
Expand All @@ -49,59 +49,22 @@ send_request(Message, Signature, State = #zmq_state{socket = Socket}) ->
terminate_transport(_Reason, #zmq_state{socket = Socket}) ->
ezmq:close(Socket).

handle_info({dnssd, _Ref, {resolve,{Host, Port, _Txt}}}, State = #zmq_state{uri = URI, socket = Socket}) ->
?LOG_INFO("dnssd Service: ~p:~w", [Host, Port]),
Protocol = zmq_protocol(URI),
R = ezmq:connect(Socket, tcp, clean_host(Host), Port, [Protocol]),
?LOG_INFO("ezmq:connect: ~p", [R]),
{noreply, State};
handle_info({dnssd, _Ref, Msg}, State) ->
?LOG_INFO("dnssd Msg: ~p", [Msg]),
{noreply, State};
update_service({Host, Port, _Txt}, State = #zmq_state{uri = URI, socket = Socket}) ->
Protocol = hello_zmq_listener:zmq_protocol(URI),
R = ezmq:connect(Socket, tcp, Host, Port, [Protocol]),
?LOG_DEBUG("ezmq:connect: ~p", [R]),
{ok, State}.

handle_info({zmq, _Socket, [Signature, Msg]}, State) ->
{?INCOMING_MSG, {ok, Signature, Msg, State}};
handle_info({zmq, _Socket, [<<>>, Signature, Msg]}, State) ->
{?INCOMING_MSG, {ok, Signature, Msg, State}}.

%% --------------------------------------------------------------------------------
%% -- helpers
zmq_protocol(#ex_uri{scheme = "zmq-tcp"}) -> inet;
zmq_protocol(#ex_uri{scheme = "zmq-tcp6"}) -> inet6.

%% use dnssd to resolve port AND host
%% map host to Type and Path to Name
ezmq_connect_url(_Socket, #ex_uri{authority = #ex_uri_authority{host = Host, port = undefined}, path = [$/|Path]}) ->
dnssd:resolve(list_to_binary(Path), <<"_", (list_to_binary(Host))/binary, "._tcp.">>, <<"local.">>),
ok;
ezmq_connect_url(_Socket, #ex_uri{authority = #ex_uri_authority{port = undefined}}) -> browse;

ezmq_connect_url(Socket, URI = #ex_uri{authority = #ex_uri_authority{host = Host, port = Port}}) ->
Protocol = zmq_protocol(URI),
case ezmq_ip(Protocol, Host) of
{ok, IP} ->
ezmq:connect(Socket, tcp, IP, Port, [Protocol]);
Other ->
Other
end.

ezmq_ip(inet, "*") -> {ok, {0,0,0,0}};
ezmq_ip(inet, Host) -> inet:parse_ipv4_address(Host);

ezmq_ip(inet6, "*") -> {ok, {0,0,0,0,0,0,0,0}};
ezmq_ip(inet6, Host) ->
case re:run(Host, "^\\[(.*)\\]$", [{capture, all, list}]) of
{match, ["[::1]", IP]} ->
inet:parse_ipv6_address(IP);
_ ->
inet:parse_ipv6_address(Host)
end.

clean_host(Host) ->
HostSize = erlang:byte_size(Host),
CleanedHost = case binary:match(Host, <<".local.">>) of
{M, L} when HostSize == (M + L) ->
<<HostCuted:M/binary, _/binary>> = Host,
HostCuted;
_ ->
Host
end,
binary_to_list(CleanedHost).
Protocol = hello_zmq_listener:zmq_protocol(URI),
{ok, IP} = hello_zmq_listener:ezmq_ip(Protocol, Host),
ezmq:connect(Socket, tcp, IP, Port, [Protocol]).
3 changes: 3 additions & 0 deletions src/transports/hello_zmq_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%% for hello_zmq_client
-export([ezmq_ip/2, zmq_protocol/1]).

-include_lib("ex_uri/include/ex_uri.hrl").
-include("hello.hrl").
-include("hello_log.hrl").
Expand Down