From ead111787b69fe13991dddda071e782d550f130b Mon Sep 17 00:00:00 2001 From: xkulale Date: Wed, 11 Dec 2019 12:37:21 +0000 Subject: [PATCH 1/6] =?UTF-8?q?Improve=20query=20handling=20a.=09Retry=20t?= =?UTF-8?q?he=20query=20that=20is=20sent=20to=20all=20connected=20pools=20?= =?UTF-8?q?(Redis=20instances);=20b.=09Add=20`refresh=20slots=20map=20and?= =?UTF-8?q?=20retry=20query`=20logic=20for=20such=20Redis=20error=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20responses=20as=20<<"ASK=20",=20=5F/binary>>,?= =?UTF-8?q?=20=20<<"TRYAGAIN=20",=20=5F/binary>>=20and=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20<<"CLUSTERDOWN=20",=20=5F/binary>>;=20c.=09Return=20an?= =?UTF-8?q?=20error=20to=20application=20in=20case=20of=20Redis=20response?= =?UTF-8?q?=20errors=20=20=20=20=20=20=20=20=20like=20as:=20"ERR=20unknown?= =?UTF-8?q?=20command=20'foobar'=E2=80=9D=20or=20"WRONGTYPE=20Operation=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20against=20a=20key=20holding=20the=20wro?= =?UTF-8?q?ng=20kind=20of=20value",=20"BUSY=20Redis=20is=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20busy=20running=20a=20script",=20"NOAUTH=20Authenticati?= =?UTF-8?q?on=20required."=20and=20so=20on=20;=20d.=09Add=20`refresh=20slo?= =?UTF-8?q?ts=20map=20and=20retry=20query=20`=20logic=20in=20case=20of=20P?= =?UTF-8?q?OSIX=20=20=20=20=20=20=20=20=20error=20that=20can=20related=20t?= =?UTF-8?q?o=20temporary=20TCP=20issues.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/eredis_cluster.erl | 136 +++++++++++++++++++++++++++++----- test/eredis_cluster_tests.erl | 9 ++- 2 files changed, 126 insertions(+), 19 deletions(-) diff --git a/src/eredis_cluster.erl b/src/eredis_cluster.erl index 1da3a9f..a71ce6d 100644 --- a/src/eredis_cluster.erl +++ b/src/eredis_cluster.erl @@ -11,6 +11,8 @@ % Generic redis call -export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]). +-export([get_key_slot/1]). + % Specific redis command implementation -export([flushdb/0]). @@ -56,7 +58,12 @@ connect(InitServers) -> -spec transaction(redis_pipeline_command()) -> redis_transaction_result(). transaction(Commands) -> Result = q([["multi"]| Commands] ++ [["exec"]]), - lists:last(Result). + case is_list(Result) of + true -> + lists:last(Result); + false -> + Result + end. %% ============================================================================= %% @doc Execute a function on a pool worker. This function should be use when @@ -89,7 +96,7 @@ transaction(Transaction, Slot, ExpectedValue, Counter) -> -spec qmn(redis_pipeline_command()) -> redis_pipeline_result(). qmn(Commands) -> qmn(Commands, 0). -qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) -> +qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) -> {error, no_connection}; qmn(Commands, Counter) -> %% Throttle retries @@ -106,7 +113,7 @@ qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc, Version) -> Result = eredis_cluster_pool:transaction(Pool, Transaction), case handle_transaction_result(Result, Version, check_pipeline_result) of retry -> retry; - Res -> + Res -> MappedRes = lists:zip(Mapping,Res), qmn2(T1, T2, MappedRes ++ Acc, Version) end; @@ -189,10 +196,49 @@ query(Transaction, Slot, Counter) -> Result -> Result end. +handle_transaction_result(Result, Version) when is_list(Result) -> + case proplists:lookup(error, Result) of + % If we detect a node went down, we should probably refresh the slot + % mapping. + {error, no_connection} -> + retry; + % If the tcp connection is closed (connection timeout), the redis worker + % will try to reconnect, thus the connection should be recovered for + % the next request. We don't need to refresh the slot mapping in this + % case: + {error, tcp_closed} -> + retry; + % TCP issue? Try to refresh map, reconnect and retry query: + {error, Reason} when is_atom(Reason) -> + io:format("EREDIS_CLUSTER ERROR!: ~p~n",[Reason]), + eredis_cluster_monitor:refresh_mapping(Version), + retry; + % Redis explicitly say our slot mapping is incorrect, we need to refresh + % it: + {error, <<"MOVED ", _/binary>>} -> + eredis_cluster_monitor:refresh_mapping(Version), + retry; + {error, <<"ASK ", _/binary>>} -> + eredis_cluster_monitor:refresh_mapping(Version), + retry; + {error, <<"CLUSTERDOWN ", _/binary>>} -> + eredis_cluster_monitor:refresh_mapping(Version), + retry; + {error, <<"TRYAGAIN ", _/binary>>} -> + eredis_cluster_monitor:refresh_mapping(Version), + retry; + {error, Reason} when is_binary(Reason) -> + %% Do not make the refresh map. It could be errors like as + %% "ERR unknown command 'foobar' or + %% "WRONGTYPE Operation against a key holding the wrong kind of value": + Result; + none -> + Result + end; handle_transaction_result(Result, Version) -> - case Result of + case Result of % If we detect a node went down, we should probably refresh the slot - % mapping. + % mapping. {error, no_connection} -> eredis_cluster_monitor:refresh_mapping(Version), retry; @@ -203,13 +249,30 @@ handle_transaction_result(Result, Version) -> % case {error, tcp_closed} -> retry; - + % TCP issue? Try to refresh map, reconnect and retry query: + {error, Reason} when is_atom(Reason) -> + io:format("EREDIS_CLUSTER ERROR!: ~p~n",[Reason]), + eredis_cluster_monitor:refresh_mapping(Version), + retry; % Redis explicitly say our slot mapping is incorrect, we need to refresh % it {error, <<"MOVED ", _/binary>>} -> eredis_cluster_monitor:refresh_mapping(Version), retry; - + {error, <<"ASK ", _/binary>>} -> + eredis_cluster_monitor:refresh_mapping(Version), + retry; + {error, <<"TRYAGAIN ", _/binary>>} -> + eredis_cluster_monitor:refresh_mapping(Version), + retry; + {error, <<"CLUSTERDOWN ", _/binary>>} -> + eredis_cluster_monitor:refresh_mapping(Version), + retry; + {error, Reason} when is_binary(Reason) -> + %% Do not make the refresh map. It could be errors like as + %% "ERR unknown command 'foobar' or + %% "WRONGTYPE Operation against a key holding the wrong kind of value": + Result; Payload -> Payload end. @@ -218,6 +281,9 @@ handle_transaction_result(Result, Version, check_pipeline_result) -> retry -> retry; Payload when is_list(Payload) -> Pred = fun({error, <<"MOVED ", _/binary>>}) -> true; + ({error, <<"ASK ", _/binary>>}) -> true; + ({error, <<"TRYAGAIN ", _/binary>>}) -> true; + ({error, <<"CLUSTERDOWN ", _/binary>>}) -> true; (_) -> false end, case lists:any(Pred, Payload) of @@ -298,7 +364,7 @@ optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) -> RedisResult = qw(Worker, [["MULTI"]] ++ UpdateCommand ++ [["EXEC"]]), {lists:last(RedisResult), Result} end, - case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of + case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of {{ok, undefined}, _} -> {error, resource_busy}; {{ok, TransactionResult}, UpdateResult} -> @@ -333,14 +399,44 @@ eval(Script, ScriptHash, Keys, Args) -> end. %% ============================================================================= -%% @doc Perform a given query on all node of a redis cluster +%% @doc Perform a given query on all node of a redis cluster. +%% When connection to master failed then do refresh mapping and try again to +%% query. %% @end %% ============================================================================= --spec qa(redis_command()) -> ok | {error, Reason::bitstring()}. +-spec qa(redis_command()) -> redis_result() | [redis_result()] | {error, Reason::bitstring()}. qa(Command) -> + qa(Command, 0, []). + +qa(_, ?REDIS_CLUSTER_REQUEST_TTL, Res) -> + case Res of + [] -> + {error, no_connection}; + _ -> + %% Return result from each Pool: + Res + end; +qa(Command, Counter, Res) -> + %% Throttle retries + throttle_retries(Counter), Pools = eredis_cluster_monitor:get_all_pools(), - Transaction = fun(Worker) -> qw(Worker, Command) end, - [eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools]. + case Pools of + [] -> + Version = eredis_cluster_monitor:get_state_version(), + eredis_cluster_monitor:refresh_mapping(Version), + qa(Command, Counter + 1, Res); + _ -> + Transaction = fun(Worker) -> qw(Worker, Command) end, + Result = [eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools], + State = eredis_cluster_monitor:get_state(), + case handle_transaction_result(Result, + eredis_cluster_monitor:get_state_version(State)) + of + retry -> qa(Command, Counter + 1, Result); + Result -> Result + end + end. + %% ============================================================================= %% @doc Wrapper function to be used for direct call to a pool worker in the @@ -357,12 +453,16 @@ qw(Worker, Command) -> %% ============================================================================= -spec flushdb() -> ok | {error, Reason::bitstring()}. flushdb() -> - Result = qa(["FLUSHDB"]), - case proplists:lookup(error,Result) of - none -> - ok; - Error -> - Error + case qa(["FLUSHDB"]) of + {error, Reason} -> + {error, Reason}; + Result -> + case proplists:lookup(error, Result) of + none -> + ok; + Error -> + Error + end end. %% ============================================================================= diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 7b8a7ce..d4b8bee 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -163,8 +163,15 @@ basic_test_() -> eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]), ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"])) end - } + }, + { "get info from all pools", + fun () -> + ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"])), + ?assertEqual(none, proplists:lookup(error,eredis_cluster:qa(["cluster", "slots"]))), + ?assertMatch({error,_}, proplists:lookup(error,eredis_cluster:qa(["get", "qrs"]))) + end + } ] } }. From 25f0d3df49f79f1bc49af267affb55b441429395 Mon Sep 17 00:00:00 2001 From: xkulale Date: Thu, 12 Dec 2019 11:11:08 +0000 Subject: [PATCH 2/6] Add "ASK" support test. --- test/eredis_cluster_tests.erl | 50 +++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index d4b8bee..6c60eda 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -171,6 +171,56 @@ basic_test_() -> ?assertEqual(none, proplists:lookup(error,eredis_cluster:qa(["cluster", "slots"]))), ?assertMatch({error,_}, proplists:lookup(error,eredis_cluster:qa(["get", "qrs"]))) end + }, + + { "ASK support", + fun () -> + Key = "{1}:test", + eredis_cluster:q(["set",Key]), + {ok, NodesInfo} = eredis_cluster:q(["cluster","nodes"]), + + ClusterNodesList = string:lexemes(NodesInfo,"\n"), + NodeIdsL = lists:foldl(fun(ClusterNode, Acc) -> + ClusterNodeI = string:lexemes(ClusterNode," "), + case lists:nth(3, ClusterNodeI) of + Role when Role == <<"myself,master">>; + Role == <<"master">> -> + [Ip, Port] = string:lexemes(lists:nth(2, ClusterNodeI), ":"), + Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), + [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; + _ -> + Acc + end + end, [], ClusterNodesList), + KeySlot = eredis_cluster:get_key_slot(Key), + + Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)), + + {NodeId, Pool} = lists:keyfind(Pool, 2, NodeIdsL), + [{NodeId2, _Pool2}, _] = [{NI, P} || {NI, P} <- NodeIdsL, {NI, P} =/= {NodeId, Pool}], + + %% Migrate Slot to have 2 sets of slots for one node: + CmdImp = ["CLUSTER", "SETSLOT", KeySlot, "IMPORTING", NodeId], + eredis_cluster:qa(CmdImp), + + CmdMig = ["CLUSTER", "SETSLOT", KeySlot, "MIGRATING", NodeId2], + eredis_cluster:qa(CmdMig), + Result = lists:usort(eredis_cluster:qa(["get",Key])), + + Fun = fun({error, <<"MOVED ", _/binary>>}) -> true; + ({error, <<"ASK ", _/binary>>}) -> true; + (_) -> false + end, + Verdict = case lists:any(Fun, Result) of + false -> Result; + true -> true + end, + + ?assertEqual(true, Verdict), + + CmdMig1 = ["CLUSTER", "SETSLOT", KeySlot, "NODE", NodeId2], + eredis_cluster:qa(CmdMig1) + end } ] } From 4d9e27d4c3848b4f94148c41e7d0559f0fe90196 Mon Sep 17 00:00:00 2001 From: xkulale Date: Thu, 12 Dec 2019 11:20:43 +0000 Subject: [PATCH 3/6] Check all answers for "MOVED" or "ASK". --- test/eredis_cluster_tests.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 6c60eda..79ecdac 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -211,7 +211,7 @@ basic_test_() -> ({error, <<"ASK ", _/binary>>}) -> true; (_) -> false end, - Verdict = case lists:any(Fun, Result) of + Verdict = case lists:all(Fun, Result) of false -> Result; true -> true end, From d12a666a821c48966aeffe6dd49213bc0d4f8b72 Mon Sep 17 00:00:00 2001 From: xkulale Date: Mon, 16 Dec 2019 12:17:15 +0000 Subject: [PATCH 4/6] If OTP release less than 20, use strings:token() instead strings:lexemes(). --- test/eredis_cluster_tests.erl | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 79ecdac..afd29f4 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -181,17 +181,20 @@ basic_test_() -> ClusterNodesList = string:lexemes(NodesInfo,"\n"), NodeIdsL = lists:foldl(fun(ClusterNode, Acc) -> - ClusterNodeI = string:lexemes(ClusterNode," "), - case lists:nth(3, ClusterNodeI) of - Role when Role == <<"myself,master">>; - Role == <<"master">> -> - [Ip, Port] = string:lexemes(lists:nth(2, ClusterNodeI), ":"), - Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), - [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; - _ -> - Acc - end - end, [], ClusterNodesList), + ClusterNodeI = case list_to_integer(erlang:system_info(otp_release)) < 20 of + true -> string:tokens(ClusterNode," "); + false -> string:lexemes(ClusterNode," ") + end, + case lists:nth(3, ClusterNodeI) of + Role when Role == <<"myself,master">>; + Role == <<"master">> -> + [Ip, Port] = string:lexemes(lists:nth(2, ClusterNodeI), ":"), + Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), + [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; + _ -> + Acc + end + end, [], ClusterNodesList), KeySlot = eredis_cluster:get_key_slot(Key), Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)), From 01eeba01095e10daadcf6f468c3131427fb68a7c Mon Sep 17 00:00:00 2001 From: xkulale Date: Mon, 16 Dec 2019 12:17:15 +0000 Subject: [PATCH 5/6] If OTP release less than 20, use strings:token() instead strings:lexemes(). --- test/eredis_cluster_tests.erl | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 79ecdac..9fd8fd0 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -178,20 +178,31 @@ basic_test_() -> Key = "{1}:test", eredis_cluster:q(["set",Key]), {ok, NodesInfo} = eredis_cluster:q(["cluster","nodes"]), + OTPRel = list_to_integer(erlang:system_info(otp_release)), - ClusterNodesList = string:lexemes(NodesInfo,"\n"), + ClusterNodesList = case OTPRel < 20 of + true -> string:tokens(NodesInfo,"\n"); + false -> string:lexemes(NodesInfo,"\n") + end, NodeIdsL = lists:foldl(fun(ClusterNode, Acc) -> - ClusterNodeI = string:lexemes(ClusterNode," "), - case lists:nth(3, ClusterNodeI) of - Role when Role == <<"myself,master">>; - Role == <<"master">> -> - [Ip, Port] = string:lexemes(lists:nth(2, ClusterNodeI), ":"), - Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), - [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; - _ -> - Acc - end - end, [], ClusterNodesList), + + ClusterNodeI = case OTPRel < 20 of + true -> string:tokens(ClusterNode," "); + false -> string:lexemes(ClusterNode," ") + end, + case lists:nth(3, ClusterNodeI) of + Role when Role == <<"myself,master">>; + Role == <<"master">> -> + [Ip, Port] = case OTPRel < 20 of + true -> string:tokens(lists:nth(2, ClusterNodeI), ":"); + false -> string:lexemes(lists:nth(2, ClusterNodeI), ":") + end, + Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), + [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; + _ -> + Acc + end + end, [], ClusterNodesList), KeySlot = eredis_cluster:get_key_slot(Key), Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)), From bc9f6152613660e1b1c6ac61990228874b9aa1b0 Mon Sep 17 00:00:00 2001 From: xkulale Date: Mon, 16 Dec 2019 13:35:35 +0000 Subject: [PATCH 6/6] Replase string:lexemes/2 and string:tockens/2 to binary:split/3. --- test/eredis_cluster_tests.erl | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl index 57bd0d5..fb4d091 100644 --- a/test/eredis_cluster_tests.erl +++ b/test/eredis_cluster_tests.erl @@ -178,24 +178,14 @@ basic_test_() -> Key = "{1}:test", eredis_cluster:q(["set",Key]), {ok, NodesInfo} = eredis_cluster:q(["cluster","nodes"]), - OTPRel = list_to_integer(erlang:system_info(otp_release)), - ClusterNodesList = case OTPRel < 20 of - true -> string:tokens(NodesInfo,"\n"); - false -> string:lexemes(NodesInfo,"\n") - end, + ClusterNodesList = [CNEL || CNEL <- binary:split(NodesInfo,<<"\n">>, [global]), CNEL =/= <<>>], NodeIdsL = lists:foldl(fun(ClusterNode, Acc) -> - ClusterNodeI = case OTPRel < 20 of - true -> string:tokens(ClusterNode," "); - false -> string:lexemes(ClusterNode," ") - end, + ClusterNodeI = binary:split(ClusterNode,<<" ">>, [global]), case lists:nth(3, ClusterNodeI) of Role when Role == <<"myself,master">>; Role == <<"master">> -> - [Ip, Port] = case OTPRel < 20 of - true -> string:tokens(lists:nth(2, ClusterNodeI), ":"); - false -> string:lexemes(lists:nth(2, ClusterNodeI), ":") - end, + [Ip, Port] = binary:split(lists:nth(2, ClusterNodeI), <<":">>, [global]), Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)), [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc]; _ ->