From e576f825012d9d1496f89ba68b01867856418288 Mon Sep 17 00:00:00 2001 From: nsweeting Date: Wed, 25 Mar 2026 09:27:27 -0400 Subject: [PATCH 1/2] fix: ensure graceful consumer shutdown on SIGTERM - Set Consumer.Supervisor child spec type to :supervisor so it gets shutdown: :infinity instead of being killed after 5 seconds - Set explicit shutdown: 30_000 on each Consumer.Server child spec - Cancel AMQP consumption (Basic.cancel) before stopping workers so no new messages arrive during drain - Stop workers in parallel instead of sequentially to fit within the shutdown budget - Add terminate/2 to Executer that nacks unfinished messages with requeue: true, preventing unacked messages from accumulating on quorum queues - Set Executer child_spec shutdown: 25_000 to give in-flight messages time to complete before the safety-net nack - Bump version to 0.22.0 --- lib/rabbit/broker/supervisor.ex | 6 +++++- lib/rabbit/consumer/executer.ex | 29 ++++++++++++++++++++++++----- lib/rabbit/consumer/server.ex | 29 +++++++++++++++++++++++------ lib/rabbit/consumer/supervisor.ex | 8 +++++++- mix.exs | 2 +- 5 files changed, 60 insertions(+), 14 deletions(-) diff --git a/lib/rabbit/broker/supervisor.ex b/lib/rabbit/broker/supervisor.ex index 7af7c1c..2d9e5df 100644 --- a/lib/rabbit/broker/supervisor.ex +++ b/lib/rabbit/broker/supervisor.ex @@ -69,7 +69,11 @@ defmodule Rabbit.Broker.Supervisor do name = Broker.consumers(module) opts = Keyword.get(opts, :consumers, []) opts = Enum.map(opts, &Keyword.put(&1, :connection, conn)) - spec = build_spec(Rabbit.Consumer.Supervisor, name, module, opts) + + spec = + Rabbit.Consumer.Supervisor + |> build_spec(name, module, opts) + |> Map.put(:type, :supervisor) children ++ [spec] end diff --git a/lib/rabbit/consumer/executer.ex b/lib/rabbit/consumer/executer.ex index 57b413c..97b546e 100644 --- a/lib/rabbit/consumer/executer.ex +++ b/lib/rabbit/consumer/executer.ex @@ -20,7 +20,8 @@ defmodule Rabbit.Consumer.Executer do %{ id: __MODULE__, start: {__MODULE__, :start_link, args}, - restart: :temporary + restart: :temporary, + shutdown: 25_000 } end @@ -57,7 +58,7 @@ defmodule Rabbit.Consumer.Executer do def handle_info(:timeout, state) do if is_pid(state.executer), do: Process.exit(state.executer, :normal) handle_error(state, {:exit, :timeout}, []) - {:stop, :timeout, state} + {:stop, :timeout, %{state | completed: true}} end def handle_info({:EXIT, pid1, reason}, %{executer: pid2} = state) when pid1 == pid2 do @@ -68,15 +69,32 @@ defmodule Rabbit.Consumer.Executer do end handle_error(state, reason, stack) - {:stop, reason, state} + {:stop, reason, %{state | completed: true}} end @doc false @impl GenServer def handle_cast({:complete, ref1}, %{executer_ref: ref2} = state) when ref1 == ref2 do - {:stop, :normal, state} + {:stop, :normal, %{state | completed: true}} end + @impl GenServer + def terminate(_reason, %{completed: false, message: message} = state) do + if is_pid(state.executer) and Process.alive?(state.executer) do + Process.exit(state.executer, :kill) + end + + try do + Message.nack(message, requeue: true) + catch + _, _ -> :ok + end + + :ok + end + + def terminate(_reason, _state), do: :ok + ################################ # Private Functions ################################ @@ -87,7 +105,8 @@ defmodule Rabbit.Consumer.Executer do |> Map.merge(%{ executer: nil, executer_ref: nil, - message: message + message: message, + completed: false }) end diff --git a/lib/rabbit/consumer/server.ex b/lib/rabbit/consumer/server.ex index dc712ca..32e54d2 100644 --- a/lib/rabbit/consumer/server.ex +++ b/lib/rabbit/consumer/server.ex @@ -286,19 +286,36 @@ defmodule Rabbit.Consumer.Server do end state + |> cancel_consumer() |> stop_workers() |> close_channel() end + defp cancel_consumer(%{consuming: true, channel_open: true} = state) do + try do + AMQP.Basic.cancel(state.channel, state.consumer_tag) + catch + _, _ -> :ok + end + + %{state | consuming: false} + end + + defp cancel_consumer(state), do: state + defp stop_workers(state) do if state.workers_started do - Enum.each(state.workers, fn worker -> - try do - :ok = Worker.stop(worker) - catch - _, _ -> :ok - end + state.workers + |> Enum.map(fn worker -> + Task.async(fn -> + try do + Worker.stop(worker) + catch + _, _ -> :ok + end + end) end) + |> Task.await_many(25_000) end %{state | workers: nil, workers_started: false} diff --git a/lib/rabbit/consumer/supervisor.ex b/lib/rabbit/consumer/supervisor.ex index 6821d10..27ca39e 100644 --- a/lib/rabbit/consumer/supervisor.ex +++ b/lib/rabbit/consumer/supervisor.ex @@ -35,7 +35,13 @@ defmodule Rabbit.Consumer.Supervisor do defp build_children(module, [consumer | consumers], children) do id = children |> Enum.count() |> to_string() |> String.to_atom() - spec = %{id: id, start: {Rabbit.Consumer, :start_link, [module, consumer]}} + + spec = %{ + id: id, + start: {Rabbit.Consumer, :start_link, [module, consumer]}, + shutdown: 30_000 + } + children = children ++ [spec] build_children(module, consumers, children) end diff --git a/mix.exs b/mix.exs index 8a5c620..28b2a84 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Rabbit.MixProject do use Mix.Project - @version "0.21.0" + @version "0.22.0" def project do [ From 19c0e8a86e10478eb3765caaa6078ba3bfaba926 Mon Sep 17 00:00:00 2001 From: nsweeting Date: Wed, 25 Mar 2026 09:45:14 -0400 Subject: [PATCH 2/2] refactor: use Task.async in Executer for graceful shutdown Replace spawn_link with Task.async so terminate/2 can use Task.shutdown/2 to give in-flight messages a 5s grace period to complete before escalating to :kill and safety-net nacking. Previously the spawned process was killed immediately on shutdown with no chance to finish. Now the sequence is: 1. Task.shutdown(task, 5_000) - sends :shutdown, waits 5s 2. If task doesn't finish, brutally kills it 3. Safety-net nack with requeue: true --- lib/rabbit/consumer/executer.ex | 42 ++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/lib/rabbit/consumer/executer.ex b/lib/rabbit/consumer/executer.ex index 97b546e..2563172 100644 --- a/lib/rabbit/consumer/executer.ex +++ b/lib/rabbit/consumer/executer.ex @@ -50,18 +50,26 @@ defmodule Rabbit.Consumer.Executer do @impl GenServer def handle_continue(:run, state) do state = run(state) - {:noreply, state, state.timeout} + {:noreply, state} end @doc false @impl GenServer def handle_info(:timeout, state) do - if is_pid(state.executer), do: Process.exit(state.executer, :normal) + Task.shutdown(state.task, :brutal_kill) handle_error(state, {:exit, :timeout}, []) {:stop, :timeout, %{state | completed: true}} end - def handle_info({:EXIT, pid1, reason}, %{executer: pid2} = state) when pid1 == pid2 do + def handle_info({ref, _result}, %{task: %Task{ref: ref}} = state) do + # Task completed successfully - the task body already did ack/nack. + # Flush the :DOWN message that Task.async sends after completion. + Process.demonitor(ref, [:flush]) + {:stop, :normal, %{state | completed: true}} + end + + def handle_info({:DOWN, ref, :process, _pid, reason}, %{task: %Task{ref: ref}} = state) do + # Task crashed before completing. Run error handler. {reason, stack} = case reason do {%_{} = reason, stack} -> {reason, stack} @@ -72,16 +80,18 @@ defmodule Rabbit.Consumer.Executer do {:stop, reason, %{state | completed: true}} end - @doc false - @impl GenServer - def handle_cast({:complete, ref1}, %{executer_ref: ref2} = state) when ref1 == ref2 do - {:stop, :normal, %{state | completed: true}} + def handle_info({:EXIT, _, _}, state) do + # Task.async links to the caller. Since we trap exits, we receive + # EXIT messages from the task process. The actual result/crash is + # handled via the task ref and :DOWN messages above, so we ignore + # EXIT signals here. + {:noreply, state} end @impl GenServer def terminate(_reason, %{completed: false, message: message} = state) do - if is_pid(state.executer) and Process.alive?(state.executer) do - Process.exit(state.executer, :kill) + if state.task do + Task.shutdown(state.task, 5_000) end try do @@ -103,8 +113,7 @@ defmodule Rabbit.Consumer.Executer do opts |> Enum.into(%{}) |> Map.merge(%{ - executer: nil, - executer_ref: nil, + task: nil, message: message, completed: false }) @@ -119,11 +128,8 @@ defmodule Rabbit.Consumer.Executer do end defp run(state) do - parent = self() - ref = make_ref() - - executer = - spawn_link(fn -> + task = + Task.async(fn -> try do message = decode_payload!(state.message) consumer_callback(state, :handle_message, [message]) @@ -132,11 +138,9 @@ defmodule Rabbit.Consumer.Executer do catch msg, reason -> handle_error(state, {msg, reason}, __STACKTRACE__) end - - GenServer.cast(parent, {:complete, ref}) end) - %{state | executer: executer, executer_ref: ref} + %{state | task: task} end defp decode_payload!(message) do