Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
- otp_release: 24
- otp_release: 25
- otp_release: 26
- otp_release: 27

steps:
- uses: actions/checkout@v2
Expand Down
6 changes: 6 additions & 0 deletions src/ecpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
, start_pool/3
, start_sup_pool/3
, stop_sup_pool/1
, check_pool_integrity/1
, get_client/1
, get_client/2
, pick_and_do/3
Expand Down Expand Up @@ -111,6 +112,11 @@ start_sup_pool(Pool, Mod, Opts) ->
stop_sup_pool(Pool) ->
ecpool_sup:stop_pool(Pool).

%% @doc Check that all processes belonging to `Pool' are alive.
%% Thin wrapper around `ecpool_sup:check_pool_integrity/1': returns `ok'
%% when every supervised process is running, `{error, not_found}' when the
%% pool is not registered, or `{error, {processes_down, Ids}}' naming the
%% dead children (see `ecpool_sup' for the actual traversal).
-spec check_pool_integrity(pool_name()) ->
ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}.
check_pool_integrity(Pool) ->
ecpool_sup:check_pool_integrity(Pool).

%% @doc Get client/connection
-spec(get_client(pool_name()) -> get_client_ret()).
get_client(Pool) ->
Expand Down
34 changes: 23 additions & 11 deletions src/ecpool_pool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,26 @@
start_link(Pool, Mod, Opts) ->
supervisor:start_link(?MODULE, [Pool, Mod, Opts]).

init([Pool, Mod, Opts]) ->
{ok, { {one_for_all, 10, 100}, [
{pool, {ecpool_pool, start_link, [Pool, Opts]},
transient, 16#ffff, worker, [ecpool_pool]},
{worker_sup,
{ecpool_worker_sup,start_link,
[Pool, Mod, Opts]},
transient,
infinity,
supervisor,
[ecpool_worker_sup]}] }}.
%% Supervisor callback: one_for_all so the pool bookkeeping process and the
%% worker supervisor always restart together and stay consistent.
init([PoolName, Mod, Opts]) ->
    PoolChild =
        #{id => pool,
          start => {ecpool_pool, start_link, [PoolName, Opts]},
          restart => transient,
          shutdown => 16#ffff,
          type => worker,
          modules => [ecpool_pool]},
    WorkerSupChild =
        #{id => worker_sup,
          start => {ecpool_worker_sup, start_link, [PoolName, Mod, Opts]},
          restart => transient,
          shutdown => infinity,
          type => supervisor,
          modules => [ecpool_worker_sup]},
    Flags = #{strategy => one_for_all, intensity => 10, period => 100},
    {ok, {Flags, [PoolChild, WorkerSupChild]}}.
34 changes: 34 additions & 0 deletions src/ecpool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
-export([ start_pool/3
, stop_pool/1
, get_pool/1
, check_pool_integrity/1
]).

-export([pools/0]).
Expand Down Expand Up @@ -67,6 +68,16 @@ get_pool(Pool) ->
L -> hd(L)
end.

%% @doc Report whether all of `Pool''s processes are alive.
%% An unknown pool name is reported distinctly (`not_found') from a pool
%% whose root supervisor or children have died (`processes_down').
-spec check_pool_integrity(pool_name()) ->
    ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}.
check_pool_integrity(Pool) ->
    case get_pool(Pool) of
        RootSup when is_pid(RootSup) ->
            do_check_pool_integrity_root(RootSup);
        undefined ->
            {error, not_found}
    end.

%% @doc Get All Pools supervisored by the ecpool_sup.
-spec(pools() -> [{pool_name(), pid()}]).
pools() ->
Expand All @@ -91,3 +102,26 @@ pool_spec(Pool, Mod, Opts) ->
modules => [ecpool_pool_sup]}.

child_id(Pool) -> {pool_sup, Pool}.

%%--------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------

%% Inspect the pool's root supervisor and report any dead children.
%% A child listed with pid `undefined' is down; `restarting' children are
%% deliberately not counted, since the supervisor is still managing them.
%% The root currently supervises only `ecpool_worker_sup' (no nested
%% supervisors), so one level of inspection suffices.
do_check_pool_integrity_root(RootSup) ->
    try supervisor:which_children(RootSup) of
        Children ->
            case [Id || {Id, undefined, _Type, _Mods} <- Children] of
                [] ->
                    ok;
                Down ->
                    {error, {processes_down, Down}}
            end
    catch
        %% The root supervisor itself is gone.
        exit:{noproc, _} ->
            {error, {processes_down, [root]}}
    end.
10 changes: 7 additions & 3 deletions src/ecpool_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,14 @@ safe_exec({_M, _F, _A} = Action, MainArg) ->
logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]),
{error, {exec_failed, E, R}}
end;

%% For backward compatibility when upgrading from versions =< 4.2.1
safe_exec(Action, MainArg) when is_function(Action) ->
Action(MainArg).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the cause for the crashes: this clause was not catching anything.

try
Action(MainArg)
catch
E:R:ST ->
logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]),
{error, {exec_failed, E, R}}
end.

exec({M, F, A}, MainArg) ->
erlang:apply(M, F, [MainArg]++A).
Expand Down
11 changes: 9 additions & 2 deletions src/ecpool_worker_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ start_link(Pool, Mod, Opts) ->
supervisor:start_link(?MODULE, [Pool, Mod, Opts]).

init([Pool, Mod, Opts]) ->
PoolSize = pool_size(Opts),
SupFlags = #{
strategy => one_for_one,
%% Allow the whole pool to die simultaneously at least once.
intensity => 10 + PoolSize,
period => 60
},
WorkerSpec = fun(Id) ->
#{id => {worker, Id},
start => {ecpool_worker,
Expand All @@ -38,8 +45,8 @@ init([Pool, Mod, Opts]) ->
type => worker,
modules => [ecpool_worker, Mod]}
end,
Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))],
{ok, { {one_for_one, 10, 60}, Workers} }.
Workers = [WorkerSpec(I) || I <- lists:seq(1, PoolSize)],
{ok, {SupFlags, Workers} }.

pool_size(Opts) ->
Schedulers = erlang:system_info(schedulers),
Expand Down
46 changes: 45 additions & 1 deletion test/ecpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ groups() ->
t_client_exec2_random,
t_multiprocess_client,
t_multiprocess_client_not_restart,
t_pick_and_do_fun
t_pick_and_do_fun,
t_check_pool_integrity,
t_big_pool_dies_and_recovers
]}].

init_per_suite(Config) ->
Expand Down Expand Up @@ -350,3 +352,45 @@ t_pick_and_do_fun(_Config) ->
{ok, _} = ecpool:start_pool(Pool, test_client, Opts),
?assertEqual(4, ecpool:pick_and_do({Pool, <<"abc">>}, Action, no_handover)),
ecpool:stop_sup_pool(Pool).

%% Smoke tests for `ecpool:check_pool_integrity`, which should report an error when the
%% worker supervisor is down.
t_check_pool_integrity(_TCConfig) ->
    Pool = ?FUNCTION_NAME,
    BaseOpts = [ {pool_size, 15}
               , {pool_type, hash}
               , {auto_reconnect, false}
               ],
    %% A freshly started, healthy pool reports ok.
    {ok, _} = ecpool:start_sup_pool(Pool, test_client, BaseOpts),
    ?assertEqual(ok, ecpool:check_pool_integrity(Pool)),
    ok = ecpool:stop_sup_pool(Pool),
    %% Workers that crash right after connecting should eventually exhaust the
    %% worker supervisor's restart intensity; that must be reported.
    CrashyOpts = [ {crash_after, 1}
                 , {auto_reconnect, true}
                 | BaseOpts
                 ],
    {ok, _} = ecpool:start_sup_pool(Pool, test_client, CrashyOpts),
    %% Give it some time to reach maximum restart intensity
    ct:sleep(100),
    ?assertEqual({error, {processes_down, [worker_sup]}}, ecpool:check_pool_integrity(Pool)),
    ok = ecpool:stop_sup_pool(Pool),
    %% A stopped pool is not found at all.
    ?assertEqual({error, not_found}, ecpool:check_pool_integrity(Pool)),
    ok.

%% Previously, we had a fixed restart intensity for the worker supervisor, meaning that if
%% a large pool dies once, it brings down the supervisor. This checks that we have an
%% intensity proportional to the pool size, so the whole pool may restart at once without
%% bringing the supervisor down.
t_big_pool_dies_and_recovers(_TCConfig) ->
    Pool = ?FUNCTION_NAME,
    PoolOpts = [ {pool_size, 50}
               , {pool_type, hash}
               , {auto_reconnect, false}
               ],
    {ok, _} = ecpool:start_sup_pool(Pool, test_client, PoolOpts),
    %% Kill every worker simultaneously; the supervisor's restart intensity
    %% scales with pool size, so it must restart them all without giving up.
    [exit(WorkerPid, kill) || {_Id, WorkerPid} <- ecpool:workers(Pool)],
    ct:sleep(100),
    ?assertEqual(ok, ecpool:check_pool_integrity(Pool)),
    ok = ecpool:stop_sup_pool(Pool),
    ok.
14 changes: 11 additions & 3 deletions test/test_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ connect(Opts) ->
{ok, Pid2} = gen_server:start_link(?MODULE, [Opts], []),
{ok, {Pid1, Pid2}, #{supervisees => [Pid1, Pid2]}};
false ->
gen_server:start_link(?MODULE, [Opts], [])
gen_server:start_link(?MODULE, Opts, [])
end.

plus(Pid, L, R) ->
Expand All @@ -60,7 +60,13 @@ stop(Pid, Reason) ->
%%-----------------------------------------------------------------------------

%% gen_server callback. When the option list carries `{crash_after, Ms}',
%% schedule a `crash' message to ourselves so the worker dies shortly after
%% starting (used by the integrity/recovery test cases).
init(Args) ->
    case proplists:get_value(crash_after, Args, undefined) of
        TimeMS when is_integer(TimeMS) ->
            _ = erlang:send_after(TimeMS, self(), crash);
        undefined ->
            ok
    end,
    {ok, Args}.

handle_call({stop, Reason}, _From, State) ->
{stop, Reason, ok, State};
Expand All @@ -74,6 +80,9 @@ handle_call(_Req, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

%% The `crash' message is scheduled by init/1 when the `crash_after' option
%% is present; exiting here simulates a worker that dies after startup.
handle_info(crash, _State) ->
ct:pal("~p crashing", [self()]),
exit(crash);
%% Ignore any other message.
handle_info(_Info, State) ->
{noreply, State}.

Expand All @@ -82,4 +91,3 @@ terminate(_Reason, _State) ->

code_change(_OldVsn, State, _Extra) ->
{ok, State}.