Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 118 additions & 18 deletions src/eredis_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
% Generic redis call
-export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]).

-export([get_key_slot/1]).

% Specific redis command implementation
-export([flushdb/0]).

Expand Down Expand Up @@ -56,7 +58,12 @@ connect(InitServers) ->
-spec transaction(redis_pipeline_command()) -> redis_transaction_result().
transaction(Commands) ->
Result = q([["multi"]| Commands] ++ [["exec"]]),
lists:last(Result).
case is_list(Result) of
true ->
lists:last(Result);
false ->
Result
end.

%% =============================================================================
%% @doc Execute a function on a pool worker. This function should be use when
Expand Down Expand Up @@ -89,7 +96,7 @@ transaction(Transaction, Slot, ExpectedValue, Counter) ->
-spec qmn(redis_pipeline_command()) -> redis_pipeline_result().
qmn(Commands) -> qmn(Commands, 0).

qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) ->
qmn(_, ?REDIS_CLUSTER_REQUEST_TTL) ->
{error, no_connection};
qmn(Commands, Counter) ->
%% Throttle retries
Expand All @@ -106,7 +113,7 @@ qmn2([{Pool, PoolCommands} | T1], [{Pool, Mapping} | T2], Acc, Version) ->
Result = eredis_cluster_pool:transaction(Pool, Transaction),
case handle_transaction_result(Result, Version, check_pipeline_result) of
retry -> retry;
Res ->
Res ->
MappedRes = lists:zip(Mapping,Res),
qmn2(T1, T2, MappedRes ++ Acc, Version)
end;
Expand Down Expand Up @@ -189,10 +196,49 @@ query(Transaction, Slot, Counter) ->
Result -> Result
end.

