From 6ae583aa584f313e5cc9c04becd65e1980c0c36f Mon Sep 17 00:00:00 2001 From: xkulale Date: Thu, 12 Dec 2019 10:31:57 +0000 Subject: [PATCH 1/2] Add API to get pool per command; Add API to get pool by key; Add API to get eredis_cluster_monitoring version. Remove duplicates of names of pools in get_all_pools() API. --- src/eredis_cluster.erl | 31 ++++++++++++++++--- src/eredis_cluster_monitor.erl | 56 +++++++++++++++++++++++----------- test/eredis_cluster_tests.erl | 53 ++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 22 deletions(-) diff --git a/src/eredis_cluster.erl b/src/eredis_cluster.erl index 1da3a9f..e051fe8 100644 --- a/src/eredis_cluster.erl +++ b/src/eredis_cluster.erl @@ -19,6 +19,7 @@ -export([update_hash_field/3]). -export([optimistic_locking_transaction/3]). -export([eval/4]). +-export([get_pool_by_command/1, get_pool_by_key/1, get_key_slot/1]). -include("eredis_cluster.hrl"). @@ -89,7 +90,7 @@ transaction(Transaction, Slot, ExpectedValue, Counter) -> -spec qmn(redis_pipeline_command()) -> redis_pipeline_result(). qmn(Commands) -> qmn(Commands, 0). -qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) -> +qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) -> {error, no_connection}; qmn(Commands, Counter) -> %% Throttle retries @@ -106,7 +107,7 @@ qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc, Version) -> Result = eredis_cluster_pool:transaction(Pool, Transaction), case handle_transaction_result(Result, Version, check_pipeline_result) of retry -> retry; - Res -> + Res -> MappedRes = lists:zip(Mapping,Res), qmn2(T1, T2, MappedRes ++ Acc, Version) end; @@ -190,9 +191,9 @@ query(Transaction, Slot, Counter) -> end. handle_transaction_result(Result, Version) -> - case Result of + case Result of % If we detect a node went down, we should probably refresh the slot - % mapping. + % mapping. {error, no_connection} -> eredis_cluster_monitor:refresh_mapping(Version), retry; @@ -298,7 +299,7 @@ optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) -> RedisResult = qw(Worker, [["MULTI"]] ++ UpdateCommand ++ [["EXEC"]]), {lists:last(RedisResult), Result} end, - case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of + case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of {{ok, undefined}, _} -> {error, resource_busy}; {{ok, TransactionResult}, UpdateResult} -> @@ -454,3 +455,23 @@ get_key_from_rest([_,KeyName|_]) when is_list(KeyName) -> KeyName; get_key_from_rest(_) -> undefined. + +%% ============================================================================= +%% @doc Returns pool for the command +%% @end +%% ============================================================================= +get_pool_by_command(Command) -> + Key = get_key_from_command(Command), + Slot = get_key_slot(Key), + {Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot), + Pool. + +%% ============================================================================= +%% @doc Returns pool per a key. +%% @end +%% ============================================================================= +get_pool_by_key(Key) -> + Slot = get_key_slot(Key), + {Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot), + Pool. + diff --git a/src/eredis_cluster_monitor.erl b/src/eredis_cluster_monitor.erl index 5a74f45..24eae77 100644 --- a/src/eredis_cluster_monitor.erl +++ b/src/eredis_cluster_monitor.erl @@ -5,7 +5,7 @@ -export([start_link/0]). -export([connect/1]). -export([refresh_mapping/1]). --export([get_state/0, get_state_version/1]). +-export([get_state/0, get_state_version/1, get_state_version/0]). -export([get_pool_by_slot/1, get_pool_by_slot/2]). -export([get_all_pools/0]). @@ -20,10 +20,10 @@ %% Type definition. -include("eredis_cluster.hrl"). -record(state, { - init_nodes :: [#node{}], - slots :: tuple(), %% whose elements are integer indexes into slots_maps - slots_maps :: tuple(), %% whose elements are #slots_map{} - version :: integer() + init_nodes = [] :: [#node{}], + slots = {} :: tuple(), %% whose elements are integer indexes into slots_maps + slots_maps = {} :: tuple(), %% whose elements are #slots_map{} + version = 0 :: integer() }). %% API. @@ -45,18 +45,32 @@ refresh_mapping(Version) -> -spec get_state() -> #state{}. get_state() -> - [{cluster_state, State}] = ets:lookup(?MODULE, cluster_state), - State. + case ets:lookup(?MODULE, cluster_state) of + [{cluster_state, S}] -> + S; + [] -> + #state{} + end. get_state_version(State) -> State#state.version. +-spec get_state_version() -> integer(). +get_state_version() -> + State = get_state(), + State#state.version. + -spec get_all_pools() -> [pid()]. get_all_pools() -> State = get_state(), - SlotsMapList = tuple_to_list(State#state.slots_maps), - [SlotsMap#slots_map.node#node.pool || SlotsMap <- SlotsMapList, - SlotsMap#slots_map.node =/= undefined]. + case State#state.slots_maps of + undefined -> + []; + _ -> + SlotsMapList = tuple_to_list(State#state.slots_maps), + lists:usort([SlotsMap#slots_map.node#node.pool || SlotsMap <- SlotsMapList, + SlotsMap#slots_map.node =/= undefined]) + end. %% ============================================================================= %% @doc Get cluster pool by slot. Optionally, a memoized State can be provided @@ -65,13 +79,21 @@ get_all_pools() -> %% ============================================================================= -spec get_pool_by_slot(Slot::integer(), State::#state{}) -> {PoolName::atom() | undefined, Version::integer()}. -get_pool_by_slot(Slot, State) -> - Index = element(Slot+1,State#state.slots), - Cluster = element(Index,State#state.slots_maps), - if - Cluster#slots_map.node =/= undefined -> - {Cluster#slots_map.node#node.pool, State#state.version}; - true -> + +get_pool_by_slot(_Slot, State) when State#state.slots == undefined -> + {undefined, State#state.version}; +get_pool_by_slot(Slot, State) -> + try + Index = element(Slot+1,State#state.slots), + Cluster = element(Index,State#state.slots_maps), + if + Cluster#slots_map.node =/= undefined -> + {Cluster#slots_map.node#node.pool, State#state.version}; + true -> + {undefined, State#state.version} + end + catch + _:_ -> {undefined, State#state.version} end. diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 7b8a7ce..70277b2 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -163,6 +163,59 @@ basic_test_() -> eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]), ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"])) end + }, + + { "no duplicates of all_pools", + fun () -> + Key = "{1}:test", + + {ok, NodesInfo} = eredis_cluster:q(["cluster","nodes"]), + + ClusterNodesList = string:lexemes(NodesInfo,"\n"), + NodeIdsL = lists:foldl(fun(ClusterNode, Acc) -> + ClusterNodeI = string:lexemes(ClusterNode," "), + case lists:nth(3, ClusterNodeI) of + Role when Role == <<"myself,master">>; + Role == <<"master">> -> + [Ip, Port] = string:lexemes(lists:nth(2, ClusterNodeI), ":"), + Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), + [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; + _ -> + Acc + end + end, [], ClusterNodesList), + KeySlot = eredis_cluster:get_key_slot(Key), + Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)), + + {NodeId, Pool} = lists:keyfind(Pool, 2, NodeIdsL), + [{NodeId2, _Pool2}, _] = [{NI, P} || {NI, P} <- NodeIdsL, {NI, P} =/= {NodeId, Pool}], + + %% Migrate Slot to have 2 sets of slots for one node: + CmdImp = ["CLUSTER", "SETSLOT", KeySlot, "IMPORTING", NodeId], + eredis_cluster:qa(CmdImp), + + CmdMig = ["CLUSTER", "SETSLOT", KeySlot, "MIGRATING", NodeId2], + eredis_cluster:qa(CmdMig), + + CmdMig1 = ["CLUSTER", "SETSLOT", KeySlot, "NODE", NodeId2], + eredis_cluster:qa(CmdMig1), + + AllPools = eredis_cluster_monitor:get_all_pools(), + ?assertEqual(true, erlang:length(AllPools) == sets:size(sets:from_list(AllPools))) + end + }, + + { "get pool by command and by key", + fun () -> + Key = "{2}:test", + Cmd = ["keys", Key], + + KeySlot = eredis_cluster:get_key_slot(Key), + ?assertEqual(element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)), + eredis_cluster:get_pool_by_command(Cmd)), + ?assertEqual(element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)), + eredis_cluster:get_pool_by_key(Key)) + end } ] From 9df65535db3ce5861c7e275c685c262869d57bd9 Mon Sep 17 00:00:00 2001 From: xkulale Date: Mon, 16 Dec 2019 14:07:58 +0000 Subject: [PATCH 2/2] Replace string:lexemes/2 with binary:split/3 --- test/eredis_cluster_tests.erl | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 70277b2..10e2b9c 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -171,19 +171,19 @@ basic_test_() -> {ok, NodesInfo} = eredis_cluster:q(["cluster","nodes"]), - ClusterNodesList = string:lexemes(NodesInfo,"\n"), + ClusterNodesList = [CNEl || CNEl <- binary:split(NodesInfo, <<"\n">>, [global]), CNEl =/= <<>>], NodeIdsL = lists:foldl(fun(ClusterNode, Acc) -> - ClusterNodeI = string:lexemes(ClusterNode," "), - case lists:nth(3, ClusterNodeI) of - Role when Role == <<"myself,master">>; - Role == <<"master">> -> - [Ip, Port] = string:lexemes(lists:nth(2, ClusterNodeI), ":"), - Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), - [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; - _ -> - Acc - end - end, [], ClusterNodesList), + ClusterNodeI = binary:split(ClusterNode, <<" ">>, [global]), + case lists:nth(3, ClusterNodeI) of + Role when Role == <<"myself,master">>; + Role == <<"master">> -> + [Ip, Port] = binary:split(lists:nth(2, ClusterNodeI), <<":">>, [global]), + Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), + [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; + _ -> + Acc + end + end, [], ClusterNodesList), KeySlot = eredis_cluster:get_key_slot(Key), Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)),