diff --git a/src/eredis_cluster.erl b/src/eredis_cluster.erl
index 1da3a9f..a71ce6d 100644
--- a/src/eredis_cluster.erl
+++ b/src/eredis_cluster.erl
@@ -11,6 +11,8 @@
 % Generic redis call
 -export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]).
 
+-export([get_key_slot/1]).
+
 % Specific redis command implementation
 -export([flushdb/0]).
 
@@ -56,7 +58,12 @@ connect(InitServers) ->
 -spec transaction(redis_pipeline_command()) -> redis_transaction_result().
 transaction(Commands) ->
     Result = q([["multi"]| Commands] ++ [["exec"]]),
-    lists:last(Result).
+    case is_list(Result) of
+        true ->
+            lists:last(Result);
+        false ->
+            Result
+    end.
 
 %% =============================================================================
 %% @doc Execute a function on a pool worker. This function should be use when
@@ -89,7 +96,7 @@ transaction(Transaction, Slot, ExpectedValue, Counter) ->
 -spec qmn(redis_pipeline_command()) -> redis_pipeline_result().
 qmn(Commands) -> qmn(Commands, 0).
 
-qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) -> 
+qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) ->
     {error, no_connection};
 qmn(Commands, Counter) ->
     %% Throttle retries
@@ -106,7 +113,7 @@ qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc, Version) ->
     Result = eredis_cluster_pool:transaction(Pool, Transaction),
     case handle_transaction_result(Result, Version, check_pipeline_result) of
         retry -> retry;
-        Res -> 
+        Res ->
             MappedRes = lists:zip(Mapping,Res),
             qmn2(T1, T2, MappedRes ++ Acc, Version)
     end;
@@ -189,10 +196,49 @@ query(Transaction, Slot, Counter) ->
         Result -> Result
     end.
 
+handle_transaction_result(Result, Version) when is_list(Result) ->
+    case proplists:lookup(error, Result) of
+        % If we detect a node went down, we should probably refresh the slot
+        % mapping.
+        {error, no_connection} ->
+            retry;
+        % If the tcp connection is closed (connection timeout), the redis worker
+        % will try to reconnect, thus the connection should be recovered for
+        % the next request. We don't need to refresh the slot mapping in this
+        % case:
+        {error, tcp_closed} ->
+            retry;
+        % TCP issue? Refresh the mapping, reconnect and retry the query:
+        {error, Reason} when is_atom(Reason) ->
+            io:format("EREDIS_CLUSTER ERROR!: ~p~n",[Reason]),
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        % Redis explicitly says our slot mapping is incorrect; we need to
+        % refresh it:
+        {error, <<"MOVED ", _/binary>>} ->
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        {error, <<"ASK ", _/binary>>} ->
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        {error, <<"CLUSTERDOWN ", _/binary>>} ->
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        {error, <<"TRYAGAIN ", _/binary>>} ->
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        {error, Reason} when is_binary(Reason) ->
+            %% Do not refresh the slot map. These can be errors such as
+            %% "ERR unknown command 'foobar'" or
+            %% "WRONGTYPE Operation against a key holding the wrong kind of value":
+            Result;
+        none ->
+            Result
+    end;
 handle_transaction_result(Result, Version) ->
-    case Result of 
+    case Result of
         % If we detect a node went down, we should probably refresh the slot
-        % mapping. 
+        % mapping.
         {error, no_connection} ->
             eredis_cluster_monitor:refresh_mapping(Version),
             retry;
@@ -203,13 +249,30 @@ handle_transaction_result(Result, Version) ->
         % case
         {error, tcp_closed} ->
             retry;
-
+        % TCP issue? Refresh the mapping, reconnect and retry the query:
+        {error, Reason} when is_atom(Reason) ->
+            io:format("EREDIS_CLUSTER ERROR!: ~p~n",[Reason]),
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
         % Redis explicitly say our slot mapping is incorrect, we need to refresh
         % it
         {error, <<"MOVED ", _/binary>>} ->
             eredis_cluster_monitor:refresh_mapping(Version),
             retry;
-
+        {error, <<"ASK ", _/binary>>} ->
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        {error, <<"TRYAGAIN ", _/binary>>} ->
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        {error, <<"CLUSTERDOWN ", _/binary>>} ->
+            eredis_cluster_monitor:refresh_mapping(Version),
+            retry;
+        {error, Reason} when is_binary(Reason) ->
+            %% Do not refresh the slot map. These can be errors such as
+            %% "ERR unknown command 'foobar'" or
+            %% "WRONGTYPE Operation against a key holding the wrong kind of value":
+            Result;
         Payload ->
             Payload
     end.
@@ -218,6 +281,9 @@ handle_transaction_result(Result, Version, check_pipeline_result) ->
         retry -> retry;
         Payload when is_list(Payload) ->
             Pred = fun({error, <<"MOVED ", _/binary>>}) -> true;
+                      ({error, <<"ASK ", _/binary>>}) -> true;
+                      ({error, <<"TRYAGAIN ", _/binary>>}) -> true;
+                      ({error, <<"CLUSTERDOWN ", _/binary>>}) -> true;
                       (_) -> false
                    end,
             case lists:any(Pred, Payload) of
@@ -298,7 +364,7 @@ optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) ->
             RedisResult = qw(Worker, [["MULTI"]] ++ UpdateCommand ++ [["EXEC"]]),
             {lists:last(RedisResult), Result}
     end,
-    case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of 
+    case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of
         {{ok, undefined}, _} ->
             {error, resource_busy};
         {{ok, TransactionResult}, UpdateResult} ->
@@ -333,14 +399,44 @@ eval(Script, ScriptHash, Keys, Args) ->
     end.
 
 %% =============================================================================
-%% @doc Perform a given query on all node of a redis cluster
+%% @doc Perform a given query on all nodes of a redis cluster.
+%% When the connection to a master fails, refresh the slot mapping and retry
+%% the query.
 %% @end
 %% =============================================================================
--spec qa(redis_command()) -> ok | {error, Reason::bitstring()}.
+-spec qa(redis_command()) -> redis_result() | [redis_result()] | {error, Reason::bitstring()}.
 qa(Command) ->
+    qa(Command, 0, []).
+
+qa(_, ?REDIS_CLUSTER_REQUEST_TTL, Res) ->
+    case Res of
+        [] ->
+            {error, no_connection};
+        _ ->
+            %% Return the result from each pool:
+            Res
+    end;
+qa(Command, Counter, Res) ->
+    %% Throttle retries
+    throttle_retries(Counter),
     Pools = eredis_cluster_monitor:get_all_pools(),
-    Transaction = fun(Worker) -> qw(Worker, Command) end,
-    [eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools].
+    case Pools of
+        [] ->
+            State = eredis_cluster_monitor:get_state(),
+            eredis_cluster_monitor:refresh_mapping(eredis_cluster_monitor:get_state_version(State)),
+            qa(Command, Counter + 1, Res);
+        _ ->
+            Transaction = fun(Worker) -> qw(Worker, Command) end,
+            Result = [eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools],
+            State = eredis_cluster_monitor:get_state(),
+            case handle_transaction_result(Result,
+                eredis_cluster_monitor:get_state_version(State))
+            of
+                retry -> qa(Command, Counter + 1, Result);
+                Result -> Result
+            end
+    end.
+
 
 %% =============================================================================
 %% @doc Wrapper function to be used for direct call to a pool worker in the
@@ -357,12 +453,16 @@ qw(Worker, Command) ->
 %% =============================================================================
 -spec flushdb() -> ok | {error, Reason::bitstring()}.
 flushdb() ->
-    Result = qa(["FLUSHDB"]),
-    case proplists:lookup(error,Result) of
-        none ->
-            ok;
-        Error ->
-            Error
+    case qa(["FLUSHDB"]) of
+        {error, Reason} ->
+            {error, Reason};
+        Result ->
+            case proplists:lookup(error, Result) of
+                none ->
+                    ok;
+                Error ->
+                    Error
+            end
     end.
 
 %% =============================================================================
diff --git a/test/eredis_cluster_tests.erl b/test/eredis_cluster_tests.erl
index 7b8a7ce..fb4d091 100644
--- a/test/eredis_cluster_tests.erl
+++ b/test/eredis_cluster_tests.erl
@@ -163,8 +163,65 @@ basic_test_() ->
                 eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]),
                 ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"]))
             end
-        }
+        },
+
+        { "get info from all pools",
+            fun () ->
+                ?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"])),
+                ?assertEqual(none, proplists:lookup(error, eredis_cluster:qa(["cluster", "slots"]))),
+                ?assertMatch({error, _}, proplists:lookup(error, eredis_cluster:qa(["get", "qrs"])))
+            end
+        },
+        { "ASK support",
+            fun () ->
+                Key = "{1}:test",
+                eredis_cluster:q(["set", Key, "value"]),
+                {ok, NodesInfo} = eredis_cluster:q(["cluster", "nodes"]),
+
+                ClusterNodesList = [CNEL || CNEL <- binary:split(NodesInfo, <<"\n">>, [global]), CNEL =/= <<>>],
+                NodeIdsL = lists:foldl(fun(ClusterNode, Acc) ->
+                    ClusterNodeI = binary:split(ClusterNode, <<" ">>, [global]),
+                    case lists:nth(3, ClusterNodeI) of
+                        Role when Role == <<"myself,master">>;
+                                  Role == <<"master">> ->
+                            [Ip, Port] = binary:split(lists:nth(2, ClusterNodeI), <<":">>, [global]),
+                            Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)),
+                            [{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc];
+                        _ ->
+                            Acc
+                    end
+                end, [], ClusterNodesList),
+                KeySlot = eredis_cluster:get_key_slot(Key),
+
+                Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)),
+
+                {NodeId, Pool} = lists:keyfind(Pool, 2, NodeIdsL),
+                [{NodeId2, _Pool2}, _] = [{NI, P} || {NI, P} <- NodeIdsL, {NI, P} =/= {NodeId, Pool}],
+
+                %% Migrate the slot so that one node serves two ranges of slots:
+                CmdImp = ["CLUSTER", "SETSLOT", KeySlot, "IMPORTING", NodeId],
+                eredis_cluster:qa(CmdImp),
+
+                CmdMig = ["CLUSTER", "SETSLOT", KeySlot, "MIGRATING", NodeId2],
+                eredis_cluster:qa(CmdMig),
+                Result = lists:usort(eredis_cluster:qa(["get", Key])),
+
+                Fun = fun({error, <<"MOVED ", _/binary>>}) -> true;
+                         ({error, <<"ASK ", _/binary>>}) -> true;
+                         (_) -> false
+                      end,
+                Verdict = case lists:all(Fun, Result) of
+                              false -> Result;
+                              true -> true
+                          end,
+
+                ?assertEqual(true, Verdict),
+
+                CmdMig1 = ["CLUSTER", "SETSLOT", KeySlot, "NODE", NodeId2],
+                eredis_cluster:qa(CmdMig1)
+            end
+        }
     ]
     }
}.
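
Note on the transaction/1 change: with the new guard, a failure from q/1 (for example {error, no_connection}) is propagated instead of crashing in lists:last/1. A minimal caller-side sketch of the resulting contract; the module name, keys and commands are placeholders, and both keys share a hash tag so the MULTI/EXEC lands on a single slot:

    %% Sketch only: the two return shapes of transaction/1 after this change.
    -module(transaction_example).
    -export([bump/0]).

    bump() ->
        case eredis_cluster:transaction([["SET", "{tag}:a", "x"],
                                         ["INCR", "{tag}:n"]]) of
            {error, _} = Err ->
                %% Propagated q/1 failure, e.g. {error, no_connection}.
                Err;
            {ok, Replies} ->
                %% Replies of the queued commands, taken from the EXEC reply.
                Replies
        end.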
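
Note on handle_transaction_result/2: the patch now distinguishes transient cluster errors (retried, usually after a mapping refresh) from plain command errors (returned to the caller). Condensed into one standalone pure function as a review aid; the function name and the atoms it returns are hypothetical labels, not code from this patch:

    %% Review aid: per-error policy of the single-result clause, condensed.
    -module(retry_policy_sketch).
    -export([policy/1]).

    policy({error, tcp_closed})                    -> retry;             % worker reconnects on its own
    policy({error, <<"MOVED ", _/binary>>})        -> refresh_and_retry; % slot map is stale
    policy({error, <<"ASK ", _/binary>>})          -> refresh_and_retry; % slot is being migrated
    policy({error, <<"TRYAGAIN ", _/binary>>})     -> refresh_and_retry; % multi-key op during migration
    policy({error, <<"CLUSTERDOWN ", _/binary>>})  -> refresh_and_retry; % cluster temporarily down
    policy({error, Reason}) when is_atom(Reason)   -> refresh_and_retry; % no_connection and other TCP-level errors
    policy({error, Reason}) when is_binary(Reason) -> return_to_caller;  % e.g. ERR or WRONGTYPE, not transient
    policy(_Success)                               -> return_to_caller.  % successful replies pass through unchanged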
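
Note on qa/1: it keeps its list-of-results shape on success but can now also return {error, no_connection} once ?REDIS_CLUSTER_REQUEST_TTL attempts are exhausted without any pool being available, which is why flushdb/0 above gains the extra clause. A hedged caller-side sketch, assuming the application is already connected; module and function names are made up:

    %% Sketch only: the two return shapes of qa/1 after this change.
    -module(qa_example).
    -export([ping_all/0]).

    ping_all() ->
        case eredis_cluster:qa(["PING"]) of
            {error, no_connection} ->
                %% No pool was available during any of the retries.
                no_master_reachable;
            Results when is_list(Results) ->
                %% One entry per master pool, e.g. [{ok, <<"PONG">>}, ...].
                Results
        end.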