handle_transaction_result(Result, Version) when is_list(Result) ->
case proplists:lookup(error, Result) of
% If we detect a node went down, we should probably refresh the slot
% mapping.
{error, no_connection} ->
retry;
% If the tcp connection is closed (connection timeout), the redis worker
% will try to reconnect, thus the connection should be recovered for
% the next request. We don't need to refresh the slot mapping in this
% case:
{error, tcp_closed} ->
retry;
% TCP issue? Try to refresh map, reconnect and retry query:
{error, Reason} when is_atom(Reason) ->
io:format("EREDIS_CLUSTER ERROR!: ~p~n",[Reason]),
eredis_cluster_monitor:refresh_mapping(Version),
retry;
% Redis explicitly say our slot mapping is incorrect, we need to refresh
% it:
{error, <<"MOVED ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
{error, <<"ASK ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
{error, <<"CLUSTERDOWN ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
{error, <<"TRYAGAIN ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
{error, Reason} when is_binary(Reason) ->
%% Do not make the refresh map. It could be errors like as
%% "ERR unknown command 'foobar' or
%% "WRONGTYPE Operation against a key holding the wrong kind of value":
Result;
none ->
Result
end;
handle_transaction_result(Result, Version) ->
case Result of
case Result of
% If we detect a node went down, we should probably refresh the slot
% mapping.
% mapping.
{error, no_connection} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
Expand All @@ -203,13 +249,30 @@ handle_transaction_result(Result, Version) ->
% case
{error, tcp_closed} ->
retry;

% TCP issue? Try to refresh map, reconnect and retry query:
{error, Reason} when is_atom(Reason) ->
io:format("EREDIS_CLUSTER ERROR!: ~p~n",[Reason]),
eredis_cluster_monitor:refresh_mapping(Version),
retry;
% Redis explicitly say our slot mapping is incorrect, we need to refresh
% it
{error, <<"MOVED ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;

{error, <<"ASK ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
{error, <<"TRYAGAIN ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
{error, <<"CLUSTERDOWN ", _/binary>>} ->
eredis_cluster_monitor:refresh_mapping(Version),
retry;
{error, Reason} when is_binary(Reason) ->
%% Do not make the refresh map. It could be errors like as
%% "ERR unknown command 'foobar' or
%% "WRONGTYPE Operation against a key holding the wrong kind of value":
Result;
Payload ->
Payload
end.
Expand All @@ -218,6 +281,9 @@ handle_transaction_result(Result, Version, check_pipeline_result) ->
retry -> retry;
Payload when is_list(Payload) ->
Pred = fun({error, <<"MOVED ", _/binary>>}) -> true;
({error, <<"ASK ", _/binary>>}) -> true;
({error, <<"TRYAGAIN ", _/binary>>}) -> true;
({error, <<"CLUSTERDOWN ", _/binary>>}) -> true;
(_) -> false
end,
case lists:any(Pred, Payload) of
Expand Down Expand Up @@ -298,7 +364,7 @@ optimistic_locking_transaction(WatchedKey, GetCommand, UpdateFunction) ->
RedisResult = qw(Worker, [["MULTI"]] ++ UpdateCommand ++ [["EXEC"]]),
{lists:last(RedisResult), Result}
end,
case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of
case transaction(Transaction, Slot, {ok, undefined}, ?OL_TRANSACTION_TTL) of
{{ok, undefined}, _} ->
{error, resource_busy};
{{ok, TransactionResult}, UpdateResult} ->
Expand Down Expand Up @@ -333,14 +399,44 @@ eval(Script, ScriptHash, Keys, Args) ->
end.

%% =============================================================================
%% @doc Perform a given query on all node of a redis cluster
%% @doc Perform a given query on all node of a redis cluster.
%% When connection to master failed then do refresh mapping and try again to
%% query.
%% @end
%% =============================================================================
-spec qa(redis_command()) -> ok | {error, Reason::bitstring()}.
-spec qa(redis_command()) -> redis_result() | [redis_result()] | {error, Reason::bitstring()}.
qa(Command) ->
qa(Command, 0, []).

qa(_, ?REDIS_CLUSTER_REQUEST_TTL, Res) ->
case Res of
[] ->
{error, no_connection};
_ ->
%% Return result from each Pool:
Res
end;
qa(Command, Counter, Res) ->
%% Throttle retries
throttle_retries(Counter),
Pools = eredis_cluster_monitor:get_all_pools(),
Transaction = fun(Worker) -> qw(Worker, Command) end,
[eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools].
case Pools of
[] ->
Version = eredis_cluster_monitor:get_state_version(),
eredis_cluster_monitor:refresh_mapping(Version),
qa(Command, Counter + 1, Res);
_ ->
Transaction = fun(Worker) -> qw(Worker, Command) end,
Result = [eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools],
State = eredis_cluster_monitor:get_state(),
case handle_transaction_result(Result,
eredis_cluster_monitor:get_state_version(State))
of
retry -> qa(Command, Counter + 1, Result);
Result -> Result
end
end.


%% =============================================================================
%% @doc Wrapper function to be used for direct call to a pool worker in the
Expand All @@ -357,12 +453,16 @@ qw(Worker, Command) ->
%% =============================================================================
-spec flushdb() -> ok | {error, Reason::bitstring()}.
flushdb() ->
Result = qa(["FLUSHDB"]),
case proplists:lookup(error,Result) of
none ->
ok;
Error ->
Error
case qa(["FLUSHDB"]) of
{error, Reason} ->
{error, Reason};
Result ->
case proplists:lookup(error, Result) of
none ->
ok;
Error ->
Error
end
end.

%% =============================================================================
Expand Down
59 changes: 58 additions & 1 deletion test/eredis_cluster_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,65 @@ basic_test_() ->
eredis_cluster:eval(Script, ScriptHash, ["qrs"], ["evaltest"]),
?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"]))
end
}
},

{ "get info from all pools",
fun () ->
?assertEqual({ok, <<"evaltest">>}, eredis_cluster:q(["get", "qrs"])),
?assertEqual(none, proplists:lookup(error,eredis_cluster:qa(["cluster", "slots"]))),
?assertMatch({error,_}, proplists:lookup(error,eredis_cluster:qa(["get", "qrs"])))
end
},

{ "ASK support",
fun () ->
Key = "{1}:test",
eredis_cluster:q(["set",Key]),
{ok, NodesInfo} = eredis_cluster:q(["cluster","nodes"]),

ClusterNodesList = [CNEL || CNEL <- binary:split(NodesInfo,<<"\n">>, [global]), CNEL =/= <<>>],
NodeIdsL = lists:foldl(fun(ClusterNode, Acc) ->
ClusterNodeI = binary:split(ClusterNode,<<" ">>, [global]),
case lists:nth(3, ClusterNodeI) of
Role when Role == <<"myself,master">>;
Role == <<"master">> ->
[Ip, Port] = binary:split(lists:nth(2, ClusterNodeI), <<":">>, [global]),
Pool = list_to_atom(binary_to_list(Ip) ++ "#" ++ binary_to_list(Port)),
[{binary_to_list(lists:nth(1, ClusterNodeI)), Pool} | Acc];
_ ->
Acc
end
end, [], ClusterNodesList),
KeySlot = eredis_cluster:get_key_slot(Key),

Pool = element(1, eredis_cluster_monitor:get_pool_by_slot(KeySlot)),

{NodeId, Pool} = lists:keyfind(Pool, 2, NodeIdsL),
[{NodeId2, _Pool2}, _] = [{NI, P} || {NI, P} <- NodeIdsL, {NI, P} =/= {NodeId, Pool}],

%% Migrate Slot to have 2 sets of slots for one node:
CmdImp = ["CLUSTER", "SETSLOT", KeySlot, "IMPORTING", NodeId],
eredis_cluster:qa(CmdImp),

CmdMig = ["CLUSTER", "SETSLOT", KeySlot, "MIGRATING", NodeId2],
eredis_cluster:qa(CmdMig),
Result = lists:usort(eredis_cluster:qa(["get",Key])),

Fun = fun({error, <<"MOVED ", _/binary>>}) -> true;
({error, <<"ASK ", _/binary>>}) -> true;
(_) -> false
end,
Verdict = case lists:all(Fun, Result) of
false -> Result;
true -> true
end,

?assertEqual(true, Verdict),

CmdMig1 = ["CLUSTER", "SETSLOT", KeySlot, "NODE", NodeId2],
eredis_cluster:qa(CmdMig1)
end
}
]
}
}.