Skip to content

Commit 008b1da

Browse files
committed
Add sync_start option to Consumer
Allows calling processes to wait until the Consumer is ready to consume.
1 parent 663729e commit 008b1da

File tree

3 files changed

+109
-7
lines changed

3 files changed

+109
-7
lines changed

lib/rabbit/consumer.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ defmodule Rabbit.Consumer do
100100
| {:arguments, Keyword.t()}
101101
| {:custom_meta, map()}
102102
| {:setup_opts, setup_options()}
103+
| {:sync_start, boolean()}
104+
| {:sync_start_delay, non_neg_integer()}
105+
| {:sync_start_max, non_neg_integer()}
103106
@type options :: [option()]
104107
@type delivery_tag :: non_neg_integer()
105108
@type action_options :: [{:multiple, boolean()} | {:requeue, boolean()}]
@@ -217,6 +220,12 @@ defmodule Rabbit.Consumer do
217220
* `:custom_meta` - A map of custom data that will be included in each `Rabbit.Message`
218221
handled by the consumer.
219222
* `:setup_opts` - A keyword list of custom options for use in `c:handle_setup/1`.
223+
* `:sync_start` - Boolean representing whether to establish the connection,
224+
channel, and setup synchronously - defaults to `false`.
225+
* `:sync_start_delay` - The amount of time in milliseconds to sleep between
226+
sync start attempts - defaults to `50`.
227+
* `:sync_start_max` - The max amount of sync start attempts that will occur
228+
before proceeding with async start - defaults to `100`.
220229
221230
## Server Options
222231

lib/rabbit/consumer/server.ex

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ defmodule Rabbit.Consumer.Server do
2020
arguments: [type: :list, default: []],
2121
timeout: [type: [:integer, :atom], required: false],
2222
custom_meta: [type: :map, default: %{}],
23-
setup_opts: [type: :list, default: [], required: false]
23+
setup_opts: [type: :list, default: [], required: false],
24+
sync_start: [type: :boolean, required: true, default: false],
25+
sync_start_delay: [type: :integer, required: true, default: 50],
26+
sync_start_max: [type: :integer, required: true, default: 100]
2427
}
2528

2629
@qos_opts [
@@ -61,6 +64,7 @@ defmodule Rabbit.Consumer.Server do
6164
with {:ok, opts} <- module.init(:consumer, opts),
6265
{:ok, opts} <- validate_opts(opts, @opts_schema) do
6366
state = init_state(module, opts)
67+
state = sync_start(state)
6468
{:ok, state, {:continue, :connection}}
6569
end
6670
end
@@ -171,10 +175,37 @@ defmodule Rabbit.Consumer.Server do
171175
consume_opts: Keyword.take(opts, @consume_opts),
172176
worker_opts: Keyword.take(opts, @worker_opts),
173177
custom_meta: Keyword.get(opts, :custom_meta),
174-
setup_opts: Keyword.get(opts, :setup_opts)
178+
setup_opts: Keyword.get(opts, :setup_opts),
179+
sync_start: Keyword.get(opts, :sync_start),
180+
sync_start_delay: Keyword.get(opts, :sync_start_delay),
181+
sync_start_max: Keyword.get(opts, :sync_start_max),
182+
started_mode: :async
175183
}
176184
end
177185

186+
defp sync_start(state, attempt \\ 1)
187+
188+
defp sync_start(%{sync_start: false} = state, _attempt), do: state
189+
190+
defp sync_start(%{sync_start_max: max} = state, attempt) when attempt >= max do
191+
log_error(state, {:error, :sync_start_failed})
192+
state
193+
end
194+
195+
defp sync_start(state, attempt) do
196+
with {:ok, state} <- connection(state),
197+
{:ok, connection} <- Rabbit.Connection.fetch(state.connection),
198+
{:ok, state} <- channel(state, connection),
199+
{:ok, state} <- handle_setup(state),
200+
{:ok, state} <- consume(state) do
201+
%{state | started_mode: :sync}
202+
else
203+
_ ->
204+
:timer.sleep(state.sync_start_delay)
205+
sync_start(state, attempt + 1)
206+
end
207+
end
208+
178209
defp connection(%{connection_subscribed: true} = state), do: {:ok, state}
179210

180211
defp connection(state) do

