diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index ea113d3e..b84ff648 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -18,6 +18,7 @@ jobs: - otp_release: 24 - otp_release: 25 - otp_release: 26 + - otp_release: 27 steps: - uses: actions/checkout@v2 diff --git a/src/ecpool.erl b/src/ecpool.erl index a0298b79..18064794 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -20,6 +20,7 @@ , start_pool/3 , start_sup_pool/3 , stop_sup_pool/1 + , check_pool_integrity/1 , get_client/1 , get_client/2 , pick_and_do/3 @@ -111,6 +112,11 @@ start_sup_pool(Pool, Mod, Opts) -> stop_sup_pool(Pool) -> ecpool_sup:stop_pool(Pool). +-spec check_pool_integrity(pool_name()) -> + ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}. +check_pool_integrity(Pool) -> + ecpool_sup:check_pool_integrity(Pool). + %% @doc Get client/connection -spec(get_client(pool_name()) -> get_client_ret()). get_client(Pool) -> diff --git a/src/ecpool_pool_sup.erl b/src/ecpool_pool_sup.erl index ab83c0c4..1ffad043 100644 --- a/src/ecpool_pool_sup.erl +++ b/src/ecpool_pool_sup.erl @@ -27,14 +27,26 @@ start_link(Pool, Mod, Opts) -> supervisor:start_link(?MODULE, [Pool, Mod, Opts]). -init([Pool, Mod, Opts]) -> - {ok, { {one_for_all, 10, 100}, [ - {pool, {ecpool_pool, start_link, [Pool, Opts]}, - transient, 16#ffff, worker, [ecpool_pool]}, - {worker_sup, - {ecpool_worker_sup,start_link, - [Pool, Mod, Opts]}, - transient, - infinity, - supervisor, - [ecpool_worker_sup]}] }}. +init([PoolName, Mod, Opts]) -> + SupFlags = #{ + strategy => one_for_all, + intensity => 10, + period => 100 + }, + Pool = #{ + id => pool, + start => {ecpool_pool, start_link, [PoolName, Opts]}, + restart => transient, + shutdown => 16#ffff, + type => worker, + modules => [ecpool_pool] + }, + WorkerSup = #{ + id => worker_sup, + start => {ecpool_worker_sup,start_link, [PoolName, Mod, Opts]}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [ecpool_worker_sup] + }, + {ok, {SupFlags, [Pool, WorkerSup]}}. diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index 80bf71f9..a312d3a8 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -24,6 +24,7 @@ -export([ start_pool/3 , stop_pool/1 , get_pool/1 + , check_pool_integrity/1 ]). -export([pools/0]). @@ -67,6 +68,16 @@ get_pool(Pool) -> L -> hd(L) end. +-spec check_pool_integrity(pool_name()) -> + ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}. +check_pool_integrity(Pool) -> + case get_pool(Pool) of + undefined -> + {error, not_found}; + SupPid when is_pid(SupPid) -> + do_check_pool_integrity_root(SupPid) + end. + %% @doc Get All Pools supervisored by the ecpool_sup. -spec(pools() -> [{pool_name(), pid()}]). pools() -> @@ -91,3 +102,26 @@ pool_spec(Pool, Mod, Opts) -> modules => [ecpool_pool_sup]}. child_id(Pool) -> {pool_sup, Pool}. + +%%-------------------------------------------------------------------- +%% Internal fns +%%-------------------------------------------------------------------- + +do_check_pool_integrity_root(SupPid) -> + try supervisor:which_children(SupPid) of + Children -> + %% We ignore `restarting` here because those processes are still being + %% managed. + DeadChildren = [Id || {Id, undefined, _, _} <- Children], + %% Currently, at root, we only have one supervisor: `ecpool_worker_sup`, and + %% it does not contain other supervisors under it, so no need to dig deeper. + case DeadChildren of + [_ | _] -> + {error, {processes_down, DeadChildren}}; + [] -> + ok + end + catch + exit:{noproc, _} -> + {error, {processes_down, [root]}} + end. diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 59a9d3c1..de479c18 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -356,10 +356,14 @@ safe_exec({_M, _F, _A} = Action, MainArg) -> logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]), {error, {exec_failed, E, R}} end; - -%% for backward compatibility upgrading from version =< 4.2.1 safe_exec(Action, MainArg) when is_function(Action) -> - Action(MainArg). + try + Action(MainArg) + catch + E:R:ST -> + logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]), + {error, {exec_failed, E, R}} + end. exec({M, F, A}, MainArg) -> erlang:apply(M, F, [MainArg]++A). diff --git a/src/ecpool_worker_sup.erl b/src/ecpool_worker_sup.erl index e8f6c618..a3c12e76 100644 --- a/src/ecpool_worker_sup.erl +++ b/src/ecpool_worker_sup.erl @@ -28,6 +28,13 @@ start_link(Pool, Mod, Opts) -> supervisor:start_link(?MODULE, [Pool, Mod, Opts]). init([Pool, Mod, Opts]) -> + PoolSize = pool_size(Opts), + SupFlags = #{ + strategy => one_for_one, + %% Allow whole pool dying simultaneously at least once. + intensity => 10 + PoolSize, + period => 60 + }, WorkerSpec = fun(Id) -> #{id => {worker, Id}, start => {ecpool_worker, @@ -38,8 +45,8 @@ init([Pool, Mod, Opts]) -> type => worker, modules => [ecpool_worker, Mod]} end, - Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))], - {ok, { {one_for_one, 10, 60}, Workers} }. + Workers = [WorkerSpec(I) || I <- lists:seq(1, PoolSize)], + {ok, {SupFlags, Workers} }. pool_size(Opts) -> Schedulers = erlang:system_info(schedulers), diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 8658a53b..21a6f660 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -81,7 +81,9 @@ groups() -> t_client_exec2_random, t_multiprocess_client, t_multiprocess_client_not_restart, - t_pick_and_do_fun + t_pick_and_do_fun, + t_check_pool_integrity, + t_big_pool_dies_and_recovers ]}]. init_per_suite(Config) -> @@ -350,3 +352,45 @@ t_pick_and_do_fun(_Config) -> {ok, _} = ecpool:start_pool(Pool, test_client, Opts), ?assertEqual(4, ecpool:pick_and_do({Pool, <<"abc">>}, Action, no_handover)), ecpool:stop_sup_pool(Pool). + +%% Smoke tests for `ecpool:check_pool_integrity`, which should report an error when worker +%% supervisor is down. +t_check_pool_integrity(_TCConfig) -> + Pool = ?FUNCTION_NAME, + Opts1 = [ {pool_size, 15} + , {pool_type, hash} + , {auto_reconnect, false} + ], + {ok, _} = ecpool:start_sup_pool(Pool, test_client, Opts1), + ?assertEqual(ok, ecpool:check_pool_integrity(Pool)), + ok = ecpool:stop_sup_pool(Pool), + Opts2 = [ {crash_after, 1} + , {auto_reconnect, true} + | Opts1 + ], + {ok, _} = ecpool:start_sup_pool(Pool, test_client, Opts2), + %% Give it some time to reach maximum restart intensity + ct:sleep(100), + ?assertEqual({error, {processes_down, [worker_sup]}}, ecpool:check_pool_integrity(Pool)), + ok = ecpool:stop_sup_pool(Pool), + ?assertEqual({error, not_found}, ecpool:check_pool_integrity(Pool)), + ok. + +%% Previously, we had a fixed restart intensity for the worker supervisor, meaning that if +%% a large pool dies once, it brings down the supervisor. This checks that we have an +%% intensity proportional to the pool size, so the whole pool may restart at once without +%% bringing the supervisor down. +t_big_pool_dies_and_recovers(_TCConfig) -> + Pool = ?FUNCTION_NAME, + Opts = [ {pool_size, 50} + , {pool_type, hash} + , {auto_reconnect, false} + ], + {ok, _} = ecpool:start_sup_pool(Pool, test_client, Opts), + %% Kill all workers at once. + Workers = ecpool:workers(Pool), + lists:foreach(fun({_Id, Pid}) -> exit(Pid, kill) end, Workers), + ct:sleep(100), + ?assertEqual(ok, ecpool:check_pool_integrity(Pool)), + ok = ecpool:stop_sup_pool(Pool), + ok. diff --git a/test/test_client.erl b/test/test_client.erl index 44d7542e..17ea4d7d 100644 --- a/test/test_client.erl +++ b/test/test_client.erl @@ -43,7 +43,7 @@ connect(Opts) -> {ok, Pid2} = gen_server:start_link(?MODULE, [Opts], []), {ok, {Pid1, Pid2}, #{supervisees => [Pid1, Pid2]}}; false -> - gen_server:start_link(?MODULE, [Opts], []) + gen_server:start_link(?MODULE, Opts, []) end. plus(Pid, L, R) -> @@ -60,7 +60,13 @@ stop(Pid, Reason) -> %%----------------------------------------------------------------------------- init(Args) -> - {ok, Args}. + case proplists:get_value(crash_after, Args, undefined) of + undefined -> + {ok, Args}; + TimeMS when is_integer(TimeMS) -> + erlang:send_after(TimeMS, self(), crash), + {ok, Args} + end. handle_call({stop, Reason}, _From, State) -> {stop, Reason, ok, State}; @@ -74,6 +80,9 @@ handle_call(_Req, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(crash, _State) -> + ct:pal("~p crashing", [self()]), + exit(crash); handle_info(_Info, State) -> {noreply, State}. @@ -82,4 +91,3 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -