diff --git a/src/wpool.erl b/src/wpool.erl index adca5f3..0441468 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -151,6 +151,12 @@ %% %% This option can take values `lifo' or `fifo'. Defaults to `fifo'. +-type enable_queues() :: boolean(). +%% A boolean value determining if `queue_manager' should be started for queueing requests. +%% +%% Defaults to `true'. +%% Note that disabling this will disable `available_worker' and `next_available_worker' strategies. + -type enable_callbacks() :: boolean(). %% A boolean value determining if `event_manager' should be started for callback modules. %% @@ -202,6 +208,7 @@ {pool_sup_period, pool_sup_period()} | {queue_type, queue_type()} | {enable_callbacks, enable_callbacks()} | + {enable_queues, enable_queues()} | {callbacks, callbacks()}. %% Options that can be provided to a new pool. %% @@ -221,6 +228,7 @@ pool_sup_period => pool_sup_period(), queue_type => queue_type(), enable_callbacks => enable_callbacks(), + enable_queues => enable_queues(), callbacks => callbacks(), _ => _}. %% Options that can be provided to a new pool. diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl index f8a6ae5..4c9ca8e 100644 --- a/src/wpool_pool.erl +++ b/src/wpool_pool.erl @@ -366,8 +366,9 @@ init({Name, Options}) -> _Wpool = store_wpool(Name, Size, Options), WorkerOpts0 = - [{queue_manager, QueueManagerName}, {time_checker, TimeCheckerName} - | maybe_event_manager(Options, {event_manager, EventManagerName})], + [{time_checker, TimeCheckerName}] + ++ maybe_queue_manager(Options, {queue_manager, QueueManagerName}) + ++ maybe_event_manager(Options, {event_manager, EventManagerName}), WorkerOpts = maps:merge( maps:from_list(WorkerOpts0), Options), @@ -406,7 +407,8 @@ init({Name, Options}) -> [wpool_process_sup]}, Children = - [TimeCheckerSpec, QueueManagerSpec] + [TimeCheckerSpec] + ++ maybe_queue_manager(Options, QueueManagerSpec) ++ maybe_event_manager(Options, EventManagerSpec) ++ [ProcessSupSpec], @@ -566,6 +568,11 @@ build_wpool(Name) -> undefined end. +maybe_queue_manager(#{enable_queues := false}, _) -> + []; +maybe_queue_manager(_, Item) -> + [Item]. + maybe_event_manager(#{enable_callbacks := true}, Item) -> [Item]; maybe_event_manager(_, _) -> diff --git a/src/wpool_queue_manager.erl b/src/wpool_queue_manager.erl index 4efe464..787bac9 100644 --- a/src/wpool_queue_manager.erl +++ b/src/wpool_queue_manager.erl @@ -141,7 +141,12 @@ worker_dead(QueueManager, Worker) -> %% @see wpool_pool:stats/1 -spec pending_task_count(queue_mgr()) -> non_neg_integer(). pending_task_count(QueueManager) -> - gen_server:call(QueueManager, pending_task_count). + try + gen_server:call(QueueManager, pending_task_count) + catch + _:{noproc, _} -> + 0 + end. %%%=================================================================== %%% gen_server callbacks diff --git a/test/wpool_pool_SUITE.erl b/test/wpool_pool_SUITE.erl index ff98ae6..523d3c8 100644 --- a/test/wpool_pool_SUITE.erl +++ b/test/wpool_pool_SUITE.erl @@ -27,7 +27,7 @@ -export([init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2]). -export([stop_worker/1, best_worker/1, next_worker/1, random_worker/1, available_worker/1, hash_worker/1, custom_worker/1, next_available_worker/1, wpool_record/1, - queue_type_fifo/1, queue_type_lifo/1, get_workers/1]). + queue_type_fifo/1, queue_type_lifo/1, get_workers/1, no_queue_manager/1]). -export([manager_crash/1, super_fast/1, mess_up_with_store/1]). -elvis([{elvis_style, no_block_expressions, disable}]). @@ -50,6 +50,9 @@ end_per_suite(Config) -> Config. -spec init_per_testcase(atom(), config()) -> config(). +init_per_testcase(no_queue_manager = TestCase, Config) -> + {ok, _} = wpool:start_pool(TestCase, [{workers, 1}, {enable_queues, false}]), + Config; init_per_testcase(queue_type_lifo = TestCase, Config) -> {ok, _} = wpool:start_pool(TestCase, [{workers, 1}, {queue_type, lifo}]), Config; @@ -454,6 +457,37 @@ manager_crash(_Config) -> {comment, []}. +-spec no_queue_manager(config()) -> {comment, []}. +no_queue_manager(_Config) -> + Pool = no_queue_manager, + + ct:log("Check that the pool is working"), + {ok, ok} = wpool:call(Pool, {io, format, ["ok!~n"]}, random_worker), + {ok, ok} = wpool:call(Pool, {io, format, ["ok!~n"]}, next_worker), + {ok, ok} = wpool:call(Pool, {io, format, ["ok!~n"]}, {hash_worker, self()}), + + ct:log("Impossible task"), + Self = self(), + try wpool:call(Pool, {erlang, send, [Self, something]}, available_worker, 0) of + R -> + ct:fail("Unexpected ~p", [R]) + catch + _:no_workers -> + ok + end, + + 0 = proplists:get_value(total_message_queue_len, wpool:stats(Pool)), + + ct:log("Wait a second and nothing gets here"), + receive + X -> + ct:fail("Unexpected ~p", [X]) + after 1000 -> + ok + end, + + {comment, []}. + -spec super_fast(config()) -> {comment, []}. super_fast(_Config) -> Pool = super_fast,