test/consumer_test.exs

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,36 @@ defmodule Rabbit.ConsumerTest do
101101
end
102102
end
103103

104+
defmodule TroublesomeTestConsumer do
105+
use Rabbit.Consumer
106+
107+
@impl Rabbit.Consumer
108+
def init(:consumer, opts) do
109+
{:ok, opts}
110+
end
111+
112+
@impl Rabbit.Consumer
113+
def handle_setup(state) do
114+
attempt = Agent.get_and_update(state.setup_opts[:counter], fn n -> {n, n + 1} end)
115+
116+
if attempt == 0 do
117+
{:error, :something_went_wrong}
118+
else
119+
AMQP.Queue.declare(state.channel, state.queue, auto_delete: true)
120+
:ok
121+
end
122+
end
123+
124+
@impl Rabbit.Consumer
125+
def handle_message(_msg) do
126+
end
127+
128+
@impl Rabbit.Consumer
129+
def handle_error(_) do
130+
:ok
131+
end
132+
end
133+
104134
setup do
105135
{:ok, connection} = Connection.start_link(TestConnection)
106136
{:ok, producer} = Producer.start_link(TestProducer, connection: connection)
@@ -118,6 +148,34 @@ defmodule Rabbit.ConsumerTest do
118148
end
119149
end
120150

151+
describe "start_link/3 with :sync_start" do
152+
test "starts consumer", meta do
153+
assert {:ok, consumer} =
154+
Consumer.start_link(TestConsumer,
155+
connection: meta.connection,
156+
queue: "consumer",
157+
sync_start: true
158+
)
159+
160+
assert %{started_mode: :sync, consuming: true} = get_state(consumer)
161+
end
162+
163+
test "starts consumer with multiple attempts", meta do
164+
{:ok, counter} = Agent.start(fn -> 0 end)
165+
166+
assert {:ok, consumer} =
167+
Consumer.start_link(TroublesomeTestConsumer,
168+
connection: meta.connection,
169+
queue: "consumer",
170+
sync_start: true,
171+
setup_opts: [counter: counter]
172+
)
173+
174+
assert Agent.get(counter, & &1) == 2
175+
assert %{started_mode: :sync, consuming: true} = get_state(consumer)
176+
end
177+
end
178+
121179
describe "stop/1" do
122180
test "stops consumer", meta do
123181
assert {:ok, consumer, _queue} = start_consumer(meta)
@@ -128,7 +186,7 @@ defmodule Rabbit.ConsumerTest do
128186
test "disconnects the amqp channel", meta do
129187
assert {:ok, consumer, _queue} = start_consumer(meta)
130188

131-
state = GenServer.call(consumer, :state)
189+
state = get_state(consumer)
132190

133191
assert Process.alive?(state.channel.pid)
134192
assert :ok = Consumer.stop(consumer)
@@ -143,10 +201,10 @@ defmodule Rabbit.ConsumerTest do
143201
assert {:ok, consumer, _queue} = start_consumer(meta)
144202

145203
connection_state = connection_state(meta.connection)
146-
consumer_state1 = GenServer.call(consumer, :state)
204+
consumer_state1 = get_state(consumer)
147205
AMQP.Connection.close(connection_state.connection)
148206
await_consuming(consumer)
149-
consumer_state2 = GenServer.call(consumer, :state)
207+
consumer_state2 = get_state(consumer)
150208

151209
assert consumer_state1.channel.pid != consumer_state2.channel.pid
152210
end
@@ -256,7 +314,7 @@ defmodule Rabbit.ConsumerTest do
256314
end
257315

258316
defp await_consuming(consumer) do
259-
state = GenServer.call(consumer, :state)
317+
state = get_state(consumer)
260318

261319
if state.consuming do
262320
:ok
@@ -270,7 +328,11 @@ defmodule Rabbit.ConsumerTest do
270328
:crypto.strong_rand_bytes(8) |> Base.encode64()
271329
end
272330

331+
defp get_state(consumer) do
332+
GenServer.call(consumer, :state)
333+
end
334+
273335
defp connection_state(connection) do
274-
Connection.transaction(connection, &GenServer.call(&1, :state))
336+
Connection.transaction(connection, &get_state/1)
275337
end
276338
end

0 commit comments

Comments
 (0)