diff --git a/src/hello_client.erl b/src/hello_client.erl index b83c108..71fa89b 100644 --- a/src/hello_client.erl +++ b/src/hello_client.erl @@ -44,11 +44,17 @@ %% Behaviour callbacks -callback init_transport(#ex_uri{}, trans_opts()) -> - {ok, ClientState :: term()} | {error, Reason :: term()}. + {ok | browse, ClientState :: term()} | {error, Reason :: term()}. -callback send_request(Reqquest :: binary(), signature(), ClientState :: term()) -> {ok, NewClietnState :: term()} | {error, Reason :: term(), ClietnState :: term()}. +-callback handle_info(Msg :: term(), ClientState :: term()) -> + {?INCOMING_MSG, Message :: term} | {noreply, NewClietnState :: term()}. + +-callback update_service({dnssd:name(), dnssd:type(), dnssd:domain()}, ClientState :: term()) -> + {ok, NewClietnState :: term()}. + -callback terminate_transport(Reason :: term(), ClientState :: term()) -> ok. @@ -119,6 +125,7 @@ timeout_call(Client, Call, Timeout) -> %% ---------------------------------------------------------------------------------------------------- %% -- gen_server callbacks -record(client_state, { + uri = #ex_uri{}, transport_mod :: module(), transport_state :: term(), protocol_mod :: atom(), @@ -127,7 +134,8 @@ timeout_call(Client, Call, Timeout) -> async_request_map = gb_trees:empty() :: gb_trees:tree(), keep_alive_interval :: number(), keep_alive_ref :: term(), - notification_sink :: pid() | function() + notification_sink :: pid() | function(), + services = [] :: list({binary(), binary(), binary()}) }). %% @hidden @@ -172,13 +180,33 @@ handle_info(?PING, State = #client_state{transport_mod=TransportModule, transpor {ok, NewTimerRef} = timer:send_after(KeepAliveInterval, self(), ?PING), {noreply, State#client_state{transport_state = NewTransportState, keep_alive_ref = NewTimerRef}}; +handle_info({dnssd, _Ref, {browse, _, _} = Msg}, + #client_state{transport_mod = TransportModule, transport_state = TransportState, + uri = URI, services = Services} = State) -> + ?LOG_DEBUG("dnssd Msg: ~p", [Msg]), + NewServices = update_services(Msg, Services), + ?LOG_DEBUG("Services: ~p for ~p", [NewServices, ex_uri:encode(URI)]), + case NewServices of + [] -> + ?LOG_WARNING("service ~p not available", [ex_uri:encode(URI)]), + NewTransportState = TransportState; + [{Name, Type, Domain} | _] -> + {ok, Service} = dnssd:resolve_sync(Name, Type, Domain), + ?LOG_INFO("Update service ~p for ~p", [Service, ex_uri:encode(URI)]), + {ok, NewTransportState} = TransportModule:update_service(clean_host(Service), TransportState) + end, + {noreply, State#client_state{transport_state = NewTransportState, services = NewServices}}; + handle_info(Info, State = #client_state{transport_mod=TransportModule, transport_state=TransportState}) -> case TransportModule:handle_info(Info, TransportState) of {?INCOMING_MSG, Message} -> incoming_message(Message, State); {noreply, NewTransportState} -> {noreply, State#client_state{transport_state = NewTransportState}} - end. + end; + +handle_info(Info, _State) -> + ?LOG_INFO("unhandeld message ~p in handle_info", [Info]). %% @hidden terminate(Reason, #client_state{transport_mod = TransportModule, transport_state = TransportState, keep_alive_ref = TimerRef}) -> @@ -197,13 +225,15 @@ code_change(_FromVsn, _ToVsn, State) -> %% -- Helper functions init_transport(TransportModule, URIRec, TransportOpts, ProtocolOpts, ClientOpts) -> case TransportModule:init_transport(URIRec, TransportOpts) of - {ok, TransportState} -> + {OkOrBrowse, TransportState} when OkOrBrowse =:= ok; OkOrBrowse =:= browse -> NotificationSink = proplists:get_value(notification_sink, ProtocolOpts, undefined), ProtocolMod = proplists:get_value(protocol, ProtocolOpts, hello_proto_jsonrpc), + OkOrBrowse =:= browse andalso browse(URIRec), case hello_proto:init_client(ProtocolMod, ProtocolOpts) of {ok, ProtocolState} -> hello_metrics:client(1), - State = #client_state{transport_mod = TransportModule, + State = #client_state{uri = URIRec, + transport_mod = TransportModule, transport_state = TransportState, protocol_mod = ProtocolMod, protocol_opts = ProtocolOpts, @@ -225,6 +255,17 @@ evaluate_client_options(ClientOpts, State) -> {ok, State#client_state{keep_alive_interval = KeepAliveInterval, keep_alive_ref = TimerRef}} end. +clean_host({Host, Port, Txt}) -> + HostSize = erlang:byte_size(Host), + CleanedHost = case binary:match(Host, <<".local.">>) of + {M, L} when HostSize == (M + L) -> + <> = Host, + HostCuted; + _ -> + Host + end, + {binary_to_list(CleanedHost), Port, Txt}. + incoming_message({error, _Reason, NewTransportState}, State) -> %%will be logged later {noreply, State#client_state{transport_state = NewTransportState}}; incoming_message({ok, Signature, BinResponse, NewTransportState}, @@ -267,6 +308,10 @@ outgoing_message(Request, From, State = #client_state{protocol_mod = ProtocolMod ignore -> {ok, State} end. +update_services({browse, add, Service}, Services) -> Services ++ [Service]; +update_services({browse, remove, Service}, Services) -> Services -- [Service]; +update_services(_, Services) -> Services. + update_map(Requests, From, AsyncMap0) when is_list(Requests) -> lists:foldl(fun(Request, AsyncMap) -> update_map(Request, From, AsyncMap) @@ -274,6 +319,9 @@ update_map(Requests, From, AsyncMap0) when is_list(Requests) -> update_map(#request{id = undefined}, _From, AsyncMap) -> AsyncMap; update_map(#request{id = RequestId} = Request, From, AsyncMap) -> gb_trees:enter(RequestId, {From, Request}, AsyncMap). +browse(#ex_uri{scheme = Scheme, authority = #ex_uri_authority{host = Host}, path = [$/|Path]}) -> + dnssd:browse(hello_lib:dnssd_service_type(Scheme, Host, Path)). + handle_internal(?PONG, State = #client_state{keep_alive_interval = KeepAliveInterval, keep_alive_ref = TimerRef}) -> timer:cancel(TimerRef), {ok, NewTimerRef} = timer:send_after(KeepAliveInterval, self(), ?PING), diff --git a/src/hello_lib.erl b/src/hello_lib.erl index 264a757..26d1a5c 100644 --- a/src/hello_lib.erl +++ b/src/hello_lib.erl @@ -1,5 +1,5 @@ -module(hello_lib). --export([get_in/2, to_binary/1, wrap/1, get/2, get/3]). +-export([get_in/2, to_binary/1, wrap/1, get/2, get/3, dnssd_service_type/2, dnssd_service_type/3]). get_in(Map, Path) when is_list(Path) == false -> get_in(Map, Path); get_in(Value, []) -> Value; @@ -23,3 +23,16 @@ to_binary(Term) -> unicode:characters_to_binary(io_lib:format("~p", [Term])). wrap(Requests) when is_list(Requests) -> Requests; wrap(Request) -> [Request]. + +-spec dnssd_service_type(Scheme :: string(), App :: binary()) -> ServiceName :: binary(). +dnssd_service_type(Scheme, App) -> + SchemeBin = hello_lib:to_binary(Scheme), + App1 = binary:replace(App, <<"/">>, <<"_">>, [global]), + <<"_", SchemeBin/binary, "_", App1/binary, "._tcp.">>. + +-spec dnssd_service_type(Scheme :: string(), Host :: string(), Path :: string()) -> ServiceName :: binary(). +dnssd_service_type(Scheme, Host, Path) -> + SchemeBin = hello_lib:to_binary(Scheme), + HostBin = hello_lib:to_binary(Host), + PathBin = binary:replace(hello_lib:to_binary(Path), <<"/">>, <<"_">>, [global]), + <<"_", SchemeBin/binary, "_", HostBin/binary, "_", PathBin/binary, "._tcp.">>. diff --git a/src/hello_registry.erl b/src/hello_registry.erl index d4d2845..0110392 100644 --- a/src/hello_registry.erl +++ b/src/hello_registry.erl @@ -160,10 +160,9 @@ monitor_(Table, Pid) -> [true]}]) == 0 andalso erlang:monitor(process, Pid). -bind({binding, {_Url, _RouterKey}} = Key, Pid, {Data, Port}, Table) -> - % TODO: That is very dangarous, as single error on service, will simple crash the whole registry - [App, Name] = string:tokens(binary_to_list(Data), "/"), - Ref = dnss_register(App, Name, Port), +bind({binding, {#ex_uri{scheme = Scheme} = _Url, _RouterKey}} = Key, Pid, {Data, Port}, Table) -> + %% replace / to _ for backwards compatibility + Ref = dnssd_register(Scheme, Data, Port), ets:insert(Table, {Key, Pid, Data, Ref}); bind(Key, Pid, Data, Table) -> ets:insert(Table, {Key, Pid, Data, undefined}). @@ -176,17 +175,20 @@ update_metric({listener, _}, Value) -> hello_metrics:listener(Value); update_metric(Key, _) -> ?LOG_INFO("unknown key ~p for register_metric", [Key]). -do_dnss_register(App, Name, Port) -> - ?LOG_INFO("dnss register ~p/~p on port ~p", [App, Name, Port]), - case dnssd:register(Name, <<"_", App/binary, "._tcp">>, Port) of - {ok, Ref} -> Ref; +dnssd_register(Scheme, App, Port) when is_binary(Scheme); is_binary(App); is_integer(Port) -> + ?LOG_INFO("dnssd register ~p ~p on port ~p", [Scheme, App, Port]), + {ok, Hostname} = inet:gethostname(), + HostnameBin = hello_lib:to_binary(Hostname), + PortBin = integer_to_binary(Port), + Name = <<(list_to_binary(Scheme))/binary, App/binary, HostnameBin/binary, PortBin/binary, + (base64:encode(crypto:strong_rand_bytes(2)))/binary>>, + case dnssd:register(Name, hello_lib:dnssd_service_type(Scheme, App), Port) of + {ok, Ref} -> + ?LOG_INFO("dnss register ref ~p", [Ref]), + Ref; _ -> ok - end. - -dnss_register(App, Name, Port) - when is_list(App), is_list(Name), is_integer(Port) -> - do_dnss_register(list_to_binary(App), list_to_binary(Name), Port); -dnss_register(_, _, _) -> ok. + end; +dnssd_register(_Scheme, _App, _Port) -> ok. dnssd_clean(Ref) when is_reference(Ref) -> dnssd:stop(Ref); dnssd_clean(_) -> ok. diff --git a/src/transports/hello_http_client.erl b/src/transports/hello_http_client.erl index a51ff1a..d81b554 100644 --- a/src/transports/hello_http_client.erl +++ b/src/transports/hello_http_client.erl @@ -22,7 +22,7 @@ -module(hello_http_client). -behaviour(hello_client). --export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2]). +-export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2, update_service/2]). -export([http_send/4]). -include_lib("ex_uri/include/ex_uri.hrl"). @@ -43,8 +43,7 @@ init_transport(URL, Options) -> case validate_options(Options) of {ok, ValOpts} -> - http_connect_url(URL), - {ok, #http_state{url = ex_uri:encode(URL), scheme = URL#ex_uri.scheme, path = URL#ex_uri.path, options = ValOpts}}; + {browse, #http_state{url = ex_uri:encode(URL), scheme = URL#ex_uri.scheme, path = URL#ex_uri.path, options = ValOpts}}; {error, Reason} -> ?LOG_ERROR("Invalid options for init http client, reason: ~p", [Reason]), {error, Reason} @@ -59,29 +58,17 @@ send_request(_, _, State) -> terminate_transport(_Reason, _State) -> ok. -handle_info({dnssd, _Ref, {resolve,{Host, Port, _Txt}}}, State = #http_state{scheme = Scheme, path = Path}) -> - ?LOG_INFO("dnssd Service: ~p:~w", [Host, Port]), - {noreply, State#http_state{url = build_url(Scheme, Host, Path, Port)}}; -handle_info({dnssd, _Ref, Msg}, State) -> - ?LOG_INFO("dnssd Msg: ~p", [Msg]), +update_service({Host, Port, _Txt}, State = #http_state{scheme = Scheme, path = Path}) -> + {ok, State#http_state{url = build_url(Scheme, Host, Path, Port)}}. + +handle_info(_Msg, State) -> {noreply, State}. build_url(Scheme, Host, Path, Port) -> ex_uri:encode(#ex_uri{scheme = Scheme, - authority = #ex_uri_authority{host = clean_host(Host), port = Port}, + authority = #ex_uri_authority{host = Host, port = Port}, path = Path}). -clean_host(Host) -> - HostSize = erlang:byte_size(Host), - CleanedHost = case binary:match(Host, <<".local.">>) of - {M, L} when HostSize == (M + L) -> - <> = Host, - HostCuted; - _ -> - Host - end, - binary_to_list(CleanedHost). - content_type(Signarute) -> Json = hello_json:signature(), MsgPack = hello_msgpack:signature(), @@ -142,9 +129,3 @@ validate_options([_ | R], Opts) -> validate_options(R, Opts); validate_options([], Opts) -> {ok, Opts}. - -http_connect_url(#ex_uri{authority = #ex_uri_authority{host = Host}, path = [$/|Path]}) -> - dnssd:resolve(list_to_binary(Path), <<"_", (list_to_binary(Host))/binary, "._tcp.">>, <<"local.">>), - ok; -http_connect_url(URI) -> - URI. diff --git a/src/transports/hello_zmq_client.erl b/src/transports/hello_zmq_client.erl index 8505860..6455c1e 100644 --- a/src/transports/hello_zmq_client.erl +++ b/src/transports/hello_zmq_client.erl @@ -22,7 +22,7 @@ -module(hello_zmq_client). -behaviour(hello_client). --export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2]). +-export([init_transport/2, send_request/3, terminate_transport/2, handle_info/2, update_service/2]). -include_lib("ex_uri/include/ex_uri.hrl"). -include("hello.hrl"). @@ -37,8 +37,8 @@ init_transport(URI, Options) -> SockType = proplists:get_value(socket_type, Options, dealer), %% for tests {ok, Socket} = ezmq:socket([{type, SockType}, {active, true}]), - ok = ezmq_connect_url(Socket, URI), - {ok, #zmq_state{socket = Socket, uri = URI}}. + Res = ezmq_connect_url(Socket, URI), + {Res, #zmq_state{socket = Socket, uri = URI}}. send_request(Message, Signature, State = #zmq_state{socket = Socket}) -> ezmq:send(Socket, [Signature, Message]), @@ -49,15 +49,12 @@ send_request(Message, Signature, State = #zmq_state{socket = Socket}) -> terminate_transport(_Reason, #zmq_state{socket = Socket}) -> ezmq:close(Socket). -handle_info({dnssd, _Ref, {resolve,{Host, Port, _Txt}}}, State = #zmq_state{uri = URI, socket = Socket}) -> - ?LOG_INFO("dnssd Service: ~p:~w", [Host, Port]), - Protocol = zmq_protocol(URI), - R = ezmq:connect(Socket, tcp, clean_host(Host), Port, [Protocol]), - ?LOG_INFO("ezmq:connect: ~p", [R]), - {noreply, State}; -handle_info({dnssd, _Ref, Msg}, State) -> - ?LOG_INFO("dnssd Msg: ~p", [Msg]), - {noreply, State}; +update_service({Host, Port, _Txt}, State = #zmq_state{uri = URI, socket = Socket}) -> + Protocol = hello_zmq_listener:zmq_protocol(URI), + R = ezmq:connect(Socket, tcp, Host, Port, [Protocol]), + ?LOG_DEBUG("ezmq:connect: ~p", [R]), + {ok, State}. + handle_info({zmq, _Socket, [Signature, Msg]}, State) -> {?INCOMING_MSG, {ok, Signature, Msg, State}}; handle_info({zmq, _Socket, [<<>>, Signature, Msg]}, State) -> @@ -65,43 +62,9 @@ handle_info({zmq, _Socket, [<<>>, Signature, Msg]}, State) -> %% -------------------------------------------------------------------------------- %% -- helpers -zmq_protocol(#ex_uri{scheme = "zmq-tcp"}) -> inet; -zmq_protocol(#ex_uri{scheme = "zmq-tcp6"}) -> inet6. - -%% use dnssd to resolve port AND host -%% map host to Type and Path to Name -ezmq_connect_url(_Socket, #ex_uri{authority = #ex_uri_authority{host = Host, port = undefined}, path = [$/|Path]}) -> - dnssd:resolve(list_to_binary(Path), <<"_", (list_to_binary(Host))/binary, "._tcp.">>, <<"local.">>), - ok; +ezmq_connect_url(_Socket, #ex_uri{authority = #ex_uri_authority{port = undefined}}) -> browse; ezmq_connect_url(Socket, URI = #ex_uri{authority = #ex_uri_authority{host = Host, port = Port}}) -> - Protocol = zmq_protocol(URI), - case ezmq_ip(Protocol, Host) of - {ok, IP} -> - ezmq:connect(Socket, tcp, IP, Port, [Protocol]); - Other -> - Other - end. - -ezmq_ip(inet, "*") -> {ok, {0,0,0,0}}; -ezmq_ip(inet, Host) -> inet:parse_ipv4_address(Host); - -ezmq_ip(inet6, "*") -> {ok, {0,0,0,0,0,0,0,0}}; -ezmq_ip(inet6, Host) -> - case re:run(Host, "^\\[(.*)\\]$", [{capture, all, list}]) of - {match, ["[::1]", IP]} -> - inet:parse_ipv6_address(IP); - _ -> - inet:parse_ipv6_address(Host) - end. - -clean_host(Host) -> - HostSize = erlang:byte_size(Host), - CleanedHost = case binary:match(Host, <<".local.">>) of - {M, L} when HostSize == (M + L) -> - <> = Host, - HostCuted; - _ -> - Host - end, - binary_to_list(CleanedHost). + Protocol = hello_zmq_listener:zmq_protocol(URI), + {ok, IP} = hello_zmq_listener:ezmq_ip(Protocol, Host), + ezmq:connect(Socket, tcp, IP, Port, [Protocol]). diff --git a/src/transports/hello_zmq_listener.erl b/src/transports/hello_zmq_listener.erl index cfc4936..a040335 100644 --- a/src/transports/hello_zmq_listener.erl +++ b/src/transports/hello_zmq_listener.erl @@ -28,6 +28,9 @@ -behaviour(gen_server). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% for hello_zmq_client +-export([ezmq_ip/2, zmq_protocol/1]). + -include_lib("ex_uri/include/ex_uri.hrl"). -include("hello.hrl"). -include("hello_log.hrl").