diff --git a/lib/rabbit/broker/supervisor.ex b/lib/rabbit/broker/supervisor.ex index 7af7c1c..2d9e5df 100644 --- a/lib/rabbit/broker/supervisor.ex +++ b/lib/rabbit/broker/supervisor.ex @@ -69,7 +69,11 @@ defmodule Rabbit.Broker.Supervisor do name = Broker.consumers(module) opts = Keyword.get(opts, :consumers, []) opts = Enum.map(opts, &Keyword.put(&1, :connection, conn)) - spec = build_spec(Rabbit.Consumer.Supervisor, name, module, opts) + + spec = + Rabbit.Consumer.Supervisor + |> build_spec(name, module, opts) + |> Map.put(:type, :supervisor) children ++ [spec] end diff --git a/lib/rabbit/consumer/executer.ex b/lib/rabbit/consumer/executer.ex index 57b413c..2563172 100644 --- a/lib/rabbit/consumer/executer.ex +++ b/lib/rabbit/consumer/executer.ex @@ -20,7 +20,8 @@ defmodule Rabbit.Consumer.Executer do %{ id: __MODULE__, start: {__MODULE__, :start_link, args}, - restart: :temporary + restart: :temporary, + shutdown: 25_000 } end @@ -49,18 +50,26 @@ defmodule Rabbit.Consumer.Executer do @impl GenServer def handle_continue(:run, state) do state = run(state) - {:noreply, state, state.timeout} + {:noreply, state} end @doc false @impl GenServer def handle_info(:timeout, state) do - if is_pid(state.executer), do: Process.exit(state.executer, :normal) + Task.shutdown(state.task, :brutal_kill) handle_error(state, {:exit, :timeout}, []) - {:stop, :timeout, state} + {:stop, :timeout, %{state | completed: true}} end - def handle_info({:EXIT, pid1, reason}, %{executer: pid2} = state) when pid1 == pid2 do + def handle_info({ref, _result}, %{task: %Task{ref: ref}} = state) do + # Task completed successfully - the task body already did ack/nack. + # Flush the :DOWN message that Task.async sends after completion. + Process.demonitor(ref, [:flush]) + {:stop, :normal, %{state | completed: true}} + end + + def handle_info({:DOWN, ref, :process, _pid, reason}, %{task: %Task{ref: ref}} = state) do + # Task crashed before completing. Run error handler. {reason, stack} = case reason do {%_{} = reason, stack} -> {reason, stack} @@ -68,15 +77,34 @@ defmodule Rabbit.Consumer.Executer do end handle_error(state, reason, stack) - {:stop, reason, state} + {:stop, reason, %{state | completed: true}} + end + + def handle_info({:EXIT, _, _}, state) do + # Task.async links to the caller. Since we trap exits, we receive + # EXIT messages from the task process. The actual result/crash is + # handled via the task ref and :DOWN messages above, so we ignore + # EXIT signals here. + {:noreply, state} end - @doc false @impl GenServer - def handle_cast({:complete, ref1}, %{executer_ref: ref2} = state) when ref1 == ref2 do - {:stop, :normal, state} + def terminate(_reason, %{completed: false, message: message} = state) do + if state.task do + Task.shutdown(state.task, 5_000) + end + + try do + Message.nack(message, requeue: true) + catch + _, _ -> :ok + end + + :ok end + def terminate(_reason, _state), do: :ok + ################################ # Private Functions ################################ @@ -85,9 +113,9 @@ defmodule Rabbit.Consumer.Executer do opts |> Enum.into(%{}) |> Map.merge(%{ - executer: nil, - executer_ref: nil, - message: message + task: nil, + message: message, + completed: false }) end @@ -100,11 +128,8 @@ defmodule Rabbit.Consumer.Executer do end defp run(state) do - parent = self() - ref = make_ref() - - executer = - spawn_link(fn -> + task = + Task.async(fn -> try do message = decode_payload!(state.message) consumer_callback(state, :handle_message, [message]) @@ -113,11 +138,9 @@ defmodule Rabbit.Consumer.Executer do catch msg, reason -> handle_error(state, {msg, reason}, __STACKTRACE__) end - - GenServer.cast(parent, {:complete, ref}) end) - %{state | executer: executer, executer_ref: ref} + %{state | task: task} end defp decode_payload!(message) do diff --git a/lib/rabbit/consumer/server.ex b/lib/rabbit/consumer/server.ex index dc712ca..32e54d2 100644 --- a/lib/rabbit/consumer/server.ex +++ b/lib/rabbit/consumer/server.ex @@ -286,19 +286,36 @@ defmodule Rabbit.Consumer.Server do end state + |> cancel_consumer() |> stop_workers() |> close_channel() end + defp cancel_consumer(%{consuming: true, channel_open: true} = state) do + try do + AMQP.Basic.cancel(state.channel, state.consumer_tag) + catch + _, _ -> :ok + end + + %{state | consuming: false} + end + + defp cancel_consumer(state), do: state + defp stop_workers(state) do if state.workers_started do - Enum.each(state.workers, fn worker -> - try do - :ok = Worker.stop(worker) - catch - _, _ -> :ok - end + state.workers + |> Enum.map(fn worker -> + Task.async(fn -> + try do + Worker.stop(worker) + catch + _, _ -> :ok + end + end) end) + |> Task.await_many(25_000) end %{state | workers: nil, workers_started: false} diff --git a/lib/rabbit/consumer/supervisor.ex b/lib/rabbit/consumer/supervisor.ex index 6821d10..27ca39e 100644 --- a/lib/rabbit/consumer/supervisor.ex +++ b/lib/rabbit/consumer/supervisor.ex @@ -35,7 +35,13 @@ defmodule Rabbit.Consumer.Supervisor do defp build_children(module, [consumer | consumers], children) do id = children |> Enum.count() |> to_string() |> String.to_atom() - spec = %{id: id, start: {Rabbit.Consumer, :start_link, [module, consumer]}} + + spec = %{ + id: id, + start: {Rabbit.Consumer, :start_link, [module, consumer]}, + shutdown: 30_000 + } + children = children ++ [spec] build_children(module, consumers, children) end diff --git a/mix.exs b/mix.exs index 8a5c620..28b2a84 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Rabbit.MixProject do use Mix.Project - @version "0.21.0" + @version "0.22.0" def project do [