From 91fd5f1e7a3457cde51ad17423474aaa88c6712e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 26 Nov 2024 20:37:41 +0800 Subject: [PATCH 1/3] chore: update appup --- src/ecpool.appup.src | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index c993d477..8b645ab8 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -1,11 +1,19 @@ %% -*-: erlang -*- -{"0.5.11", +{"0.5.12", [ + {"0.5.11", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_sup, brutal_purge, soft_purge, []} + ]}, {"0.5.10", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, @@ -45,11 +53,19 @@ ]} ], [ + {"0.5.11", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_sup, brutal_purge, soft_purge, []} + ]}, {"0.5.10", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ + {load_module, ecpool, brutal_purge, soft_purge, []}, + {load_module, ecpool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, From 6293cdfe4db54d81148ed82601ec4f7102a9042b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 26 Nov 2024 20:31:38 +0800 Subject: [PATCH 2/3] feat: support timeout for stopping sup pools --- src/ecpool.erl | 4 ++ src/ecpool_sup.erl | 79 +++++++++++++++++++++++++++++++++--- test/ecpool_SUITE.erl | 14 +++++++ test/test_timeout_client.erl | 26 ++++++++++++ 4 files changed, 117 insertions(+), 6 deletions(-) create mode 100644 test/test_timeout_client.erl diff --git a/src/ecpool.erl b/src/ecpool.erl index fb0409a6..0843ee5a 100644 --- a/src/ecpool.erl +++ b/src/ecpool.erl @@ -20,6 +20,7 @@ , start_pool/3 , start_sup_pool/3 , stop_sup_pool/1 + , stop_sup_pool/2 , get_client/1 , get_client/2 , pick_and_do/3 @@ -91,6 +92,9 @@ start_sup_pool(Pool, Mod, Opts) -> stop_sup_pool(Pool) -> ecpool_sup:stop_pool(Pool). +stop_sup_pool(Pool, Opts) -> + ecpool_sup:stop_pool(Pool, Opts). + %% @doc Get client/connection -spec(get_client(pool_name()) -> get_client_ret()). get_client(Pool) -> diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index 7395a618..38618864 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -23,6 +23,7 @@ %% API -export([ start_pool/3 , stop_pool/1 + , stop_pool/2 , get_pool/1 ]). @@ -32,6 +33,9 @@ -export([init/1]). -type pool_name() :: ecpool:pool_name(). +-type stop_opts() :: #{timeout => non_neg_integer()}. + +-define(STOP_TIMOEUT, infinity). %% @doc Start supervisor. -spec(start_link() -> {ok, pid()} | {error, term()}). @@ -50,13 +54,30 @@ start_pool(Pool, Mod, Opts) -> %% @doc Stop a pool. -spec(stop_pool(Pool :: pool_name()) -> ok | {error, term()}). stop_pool(Pool) -> + stop_pool(Pool, #{}). + +-spec(stop_pool(Pool :: pool_name(), stop_opts()) -> ok | {error, term()}). +stop_pool(Pool, Opts) -> ChildId = child_id(Pool), - case supervisor:terminate_child(?MODULE, ChildId) of - ok -> - supervisor:delete_child(?MODULE, ChildId); - {error, Reason} -> - {error, Reason} - end. + Timeout = maps:get(timeout, Opts, ?STOP_TIMOEUT), + try gen_server:call(?MODULE, {terminate_child, ChildId}, Timeout) of + ok -> delete_child(ChildId, Timeout); + {error, Reason} -> {error, Reason} + catch + exit:{R, _} when R == noproc; R == normal; R == shutdown -> + {error, not_found}; + exit:{timeout, _} -> + %% Sometimes the `ecpool_sup` is not responding to terminate request as the `ecpool_pool_sup` + %% process got stuck in connecting. In this case, we need to cancel the connection + %% by force killing it so the `ecpool_sup` won't be stuck. + _ = kill_ecpool_pool_sup_if_stuck(), + %% Now the `ecpool_pool_sup` process can be in one of the following state: + %% - has been force killed, or + %% - has gone down by itself or by the `terminate_child` call, or + %% - is still running normally + %% In any case we try to remove it from childspec as we have sent a `terminate_child`. + delete_child(ChildId, Timeout) + end. %% @doc Get a pool. -spec(get_pool(pool_name()) -> undefined | pid()). @@ -90,3 +111,49 @@ pool_spec(Pool, Mod, Opts) -> child_id(Pool) -> {pool_sup, Pool}. +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +kill_ecpool_pool_sup_if_stuck() -> + case process_info(whereis(?MODULE), links) of + {links, LinkedPids} -> + case search_ecpool_pool_sup_process(LinkedPids) of + {ok, Pid} -> + exit(Pid, kill), + ok; + Err -> + Err + end; + undefined -> + {error, not_found} + end. + +search_ecpool_pool_sup_process([]) -> + {error, not_found}; +search_ecpool_pool_sup_process([Pid | Rest]) -> + case process_info(Pid, [dictionary, status, current_function]) of + [{dictionary, Dicts}, {status, Status}, {current_function, CurrFunc}] -> + case proplists:get_value('$initial_call', Dicts) of + {supervisor, ecpool_pool_sup, _} -> + case {Status, CurrFunc} of + {waiting, {proc_lib, _, _}} -> + {ok, Pid}; + _ -> + {error, not_stuck_in_start} + end; + _ -> + search_ecpool_pool_sup_process(Rest) + end; + undefined -> + search_ecpool_pool_sup_process(Rest) + end. + +delete_child(ChildId, Timeout) -> + try gen_server:call(?MODULE, {delete_child, ChildId}, Timeout) + catch + exit:{R, _} when R == noproc; R == normal; R == shutdown -> + {error, not_found}; + exit:{timeout, _} -> + {error, timeout} + end. diff --git a/test/ecpool_SUITE.erl b/test/ecpool_SUITE.erl index 816d6b1b..5029c806 100644 --- a/test/ecpool_SUITE.erl +++ b/test/ecpool_SUITE.erl @@ -48,6 +48,7 @@ groups() -> [t_start_pool, t_start_pool_any_name, t_start_sup_pool, + t_start_sup_pool_timeout, t_empty_pool, t_empty_hash_pool, t_restart_client, @@ -113,6 +114,19 @@ t_start_sup_pool(_Config) -> ecpool:stop_sup_pool(xpool), ?assertEqual([], ecpool_sup:pools()). +t_start_sup_pool_timeout(_Config) -> + spawn_link(fun() -> + ?assertMatch({error, {killed, _}}, + ecpool:start_sup_pool(timeout_pool, test_timeout_client, ?POOL_OPTS)) + end), + timer:sleep(100), + {Time, Val} = timer:tc(ecpool, stop_sup_pool, [timeout_pool, #{timeout => 200}]), + ?assert(Time/1000 < 500), + %% The `ecpool:start_sup_pool/3` has not completed before it was cancelled (killed), + %% so `ecpool:stop_sup_pool/2` returns `{error, not_found}`. + ?assertEqual({error, not_found}, Val), + ?assertEqual([], ecpool_sup:pools()). + t_restart_client(_Config) -> ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]), Workers1 = ecpool:workers(?POOL), diff --git a/test/test_timeout_client.erl b/test/test_timeout_client.erl new file mode 100644 index 00000000..b601d042 --- /dev/null +++ b/test/test_timeout_client.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(test_timeout_client). + +-behaviour(ecpool_worker). + +-export([connect/1]). + +connect(Options) -> + Delay = proplists:get_value(delay, Options, 5000), + timer:sleep(Delay), + {ok, erlang:spawn_link(fun() -> ok end)}. From 7fec0612bdc6915206e4be9b142074292da8f0fe Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Nov 2024 00:57:34 +0800 Subject: [PATCH 3/3] chore: use proc dict to determine whether the process has completed initialization --- src/ecpool.appup.src | 16 ++++++++++++++-- src/ecpool_pool_sup.erl | 30 +++++++++++++++++++++++++++++- src/ecpool_sup.erl | 24 +++++++++++------------- 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index 8b645ab8..7e8b88ae 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -3,17 +3,20 @@ [ {"0.5.11", [ {load_module, ecpool, brutal_purge, soft_purge, []}, - {load_module, ecpool_sup, brutal_purge, soft_purge, []} + {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []} ]}, {"0.5.10", [ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, @@ -25,6 +28,7 @@ {apply, {ecpool_monitor, update_clients_global, []}}, {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {apply, {ecpool_monitor, upgrade_clients_state, []}}, {load_module, ecpool, brutal_purge, soft_purge, []} @@ -35,6 +39,7 @@ {apply, {ecpool_monitor, update_clients_global, []}}, {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {apply, {ecpool_monitor, upgrade_clients_state, []}}, {load_module, ecpool, brutal_purge, soft_purge, []}, @@ -49,29 +54,34 @@ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []} ]} ], [ {"0.5.11", [ {load_module, ecpool, brutal_purge, soft_purge, []}, - {load_module, ecpool_sup, brutal_purge, soft_purge, []} + {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []} ]}, {"0.5.10", [ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, {"0.5.9", [ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_monitor, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []} ]}, {<<"0\\.5\\.[3-8]">>, [ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, @@ -80,6 +90,7 @@ {<<"0\\.5\\.[0-2]">>, [ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_worker, brutal_purge, soft_purge, []}, {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, @@ -92,6 +103,7 @@ {load_module, ecpool, brutal_purge, soft_purge, []}, {load_module, ecpool_worker_sup, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, + {load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}, {apply, {ecpool_monitor, ensure_monitor_stopped, []}}, {delete_module,ecpool_monitor} ]} diff --git a/src/ecpool_pool_sup.erl b/src/ecpool_pool_sup.erl index 4d7d9874..52ad5bec 100644 --- a/src/ecpool_pool_sup.erl +++ b/src/ecpool_pool_sup.erl @@ -24,13 +24,41 @@ %% Supervisor callbacks -export([init/1]). +-export([clear_init_incomplete/0]). + start_link(Pool, Mod, Opts) -> - supervisor:start_link(?MODULE, [Pool, Mod, Opts]). + case supervisor:start_link(?MODULE, [Pool, Mod, Opts]) of + {ok, SupPid} = Ret -> + _ = init_complete(SupPid), + Ret; + Error -> + Error + end. init([Pool, Mod, Opts]) -> + set_init_incomplete(), {ok, { {one_for_all, 10, 100}, [ {pool, {ecpool_pool, start_link, [Pool, Opts]}, transient, 16#ffff, worker, [ecpool_pool]}, {worker_sup, {ecpool_worker_sup, start_link, [Pool, Mod, Opts]}, transient, infinity, supervisor, [ecpool_worker_sup]}] }}. +init_complete(SupPid) -> + %% Not a real supervisor child, just for clearing the process dict with key `init_incomplete` + DummySpec = #{ + id => clear_init_incomplete, + start => {?MODULE, clear_init_incomplete, []}, + restart => temporary, + shutdown => brutal_kill, + type => worker, + modules => [?MODULE] + }, + supervisor:start_child(SupPid, DummySpec). + +set_init_incomplete() -> + _ = erlang:put(init_incomplete, true), + ok. + +clear_init_incomplete() -> + _ = erlang:erase(init_incomplete), + {error, dummy_child}. diff --git a/src/ecpool_sup.erl b/src/ecpool_sup.erl index 38618864..5b6c9c67 100644 --- a/src/ecpool_sup.erl +++ b/src/ecpool_sup.erl @@ -118,10 +118,10 @@ child_id(Pool) -> {pool_sup, Pool}. kill_ecpool_pool_sup_if_stuck() -> case process_info(whereis(?MODULE), links) of {links, LinkedPids} -> - case search_ecpool_pool_sup_process(LinkedPids) of + case search_stuck_ecpool_pool_sup(LinkedPids) of {ok, Pid} -> exit(Pid, kill), - ok; + {ok, Pid}; Err -> Err end; @@ -129,24 +129,22 @@ kill_ecpool_pool_sup_if_stuck() -> {error, not_found} end. -search_ecpool_pool_sup_process([]) -> +search_stuck_ecpool_pool_sup([]) -> {error, not_found}; -search_ecpool_pool_sup_process([Pid | Rest]) -> - case process_info(Pid, [dictionary, status, current_function]) of - [{dictionary, Dicts}, {status, Status}, {current_function, CurrFunc}] -> +search_stuck_ecpool_pool_sup([Pid | Rest]) -> + case process_info(Pid, dictionary) of + {dictionary, Dicts} -> case proplists:get_value('$initial_call', Dicts) of {supervisor, ecpool_pool_sup, _} -> - case {Status, CurrFunc} of - {waiting, {proc_lib, _, _}} -> - {ok, Pid}; - _ -> - {error, not_stuck_in_start} + case proplists:get_value(init_incomplete, Dicts) of + true -> {ok, Pid}; + _ -> {error, {not_stuck_in_init, Pid}} end; _ -> - search_ecpool_pool_sup_process(Rest) + search_stuck_ecpool_pool_sup(Rest) end; undefined -> - search_ecpool_pool_sup_process(Rest) + search_stuck_ecpool_pool_sup(Rest) end. delete_child(ChildId, Timeout) ->