From 2d63194ba55ebc3b5a935d399ffe91c4ef2f4647 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:14:45 -0300 Subject: [PATCH 1/6] feat: add function to check supervision tree integrity --- src/ecpool.erl | 6 ++++++ src/ecpool_sup.erl | 34 ++++++++++++++++++++++++++++++++++ test/ecpool_SUITE.erl | 26 +++++++++++++++++++++++++- test/test_client.erl | 14 +++++++++++--- 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/src/ecpool.erl b/src/ecpool.erl index a0298b79..16b4d62b 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -20,6 +20,7 @@ , start_pool/3 , start_sup_pool/3 , stop_sup_pool/1 + , check_pool_integrity/1 , get_client/1 , get_client/2 , pick_and_do/3 @@ -111,6 +112,11 @@ start_sup_pool(Pool, Mod, Opts) -> stop_sup_pool(Pool) -> ecpool_sup:stop_pool(Pool). +-spec check_pool_integrity(pool_name()) -> + ok | {error, {processes_down, [term()]} | not_found}. +check_pool_integrity(Pool) -> + ecpool_sup:check_pool_integrity(Pool). + %% @doc Get client/connection -spec(get_client(pool_name()) -> get_client_ret()). get_client(Pool) -> diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index 80bf71f9..79285a40 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -24,6 +24,7 @@ -export([ start_pool/3 , stop_pool/1 , get_pool/1 + , check_pool_integrity/1 ]). -export([pools/0]). @@ -67,6 +68,16 @@ get_pool(Pool) -> L -> hd(L) end. +-spec check_pool_integrity(pool_name) -> + ok | {error, {processes_down, [term()]} | not_found}. +check_pool_integrity(Pool) -> + case get_pool(Pool) of + undefined -> + {error, not_found}; + SupPid when is_pid(SupPid) -> + do_check_pool_integrity_root(SupPid) + end. + %% @doc Get All Pools supervisored by the ecpool_sup. -spec(pools() -> [{pool_name(), pid()}]). pools() -> @@ -91,3 +102,26 @@ pool_spec(Pool, Mod, Opts) -> modules => [ecpool_pool_sup]}. child_id(Pool) -> {pool_sup, Pool}. + +%%-------------------------------------------------------------------- +%% Internal fns +%%-------------------------------------------------------------------- + +do_check_pool_integrity_root(SupPid) -> + try supervisor:which_children(SupPid) of + Children -> + %% We ignore `restarting` here because those processes are still being + %% managed. + DeadChildren = [Id || {Id, undefined, _, _} <- Children], + %% Currently, at root, we only have one supervisor: `ecpool_worker_sup`, and + %% it does not contain other supervisors under it, so no need to dig deeper. + case DeadChildren of + [_ | _] -> + {error, {processes_down, DeadChildren}}; + [] -> + ok + end + catch + exit:{noproc, _} -> + {error, {processes_down, [root]}} + end. diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 8658a53b..fd4e17f3 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -81,7 +81,8 @@ groups() -> t_client_exec2_random, t_multiprocess_client, t_multiprocess_client_not_restart, - t_pick_and_do_fun + t_pick_and_do_fun, + t_check_pool_integrity ]}]. init_per_suite(Config) -> @@ -350,3 +351,26 @@ t_pick_and_do_fun(_Config) -> {ok, _} = ecpool:start_pool(Pool, test_client, Opts), ?assertEqual(4, ecpool:pick_and_do({Pool, <<"abc">>}, Action, no_handover)), ecpool:stop_sup_pool(Pool). + +%% Smoke tests for `ecpool:check_pool_integrity`, which should report an error when worker +%% supervisor is down. +t_check_pool_integrity(_TCConfig) -> + Pool = ?FUNCTION_NAME, + Opts1 = [ {pool_size, 15} + , {pool_type, hash} + , {auto_reconnect, false} + ], + {ok, _} = ecpool:start_sup_pool(Pool, test_client, Opts1), + ?assertEqual(ok, ecpool:check_pool_integrity(Pool)), + ok = ecpool:stop_sup_pool(Pool), + Opts2 = [ {crash_after, 1} + , {auto_reconnect, true} + | Opts1 + ], + {ok, _} = ecpool:start_sup_pool(Pool, test_client, Opts2), + %% Give it some time to reach maximum restart intensity + ct:sleep(100), + ?assertEqual({error, {processes_down, [worker_sup]}}, ecpool:check_pool_integrity(Pool)), + ok = ecpool:stop_sup_pool(Pool), + ?assertEqual({error, not_found}, ecpool:check_pool_integrity(Pool)), + ok. diff --git a/test/test_client.erl b/test/test_client.erl index 44d7542e..17ea4d7d 100644 --- a/test/test_client.erl +++ b/test/test_client.erl @@ -43,7 +43,7 @@ connect(Opts) -> {ok, Pid2} = gen_server:start_link(?MODULE, [Opts], []), {ok, {Pid1, Pid2}, #{supervisees => [Pid1, Pid2]}}; false -> - gen_server:start_link(?MODULE, [Opts], []) + gen_server:start_link(?MODULE, Opts, []) end. plus(Pid, L, R) -> @@ -60,7 +60,13 @@ stop(Pid, Reason) -> %%----------------------------------------------------------------------------- init(Args) -> - {ok, Args}. + case proplists:get_value(crash_after, Args, undefined) of + undefined -> + {ok, Args}; + TimeMS when is_integer(TimeMS) -> + erlang:send_after(TimeMS, self(), crash), + {ok, Args} + end. handle_call({stop, Reason}, _From, State) -> {stop, Reason, ok, State}; @@ -74,6 +80,9 @@ handle_call(_Req, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(crash, _State) -> + ct:pal("~p crashing", [self()]), + exit(crash); handle_info(_Info, State) -> {noreply, State}. @@ -82,4 +91,3 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - From dffa7453dec1a06e61c5fcafc0a7e2f09b92cd22 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:15:02 -0300 Subject: [PATCH 2/6] fix: make max restart intensity proportional to pool size Previously, we had a fixed restart intensity for the worker supervisor, meaning that if a large pool dies once, it brings down the supervisor. Here, we have an intensity proportional to the pool size, so the whole pool may restart at once without bringing the supervisor down. --- src/ecpool_pool_sup.erl | 32 ++++++++++++++++++++++---------- src/ecpool_worker_sup.erl | 11 +++++++++-- test/ecpool_SUITE.erl | 22 +++++++++++++++++++++- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/src/ecpool_pool_sup.erl b/src/ecpool_pool_sup.erl index ab83c0c4..6d5df838 100644 --- a/src/ecpool_pool_sup.erl +++ b/src/ecpool_pool_sup.erl @@ -28,13 +28,25 @@ start_link(Pool, Mod, Opts) -> supervisor:start_link(?MODULE, [Pool, Mod, Opts]). init([Pool, Mod, Opts]) -> - {ok, { {one_for_all, 10, 100}, [ - {pool, {ecpool_pool, start_link, [Pool, Opts]}, - transient, 16#ffff, worker, [ecpool_pool]}, - {worker_sup, - {ecpool_worker_sup,start_link, - [Pool, Mod, Opts]}, - transient, - infinity, - supervisor, - [ecpool_worker_sup]}] }}. + SupFlags = #{ + strategy => one_for_all, + intensity => 10, + period => 100 + }, + PoolWorker = #{ + id => pool, + start => {ecpool_pool, start_link, [Pool, Opts]}, + restart => transient, + shutdown => 16#ffff, + type => worker, + modules => [ecpool_pool] + }, + WorkerSup = #{ + id => worker_sup, + start => {ecpool_worker_sup,start_link, [Pool, Mod, Opts]}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [ecpool_worker_sup] + }, + {ok, {SupFlags, [PoolWorker, WorkerSup]}}. diff --git a/src/ecpool_worker_sup.erl b/src/ecpool_worker_sup.erl index e8f6c618..a3c12e76 100644 --- a/src/ecpool_worker_sup.erl +++ b/src/ecpool_worker_sup.erl @@ -28,6 +28,13 @@ start_link(Pool, Mod, Opts) -> supervisor:start_link(?MODULE, [Pool, Mod, Opts]). init([Pool, Mod, Opts]) -> + PoolSize = pool_size(Opts), + SupFlags = #{ + strategy => one_for_one, + %% Allow whole pool dying simultaneously at least once. + intensity => 10 + PoolSize, + period => 60 + }, WorkerSpec = fun(Id) -> #{id => {worker, Id}, start => {ecpool_worker, @@ -38,8 +45,8 @@ init([Pool, Mod, Opts]) -> type => worker, modules => [ecpool_worker, Mod]} end, - Workers = [WorkerSpec(I) || I <- lists:seq(1, pool_size(Opts))], - {ok, { {one_for_one, 10, 60}, Workers} }. + Workers = [WorkerSpec(I) || I <- lists:seq(1, PoolSize)], + {ok, {SupFlags, Workers} }. pool_size(Opts) -> Schedulers = erlang:system_info(schedulers), diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index fd4e17f3..21a6f660 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -82,7 +82,8 @@ groups() -> t_multiprocess_client, t_multiprocess_client_not_restart, t_pick_and_do_fun, - t_check_pool_integrity + t_check_pool_integrity, + t_big_pool_dies_and_recovers ]}]. init_per_suite(Config) -> @@ -374,3 +375,22 @@ t_check_pool_integrity(_TCConfig) -> ok = ecpool:stop_sup_pool(Pool), ?assertEqual({error, not_found}, ecpool:check_pool_integrity(Pool)), ok. + +%% Previously, we had a fixed restart intensity for the worker supervisor, meaning that if +%% a large pool dies once, it brings down the supervisor. This checks that we have an +%% intensity proportional to the pool size, so the whole pool may restart at once without +%% bringing the supervisor down. +t_big_pool_dies_and_recovers(_TCConfig) -> + Pool = ?FUNCTION_NAME, + Opts = [ {pool_size, 50} + , {pool_type, hash} + , {auto_reconnect, false} + ], + {ok, _} = ecpool:start_sup_pool(Pool, test_client, Opts), + %% Kill all workers at once. + Workers = ecpool:workers(Pool), + lists:foreach(fun({_Id, Pid}) -> exit(Pid, kill) end, Workers), + ct:sleep(100), + ?assertEqual(ok, ecpool:check_pool_integrity(Pool)), + ok = ecpool:stop_sup_pool(Pool), + ok. From b7454905c85f829c7ab0c2702b2841e5822d4433 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Thu, 11 Sep 2025 15:21:17 -0300 Subject: [PATCH 3/6] ci: test with otp 27 --- .github/workflows/erlang.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index ea113d3e..b84ff648 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -18,6 +18,7 @@ jobs: - otp_release: 24 - otp_release: 25 - otp_release: 26 + - otp_release: 27 steps: - uses: actions/checkout@v2 From f24ded7245d1741edcf35062c1d6cd2ca453ce1b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Thu, 11 Sep 2025 16:36:41 -0300 Subject: [PATCH 4/6] chore: address review remarks --- src/ecpool_pool_sup.erl | 10 +++++----- src/ecpool_sup.erl | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ecpool_pool_sup.erl b/src/ecpool_pool_sup.erl index 6d5df838..1ffad043 100644 --- a/src/ecpool_pool_sup.erl +++ b/src/ecpool_pool_sup.erl @@ -27,15 +27,15 @@ start_link(Pool, Mod, Opts) -> supervisor:start_link(?MODULE, [Pool, Mod, Opts]). -init([Pool, Mod, Opts]) -> +init([PoolName, Mod, Opts]) -> SupFlags = #{ strategy => one_for_all, intensity => 10, period => 100 }, - PoolWorker = #{ + Pool = #{ id => pool, - start => {ecpool_pool, start_link, [Pool, Opts]}, + start => {ecpool_pool, start_link, [PoolName, Opts]}, restart => transient, shutdown => 16#ffff, type => worker, @@ -43,10 +43,10 @@ init([Pool, Mod, Opts]) -> }, WorkerSup = #{ id => worker_sup, - start => {ecpool_worker_sup,start_link, [Pool, Mod, Opts]}, + start => {ecpool_worker_sup,start_link, [PoolName, Mod, Opts]}, restart => transient, shutdown => infinity, type => supervisor, modules => [ecpool_worker_sup] }, - {ok, {SupFlags, [PoolWorker, WorkerSup]}}. + {ok, {SupFlags, [Pool, WorkerSup]}}. diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index 79285a40..698c17ab 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -68,7 +68,7 @@ get_pool(Pool) -> L -> hd(L) end. --spec check_pool_integrity(pool_name) -> +-spec check_pool_integrity(pool_name()) -> ok | {error, {processes_down, [term()]} | not_found}. check_pool_integrity(Pool) -> case get_pool(Pool) of From fa8c20bb278fb0665365d50446deda9de376f258 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Thu, 11 Sep 2025 17:43:21 -0300 Subject: [PATCH 5/6] fix: make `ecpool_worker:safe_exec` actually safe for arity-1 fns --- src/ecpool_worker.erl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 59a9d3c1..de479c18 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -356,10 +356,14 @@ safe_exec({_M, _F, _A} = Action, MainArg) -> logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]), {error, {exec_failed, E, R}} end; - -%% for backward compatibility upgrading from version =< 4.2.1 safe_exec(Action, MainArg) when is_function(Action) -> - Action(MainArg). + try + Action(MainArg) + catch + E:R:ST -> + logger:error("[PoolWorker] safe_exec ~p, failed: ~0p", [Action, {E,R,ST}]), + {error, {exec_failed, E, R}} + end. exec({M, F, A}, MainArg) -> erlang:apply(M, F, [MainArg]++A). From 532580ed40cbaba61f389530e8bec33d8cd58cfe Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi <16166434+thalesmg@users.noreply.github.com> Date: Thu, 11 Sep 2025 17:46:13 -0300 Subject: [PATCH 6/6] chore: make typespecs more specific --- src/ecpool.erl | 2 +- src/ecpool_sup.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ecpool.erl b/src/ecpool.erl index 16b4d62b..18064794 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -113,7 +113,7 @@ stop_sup_pool(Pool) -> ecpool_sup:stop_pool(Pool). -spec check_pool_integrity(pool_name()) -> - ok | {error, {processes_down, [term()]} | not_found}. + ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}. check_pool_integrity(Pool) -> ecpool_sup:check_pool_integrity(Pool). diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index 698c17ab..a312d3a8 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -69,7 +69,7 @@ get_pool(Pool) -> end. -spec check_pool_integrity(pool_name()) -> - ok | {error, {processes_down, [term()]} | not_found}. + ok | {error, {processes_down, [root | pool | worker_sup]} | not_found}. check_pool_integrity(Pool) -> case get_pool(Pool) of undefined ->