From c5862e244f0d2f44f4886799a4f1f1b87e08b1d2 Mon Sep 17 00:00:00 2001 From: Marc Van Oevelen Date: Tue, 11 Sep 2018 19:51:13 +0200 Subject: [PATCH 1/4] Add naming to flow-generated processes Optional automatic generated unique names Optionally overruled using :name --- lib/flow.ex | 91 +++++++++++++++++++++++++++++++++++++---- lib/flow/coordinator.ex | 31 ++++++++++++-- lib/flow/materialize.ex | 66 +++++++++++++++++++----------- test/flow_test.exs | 87 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 35 deletions(-) diff --git a/lib/flow.ex b/lib/flow.ex index f538a6f..b218375 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -343,6 +343,55 @@ defmodule Flow do consumers child specification. Similar to `start_link/1`, they return either `{:ok, pid}` or `{:error, reason}`. + ## Naming Flows, Partitions and Stages + + By default all process stages generated by `Flow` are launched as a list + of children under a flow supervisor process. + Those processes have no name registrations and can only be identified by PID. + + By using the :name option to `start_link/2` all generated GenStage processes + automatically get unique name registration. + e.g. + + Flow.from_stages([:producer], stages: 1) + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.partition(stages: 2) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end) + |> Flow.start_link(name: :myflow) + + Results in named flow processes : + [:myflow] # flow top process + [:myflow_sup ] # flow supervisor + [:myflow_p0_0 ] # First partition stage (input stage) + [:myflow_p1_0] [:myflow_p1_1] # Second partition stages + + + In applications using multiple flows, it is also possible to get automatic + unique names for the flows themselves by specifying [name: :auto] + resulting in [:flow0, :flow1, ...] + + + When using named flows, partition names can be explicitly specified using + the :name option on statements that effectively create partition processes + like `partition/2`, `from_stages/2` and `from_enumerables/2` + e.g. + + Flow.from_stages([:producer], stages: 1, name: :input) + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.partition(stages: 2, name: :output) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end) + |> Flow.start_link(name: :myflow) + + Results in named flow processes : + [:myflow] # flow top process + [:myflow_sup ] # flow supervisor + [:myflow_input_0 ] # First partition stage (input stage) + [:myflow_output_0] [:myflow_output_1] # Second partition stages + ## Performance discussions In this section we will discuss points related to performance @@ -558,6 +607,8 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` * `:shutdown` - the shutdown time for this stage when the flow is shut down. @@ -577,7 +628,7 @@ defmodule Flow do def from_enumerables(enumerables, options \\ []) def from_enumerables([_ | _] = enumerables, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{producers: {:enumerables, enumerables}, options: options, window: window} end @@ -624,6 +675,8 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` * `:shutdown` - the shutdown time for this stage when the flow is shut down. @@ -663,7 +716,7 @@ defmodule Flow do def from_stages(producers, options \\ []) def from_stages([_ | _] = producers, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) producers = Enum.map(producers, &{&1, []}) @@ -706,7 +759,7 @@ defmodule Flow do def from_specs(producers, options \\ []) def from_specs([_ | _] = producers, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) fun = fn start_link -> @@ -799,7 +852,7 @@ defmodule Flow do def through_stages(flow, producer_consumers, options \\ []) def through_stages(%Flow{} = flow, [_ | _] = producer_consumers, options) do - options = stages(options) + options = stages(options) |> partition_id([flow]) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{ @@ -848,7 +901,7 @@ defmodule Flow do def through_specs(flow, producer_consumers, options \\ []) def through_specs(%Flow{} = flow, [_ | _] = producer_consumers, options) do - options = stages(options) + options = stages(options) |> partition_id([flow]) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{ @@ -1022,6 +1075,9 @@ defmodule Flow do ## Options * `:name` - the name of the flow + When non-nill, all processes created within the flow get default names + in this namespace. + Specify [name: :auto] to get automatically generated flownames [:flow0, :flow1, ...] * `:demand` - configures the demand on the flow producers to `:forward` or `:accumulate`. The default is `:forward`. See `GenStage.demand/2` @@ -1274,6 +1330,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. * `:key` - the key to use when partitioning. It is a function @@ -1427,6 +1485,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1448,6 +1508,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1459,9 +1521,9 @@ defmodule Flow do end def merge([%Flow{} | _] = flows, dispatcher, options) when is_list(options) do - options = options |> stages() |> put_dispatcher(dispatcher) + options = options |> stages() |> partition_id(flows) |> put_dispatcher(dispatcher) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) - %Flow{producers: {:flows, flows}, options: options, window: window} + %Flow{producers: {:flows, index_flows(flows)}, options: options, window: window} end def merge(other, _dispatcher, options) when is_list(options) do @@ -1469,6 +1531,21 @@ defmodule Flow do "expected a flow or a non-empty list of flows as first argument, got: #{inspect(other)}" end + defp index_flows(flows) when length(flows) == 1, do: flows + defp index_flows(flows) do + Enum.with_index(flows) |> Enum.map(fn {flow, index} -> %Flow{flow | options: Keyword.put(flow.options, :index, index)} end) + end + + defp partition_id(options) do + options |> Keyword.put(:partition_id, 0) + end + defp partition_id(options,[%Flow{options: opts} | _]) do + case opts[:partition_id] do + nil -> options |> Keyword.put(:partition_id, 0) + id -> options |> Keyword.put(:partition_id, id + 1) + end + end + defp stages(options) do case Keyword.fetch(options, :stages) do {:ok, _} -> diff --git a/lib/flow/coordinator.ex b/lib/flow/coordinator.ex index 074fe54..d8086a1 100644 --- a/lib/flow/coordinator.ex +++ b/lib/flow/coordinator.ex @@ -3,10 +3,12 @@ defmodule Flow.Coordinator do use GenServer def start_link(flow, type, consumers, options) do + options = naming(options) GenServer.start_link(__MODULE__, {flow, type, consumers, options}, options) end def start(flow, type, consumers, options) do + options = naming(options) GenServer.start(__MODULE__, {flow, type, consumers, options}, options) end @@ -19,10 +21,11 @@ defmodule Flow.Coordinator do def init({flow, type, {inner_or_outer, consumers}, options}) do Process.flag(:trap_exit, true) type_options = Keyword.take(options, [:dispatcher]) - - {:ok, supervisor} = start_supervisor() + flowname = options[:name] + supervisor_name = flowname && String.to_atom("#{flowname}_sup") + {:ok, supervisor} = start_supervisor(supervisor_name) start_link = &start_child(supervisor, &1, restart: :temporary) - {producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, type_options) + {producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, type_options, flowname) demand = Keyword.get(options, :demand, :forward) timeout = Keyword.get(options, :subscribe_timeout, 5_000) @@ -55,14 +58,34 @@ defmodule Flow.Coordinator do {:ok, state} end + defp naming(options) do + case options[:name] do + nil -> options + :auto -> Keyword.put(options, :name, auto_flowname()) + _name -> options + end + + end + + defp auto_flowname(flow_index\\0) do + name = String.to_atom("flow#{flow_index}") + case Process.whereis(name) do + nil -> name + _pid -> auto_flowname(flow_index + 1) + end + end + # We have a supervisor for the whole flow. We always wait for an error # to propagate through the whole flow, and then we terminate. For this # to work all children are started as temporary, except the consumers # given via into_specs. Once those crash, they terminate the whole # flow according to their restart type. - defp start_supervisor do + defp start_supervisor(nil) do Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0) end + defp start_supervisor(name) do + Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0, name: name) + end defp start_child(supervisor, spec, opts) do spec = Supervisor.child_spec(spec, [id: make_ref()] ++ opts) diff --git a/lib/flow/materialize.ex b/lib/flow/materialize.ex index 1c6f1f8..239a16d 100644 --- a/lib/flow/materialize.ex +++ b/lib/flow/materialize.ex @@ -5,21 +5,21 @@ defmodule Flow.Materialize do @map_reducer_opts [:buffer_keep, :buffer_size, :dispatcher] @supervisor_opts [:shutdown] - def materialize(%Flow{producers: nil}, _, _, _) do + def materialize(%Flow{producers: nil}, _, _, _,_) do raise ArgumentError, "cannot execute a flow without producers, " <> "please call \"from_enumerable\", \"from_stages\" or \"from_specs\" accordingly" end - def materialize(%Flow{} = flow, start_link, type, type_options) do + def materialize(%Flow{} = flow, start_link, type, type_options, flowname) do %{operations: operations, options: options, producers: producers, window: window} = flow options = Keyword.merge(type_options, options) ops = split_operations(operations) {producers, consumers, ops, window} = - start_producers(producers, ops, start_link, window, options) + start_producers(producers, ops, start_link, window, options, flowname) - {producers, start_stages(ops, window, consumers, start_link, type, options)} + {producers, start_stages(ops, window, consumers, start_link, type, options, flowname)} end ## Helpers @@ -40,7 +40,7 @@ defmodule Flow.Materialize do end end - defp start_stages(:none, window, producers, _start_link, _type, _options) do + defp start_stages(:none, window, producers, _start_link, _type, _options, _flowname) do if window != Flow.Window.global() do raise ArgumentError, "a window was set but no computation is happening on this partition" end @@ -50,12 +50,18 @@ defmodule Flow.Materialize do end end - defp start_stages({_mr, compiled_ops, _ops}, window, producers, start_link, type, opts) do + defp start_stages({_mr, compiled_ops, _ops}, window, producers, start_link, type, opts, flowname) do {acc, reducer, trigger} = window_ops(window, compiled_ops, opts) - {stages, opts} = Keyword.pop(opts, :stages) + {name, opts} = Keyword.pop(opts, :name) + {partition_id, opts} = Keyword.pop(opts, :partition_id) + {index, opts} = Keyword.pop(opts, :index) {supervisor_opts, opts} = Keyword.split(opts, @supervisor_opts) {init_opts, subscribe_opts} = Keyword.split(opts, @map_reducer_opts) + if name && ! flowname do + raise ArgumentError, "a partition is named '#{name}', this requires flow naming, either specific or using :auto" + end + partition_name = name || "p#{partition_id}" init_opts = case type do @@ -69,19 +75,28 @@ defmodule Flow.Materialize do opts = Keyword.merge(subscribe_opts, producer_opts) {producer, [partition: i, cancel: :transient] ++ opts} end - arg = {type, [subscribe_to: subscriptions] ++ init_opts, {i, stages}, trigger, acc, reducer} - {:ok, pid} = start_link.(map_reducer_spec(arg, supervisor_opts)) + {:ok, pid} = start_link.(map_reducer_spec(arg, child_opts(flowname, index, partition_name, i), supervisor_opts)) {pid, [cancel: :transient]} end end - defp map_reducer_spec(arg, supervisor_opts) do + defp child_opts(nil, _index, _partition_name, _stage), do: [] + defp child_opts(_flowname, _index, nil, _stage), do: [] + defp child_opts(flowname, index, partition_name, stage) do + [ name: [flowname, index, partition_name, stage] + |> Enum.filter(&(&1)) + |> Enum.join("_") + |> String.to_atom + ] + end + + defp map_reducer_spec(arg, child_opts, supervisor_opts) do shutdown = Keyword.get(supervisor_opts, :shutdown, 5000) %{ id: Flow.MapReducer, - start: {GenStage, :start_link, [Flow.MapReducer, arg, []]}, + start: {GenStage, :start_link, [Flow.MapReducer, arg, child_opts]}, modules: [Flow.MapReducer], shutdown: shutdown } @@ -94,13 +109,14 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flowname ) do partitions = Keyword.fetch!(options, :stages) - {left_producers, left_consumers} = start_join(:left, left, left_key, partitions, start_link) + {left_producers, left_consumers} = start_join(:left, left, left_key, partitions, start_link, flowname) {right_producers, right_consumers} = - start_join(:right, right, right_key, partitions, start_link) + start_join(:right, right, right_key, partitions, start_link, flowname) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) @@ -121,9 +137,10 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flowname ) do - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) stages = Keyword.fetch!(flow.options, :stages) @@ -134,19 +151,19 @@ defmodule Flow.Materialize do window} end - defp start_producers({:flows, flows}, ops, start_link, window, options) do + defp start_producers({:flows, flows}, ops, start_link, window, options, flowname) do options = partition(options) {producers, consumers} = Enum.reduce(flows, {[], []}, fn flow, {producers_acc, consumers_acc} -> - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) {producers ++ producers_acc, consumers ++ consumers_acc} end) {producers, consumers, ensure_ops(ops), window} end - defp start_producers({:from_stages, producers}, ops, start_link, window, options) do + defp start_producers({:from_stages, producers}, ops, start_link, window, options, _flowname) do producers = producers.(start_link) # If there are no ops and there is a need for a custom @@ -163,9 +180,10 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flowname ) do - {producers, intermediary} = materialize(flow, start_link, :producer_consumer, options) + {producers, intermediary} = materialize(flow, start_link, :producer_consumer, options, flowname) timeout = Keyword.get(options, :subscribe_timeout, 5_000) producers_consumers = producers_consumers.(start_link) @@ -182,7 +200,7 @@ defmodule Flow.Materialize do {producers, producers_consumers, ensure_ops(ops), window} end - defp start_producers({:enumerables, enumerables}, ops, start_link, window, options) do + defp start_producers({:enumerables, enumerables}, ops, start_link, window, options, _flowname) do # If there are no ops, just start the enumerables with the options. # Otherwise it is a regular producer consumer with demand dispatcher. # In this case, options is used by subsequent mapper/reducer stages. @@ -341,14 +359,14 @@ defmodule Flow.Materialize do ## Joins - defp start_join(side, flow, key_fun, stages, start_link) do + defp start_join(side, flow, key_fun, stages, start_link, flowname) do hash = fn event -> key = key_fun.(event) {{key, event}, :erlang.phash2(key, stages)} end opts = [dispatcher: {GenStage.PartitionDispatcher, partitions: 0..(stages - 1), hash: hash}] - {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts, flowname) consumers = for {consumer, consumer_opts} <- consumers do diff --git a/test/flow_test.exs b/test/flow_test.exs index 8b0f12d..9876e5d 100644 --- a/test/flow_test.exs +++ b/test/flow_test.exs @@ -1575,4 +1575,91 @@ defmodule FlowTest do assert_receive {:consumed, [1]} end end + + + describe "stage naming" do + test "start_link/2 with :name", config do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_p1_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil + end + + test "start_link/2 with name: :auto " do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: :auto) + flowname = "flow0" + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_p1_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil + end + + test "start_link/2 with :name and naming a partition", config do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2, name: :output) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_output_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil + end + + test "start_link/2 with :name and naming all partitions", config do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1, name: :input) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2, name: :output) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_input_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_input_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_output_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil + end + end + end From 394f8e79f5dba1348dffc3f02cacbcbc3857da48 Mon Sep 17 00:00:00 2001 From: marc Date: Tue, 11 Sep 2018 21:49:36 +0200 Subject: [PATCH 2/4] Remove auto-generation of flow names (toplevel) apply proper code formatting --- lib/flow.ex | 31 +++++++-------- lib/flow/coordinator.ex | 28 +++----------- lib/flow/materialize.ex | 84 +++++++++++++++++++++++++++-------------- test/flow_test.exs | 28 ++------------ 4 files changed, 78 insertions(+), 93 deletions(-) diff --git a/lib/flow.ex b/lib/flow.ex index b218375..289dfb4 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -367,12 +367,6 @@ defmodule Flow do [:myflow_p0_0 ] # First partition stage (input stage) [:myflow_p1_0] [:myflow_p1_1] # Second partition stages - - In applications using multiple flows, it is also possible to get automatic - unique names for the flows themselves by specifying [name: :auto] - resulting in [:flow0, :flow1, ...] - - When using named flows, partition names can be explicitly specified using the :name option on statements that effectively create partition processes like `partition/2`, `from_stages/2` and `from_enumerables/2` @@ -607,7 +601,7 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in this partition, requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` @@ -675,7 +669,7 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in this partition, requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` @@ -1075,9 +1069,7 @@ defmodule Flow do ## Options * `:name` - the name of the flow - When non-nill, all processes created within the flow get default names - in this namespace. - Specify [name: :auto] to get automatically generated flownames [:flow0, :flow1, ...] + When non-nil, all processes created within the flow get default names * `:demand` - configures the demand on the flow producers to `:forward` or `:accumulate`. The default is `:forward`. See `GenStage.demand/2` @@ -1330,7 +1322,7 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in the partition, requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1485,7 +1477,7 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in the partition, requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1508,7 +1500,7 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in the partition, requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1532,17 +1524,22 @@ defmodule Flow do end defp index_flows(flows) when length(flows) == 1, do: flows + defp index_flows(flows) do - Enum.with_index(flows) |> Enum.map(fn {flow, index} -> %Flow{flow | options: Keyword.put(flow.options, :index, index)} end) + Enum.with_index(flows) + |> Enum.map(fn {flow, index} -> + %Flow{flow | options: Keyword.put(flow.options, :index, index)} + end) end defp partition_id(options) do options |> Keyword.put(:partition_id, 0) end - defp partition_id(options,[%Flow{options: opts} | _]) do + + defp partition_id(options, [%Flow{options: opts} | _]) do case opts[:partition_id] do nil -> options |> Keyword.put(:partition_id, 0) - id -> options |> Keyword.put(:partition_id, id + 1) + id -> options |> Keyword.put(:partition_id, id + 1) end end diff --git a/lib/flow/coordinator.ex b/lib/flow/coordinator.ex index d8086a1..a6b0271 100644 --- a/lib/flow/coordinator.ex +++ b/lib/flow/coordinator.ex @@ -3,12 +3,10 @@ defmodule Flow.Coordinator do use GenServer def start_link(flow, type, consumers, options) do - options = naming(options) GenServer.start_link(__MODULE__, {flow, type, consumers, options}, options) end def start(flow, type, consumers, options) do - options = naming(options) GenServer.start(__MODULE__, {flow, type, consumers, options}, options) end @@ -21,11 +19,13 @@ defmodule Flow.Coordinator do def init({flow, type, {inner_or_outer, consumers}, options}) do Process.flag(:trap_exit, true) type_options = Keyword.take(options, [:dispatcher]) - flowname = options[:name] - supervisor_name = flowname && String.to_atom("#{flowname}_sup") + flow_name = options[:name] + supervisor_name = flow_name && String.to_atom("#{flow_name}_sup") {:ok, supervisor} = start_supervisor(supervisor_name) start_link = &start_child(supervisor, &1, restart: :temporary) - {producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, type_options, flowname) + + {producers, intermediary} = + Flow.Materialize.materialize(flow, start_link, type, type_options, flow_name) demand = Keyword.get(options, :demand, :forward) timeout = Keyword.get(options, :subscribe_timeout, 5_000) @@ -58,23 +58,6 @@ defmodule Flow.Coordinator do {:ok, state} end - defp naming(options) do - case options[:name] do - nil -> options - :auto -> Keyword.put(options, :name, auto_flowname()) - _name -> options - end - - end - - defp auto_flowname(flow_index\\0) do - name = String.to_atom("flow#{flow_index}") - case Process.whereis(name) do - nil -> name - _pid -> auto_flowname(flow_index + 1) - end - end - # We have a supervisor for the whole flow. We always wait for an error # to propagate through the whole flow, and then we terminate. For this # to work all children are started as temporary, except the consumers @@ -83,6 +66,7 @@ defmodule Flow.Coordinator do defp start_supervisor(nil) do Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0) end + defp start_supervisor(name) do Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0, name: name) end diff --git a/lib/flow/materialize.ex b/lib/flow/materialize.ex index 239a16d..48d0759 100644 --- a/lib/flow/materialize.ex +++ b/lib/flow/materialize.ex @@ -5,21 +5,21 @@ defmodule Flow.Materialize do @map_reducer_opts [:buffer_keep, :buffer_size, :dispatcher] @supervisor_opts [:shutdown] - def materialize(%Flow{producers: nil}, _, _, _,_) do + def materialize(%Flow{producers: nil}, _, _, _, _) do raise ArgumentError, "cannot execute a flow without producers, " <> "please call \"from_enumerable\", \"from_stages\" or \"from_specs\" accordingly" end - def materialize(%Flow{} = flow, start_link, type, type_options, flowname) do + def materialize(%Flow{} = flow, start_link, type, type_options, flow_name) do %{operations: operations, options: options, producers: producers, window: window} = flow options = Keyword.merge(type_options, options) ops = split_operations(operations) {producers, consumers, ops, window} = - start_producers(producers, ops, start_link, window, options, flowname) + start_producers(producers, ops, start_link, window, options, flow_name) - {producers, start_stages(ops, window, consumers, start_link, type, options, flowname)} + {producers, start_stages(ops, window, consumers, start_link, type, options, flow_name)} end ## Helpers @@ -40,7 +40,7 @@ defmodule Flow.Materialize do end end - defp start_stages(:none, window, producers, _start_link, _type, _options, _flowname) do + defp start_stages(:none, window, producers, _start_link, _type, _options, _flow_name) do if window != Flow.Window.global() do raise ArgumentError, "a window was set but no computation is happening on this partition" end @@ -50,7 +50,15 @@ defmodule Flow.Materialize do end end - defp start_stages({_mr, compiled_ops, _ops}, window, producers, start_link, type, opts, flowname) do + defp start_stages( + {_mr, compiled_ops, _ops}, + window, + producers, + start_link, + type, + opts, + flow_name + ) do {acc, reducer, trigger} = window_ops(window, compiled_ops, opts) {stages, opts} = Keyword.pop(opts, :stages) {name, opts} = Keyword.pop(opts, :name) @@ -58,9 +66,12 @@ defmodule Flow.Materialize do {index, opts} = Keyword.pop(opts, :index) {supervisor_opts, opts} = Keyword.split(opts, @supervisor_opts) {init_opts, subscribe_opts} = Keyword.split(opts, @map_reducer_opts) - if name && ! flowname do - raise ArgumentError, "a partition is named '#{name}', this requires flow naming, either specific or using :auto" + + if name && !flow_name do + raise ArgumentError, + "a partition is named '#{inspect(name)}', this requires the flow to be named" end + partition_name = name || "p#{partition_id}" init_opts = @@ -75,23 +86,32 @@ defmodule Flow.Materialize do opts = Keyword.merge(subscribe_opts, producer_opts) {producer, [partition: i, cancel: :transient] ++ opts} end + arg = {type, [subscribe_to: subscriptions] ++ init_opts, {i, stages}, trigger, acc, reducer} - {:ok, pid} = start_link.(map_reducer_spec(arg, child_opts(flowname, index, partition_name, i), supervisor_opts)) + + {:ok, pid} = + start_link.( + map_reducer_spec(arg, child_opts(flow_name, index, partition_name, i), supervisor_opts) + ) + {pid, [cancel: :transient]} end end defp child_opts(nil, _index, _partition_name, _stage), do: [] - defp child_opts(_flowname, _index, nil, _stage), do: [] - defp child_opts(flowname, index, partition_name, stage) do - [ name: [flowname, index, partition_name, stage] - |> Enum.filter(&(&1)) - |> Enum.join("_") - |> String.to_atom + defp child_opts(_flow_name, _index, nil, _stage), do: [] + + defp child_opts(flow_name, index, partition_name, stage) do + [ + name: + [flow_name, index, partition_name, stage] + |> Enum.filter(& &1) + |> Enum.join("_") + |> String.to_atom() ] end - defp map_reducer_spec(arg, child_opts, supervisor_opts) do + defp map_reducer_spec(arg, child_opts, supervisor_opts) do shutdown = Keyword.get(supervisor_opts, :shutdown, 5000) %{ @@ -110,13 +130,15 @@ defmodule Flow.Materialize do start_link, window, options, - flowname + flow_name ) do partitions = Keyword.fetch!(options, :stages) - {left_producers, left_consumers} = start_join(:left, left, left_key, partitions, start_link, flowname) + + {left_producers, left_consumers} = + start_join(:left, left, left_key, partitions, start_link, flow_name) {right_producers, right_consumers} = - start_join(:right, right, right_key, partitions, start_link, flowname) + start_join(:right, right, right_key, partitions, start_link, flow_name) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) @@ -138,9 +160,9 @@ defmodule Flow.Materialize do start_link, window, options, - flowname + flow_name ) do - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flow_name) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) stages = Keyword.fetch!(flow.options, :stages) @@ -151,19 +173,21 @@ defmodule Flow.Materialize do window} end - defp start_producers({:flows, flows}, ops, start_link, window, options, flowname) do + defp start_producers({:flows, flows}, ops, start_link, window, options, flow_name) do options = partition(options) {producers, consumers} = Enum.reduce(flows, {[], []}, fn flow, {producers_acc, consumers_acc} -> - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) + {producers, consumers} = + materialize(flow, start_link, :producer_consumer, options, flow_name) + {producers ++ producers_acc, consumers ++ consumers_acc} end) {producers, consumers, ensure_ops(ops), window} end - defp start_producers({:from_stages, producers}, ops, start_link, window, options, _flowname) do + defp start_producers({:from_stages, producers}, ops, start_link, window, options, _flow_name) do producers = producers.(start_link) # If there are no ops and there is a need for a custom @@ -181,9 +205,11 @@ defmodule Flow.Materialize do start_link, window, options, - flowname + flow_name ) do - {producers, intermediary} = materialize(flow, start_link, :producer_consumer, options, flowname) + {producers, intermediary} = + materialize(flow, start_link, :producer_consumer, options, flow_name) + timeout = Keyword.get(options, :subscribe_timeout, 5_000) producers_consumers = producers_consumers.(start_link) @@ -200,7 +226,7 @@ defmodule Flow.Materialize do {producers, producers_consumers, ensure_ops(ops), window} end - defp start_producers({:enumerables, enumerables}, ops, start_link, window, options, _flowname) do + defp start_producers({:enumerables, enumerables}, ops, start_link, window, options, _flow_name) do # If there are no ops, just start the enumerables with the options. # Otherwise it is a regular producer consumer with demand dispatcher. # In this case, options is used by subsequent mapper/reducer stages. @@ -359,14 +385,14 @@ defmodule Flow.Materialize do ## Joins - defp start_join(side, flow, key_fun, stages, start_link, flowname) do + defp start_join(side, flow, key_fun, stages, start_link, flow_name) do hash = fn event -> key = key_fun.(event) {{key, event}, :erlang.phash2(key, stages)} end opts = [dispatcher: {GenStage.PartitionDispatcher, partitions: 0..(stages - 1), hash: hash}] - {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts, flowname) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts, flow_name) consumers = for {consumer, consumer_opts} <- consumers do diff --git a/test/flow_test.exs b/test/flow_test.exs index 9876e5d..2d7d1dc 100644 --- a/test/flow_test.exs +++ b/test/flow_test.exs @@ -1576,11 +1576,10 @@ defmodule FlowTest do end end - describe "stage naming" do test "start_link/2 with :name", config do {:ok, pid} = - Stream.cycle([1,2,3]) + Stream.cycle([1, 2, 3]) |> Flow.from_enumerable(stages: 1) |> Flow.map(&(&1 + 1)) |> Flow.partition(stages: 2) @@ -1599,29 +1598,9 @@ defmodule FlowTest do assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil end - test "start_link/2 with name: :auto " do - {:ok, pid} = - Stream.cycle([1,2,3]) - |> Flow.from_enumerable(stages: 1) - |> Flow.map(&(&1 + 1)) - |> Flow.partition(stages: 2) - |> Flow.each(fn _ -> Process.sleep(:infinity) end) - |> Flow.start_link(name: :auto) - flowname = "flow0" - assert Process.whereis(String.to_atom(flowname)) == pid - assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil - - assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil - assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil - - assert Process.whereis(String.to_atom(flowname <> "_p1_0")) != nil - assert Process.whereis(String.to_atom(flowname <> "_p1_1")) != nil - assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil - end - test "start_link/2 with :name and naming a partition", config do {:ok, pid} = - Stream.cycle([1,2,3]) + Stream.cycle([1, 2, 3]) |> Flow.from_enumerable(stages: 1) |> Flow.map(&(&1 + 1)) |> Flow.partition(stages: 2, name: :output) @@ -1642,7 +1621,7 @@ defmodule FlowTest do test "start_link/2 with :name and naming all partitions", config do {:ok, pid} = - Stream.cycle([1,2,3]) + Stream.cycle([1, 2, 3]) |> Flow.from_enumerable(stages: 1, name: :input) |> Flow.map(&(&1 + 1)) |> Flow.partition(stages: 2, name: :output) @@ -1661,5 +1640,4 @@ defmodule FlowTest do assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil end end - end From 34b469cb7b2438577d927a3173ee972c17cf0ba7 Mon Sep 17 00:00:00 2001 From: Marc Van Oevelen Date: Tue, 11 Sep 2018 19:51:13 +0200 Subject: [PATCH 3/4] Add naming to flow-generated processes Optional automatic generated unique names Optionally overruled using :name --- lib/flow.ex | 91 +++++++++++++++++++++++++++++++++++++---- lib/flow/coordinator.ex | 31 ++++++++++++-- lib/flow/materialize.ex | 66 +++++++++++++++++++----------- test/flow_test.exs | 87 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 35 deletions(-) diff --git a/lib/flow.ex b/lib/flow.ex index 7fabc65..7410a65 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -343,6 +343,55 @@ defmodule Flow do consumers child specification. Similar to `start_link/1`, they return either `{:ok, pid}` or `{:error, reason}`. + ## Naming Flows, Partitions and Stages + + By default all process stages generated by `Flow` are launched as a list + of children under a flow supervisor process. + Those processes have no name registrations and can only be identified by PID. + + By using the :name option to `start_link/2` all generated GenStage processes + automatically get unique name registration. + e.g. + + Flow.from_stages([:producer], stages: 1) + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.partition(stages: 2) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end) + |> Flow.start_link(name: :myflow) + + Results in named flow processes : + [:myflow] # flow top process + [:myflow_sup ] # flow supervisor + [:myflow_p0_0 ] # First partition stage (input stage) + [:myflow_p1_0] [:myflow_p1_1] # Second partition stages + + + In applications using multiple flows, it is also possible to get automatic + unique names for the flows themselves by specifying [name: :auto] + resulting in [:flow0, :flow1, ...] + + + When using named flows, partition names can be explicitly specified using + the :name option on statements that effectively create partition processes + like `partition/2`, `from_stages/2` and `from_enumerables/2` + e.g. + + Flow.from_stages([:producer], stages: 1, name: :input) + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.partition(stages: 2, name: :output) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end) + |> Flow.start_link(name: :myflow) + + Results in named flow processes : + [:myflow] # flow top process + [:myflow_sup ] # flow supervisor + [:myflow_input_0 ] # First partition stage (input stage) + [:myflow_output_0] [:myflow_output_1] # Second partition stages + ## Performance discussions In this section we will discuss points related to performance @@ -558,6 +607,8 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` * `:shutdown` - the shutdown time for this stage when the flow is shut down. @@ -577,7 +628,7 @@ defmodule Flow do def from_enumerables(enumerables, options \\ []) def from_enumerables([_ | _] = enumerables, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{producers: {:enumerables, enumerables}, options: options, window: window} end @@ -600,6 +651,8 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` * `:shutdown` - the shutdown time for this stage when the flow is shut down. @@ -639,7 +692,7 @@ defmodule Flow do def from_stages(producers, options \\ []) def from_stages([_ | _] = producers, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) producers = Enum.map(producers, &{&1, []}) @@ -682,7 +735,7 @@ defmodule Flow do def from_specs(producers, options \\ []) def from_specs([_ | _] = producers, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) fun = fn start_link -> @@ -775,7 +828,7 @@ defmodule Flow do def through_stages(flow, producer_consumers, options \\ []) def through_stages(%Flow{} = flow, [_ | _] = producer_consumers, options) do - options = stages(options) + options = stages(options) |> partition_id([flow]) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{ @@ -824,7 +877,7 @@ defmodule Flow do def through_specs(flow, producer_consumers, options \\ []) def through_specs(%Flow{} = flow, [_ | _] = producer_consumers, options) do - options = stages(options) + options = stages(options) |> partition_id([flow]) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{ @@ -998,6 +1051,9 @@ defmodule Flow do ## Options * `:name` - the name of the flow + When non-nill, all processes created within the flow get default names + in this namespace. + Specify [name: :auto] to get automatically generated flownames [:flow0, :flow1, ...] * `:demand` - configures the demand on the flow producers to `:forward` or `:accumulate`. The default is `:forward`. See `GenStage.demand/2` @@ -1250,6 +1306,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. * `:key` - the key to use when partitioning. It is a function @@ -1403,6 +1461,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1424,6 +1484,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the namespace of the flow, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1435,9 +1497,9 @@ defmodule Flow do end def merge([%Flow{} | _] = flows, dispatcher, options) when is_list(options) do - options = options |> stages() |> put_dispatcher(dispatcher) + options = options |> stages() |> partition_id(flows) |> put_dispatcher(dispatcher) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) - %Flow{producers: {:flows, flows}, options: options, window: window} + %Flow{producers: {:flows, index_flows(flows)}, options: options, window: window} end def merge(other, _dispatcher, options) when is_list(options) do @@ -1445,6 +1507,21 @@ defmodule Flow do "expected a flow or a non-empty list of flows as first argument, got: #{inspect(other)}" end + defp index_flows(flows) when length(flows) == 1, do: flows + defp index_flows(flows) do + Enum.with_index(flows) |> Enum.map(fn {flow, index} -> %Flow{flow | options: Keyword.put(flow.options, :index, index)} end) + end + + defp partition_id(options) do + options |> Keyword.put(:partition_id, 0) + end + defp partition_id(options,[%Flow{options: opts} | _]) do + case opts[:partition_id] do + nil -> options |> Keyword.put(:partition_id, 0) + id -> options |> Keyword.put(:partition_id, id + 1) + end + end + defp stages(options) do case Keyword.fetch(options, :stages) do {:ok, _} -> diff --git a/lib/flow/coordinator.ex b/lib/flow/coordinator.ex index 074fe54..d8086a1 100644 --- a/lib/flow/coordinator.ex +++ b/lib/flow/coordinator.ex @@ -3,10 +3,12 @@ defmodule Flow.Coordinator do use GenServer def start_link(flow, type, consumers, options) do + options = naming(options) GenServer.start_link(__MODULE__, {flow, type, consumers, options}, options) end def start(flow, type, consumers, options) do + options = naming(options) GenServer.start(__MODULE__, {flow, type, consumers, options}, options) end @@ -19,10 +21,11 @@ defmodule Flow.Coordinator do def init({flow, type, {inner_or_outer, consumers}, options}) do Process.flag(:trap_exit, true) type_options = Keyword.take(options, [:dispatcher]) - - {:ok, supervisor} = start_supervisor() + flowname = options[:name] + supervisor_name = flowname && String.to_atom("#{flowname}_sup") + {:ok, supervisor} = start_supervisor(supervisor_name) start_link = &start_child(supervisor, &1, restart: :temporary) - {producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, type_options) + {producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, type_options, flowname) demand = Keyword.get(options, :demand, :forward) timeout = Keyword.get(options, :subscribe_timeout, 5_000) @@ -55,14 +58,34 @@ defmodule Flow.Coordinator do {:ok, state} end + defp naming(options) do + case options[:name] do + nil -> options + :auto -> Keyword.put(options, :name, auto_flowname()) + _name -> options + end + + end + + defp auto_flowname(flow_index\\0) do + name = String.to_atom("flow#{flow_index}") + case Process.whereis(name) do + nil -> name + _pid -> auto_flowname(flow_index + 1) + end + end + # We have a supervisor for the whole flow. We always wait for an error # to propagate through the whole flow, and then we terminate. For this # to work all children are started as temporary, except the consumers # given via into_specs. Once those crash, they terminate the whole # flow according to their restart type. - defp start_supervisor do + defp start_supervisor(nil) do Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0) end + defp start_supervisor(name) do + Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0, name: name) + end defp start_child(supervisor, spec, opts) do spec = Supervisor.child_spec(spec, [id: make_ref()] ++ opts) diff --git a/lib/flow/materialize.ex b/lib/flow/materialize.ex index 9ee15af..91a1d91 100644 --- a/lib/flow/materialize.ex +++ b/lib/flow/materialize.ex @@ -5,21 +5,21 @@ defmodule Flow.Materialize do @map_reducer_opts [:buffer_keep, :buffer_size, :dispatcher] @supervisor_opts [:shutdown] - def materialize(%Flow{producers: nil}, _, _, _) do + def materialize(%Flow{producers: nil}, _, _, _,_) do raise ArgumentError, "cannot execute a flow without producers, " <> "please call \"from_enumerable\", \"from_stages\" or \"from_specs\" accordingly" end - def materialize(%Flow{} = flow, start_link, type, type_options) do + def materialize(%Flow{} = flow, start_link, type, type_options, flowname) do %{operations: operations, options: options, producers: producers, window: window} = flow options = Keyword.merge(type_options, options) ops = split_operations(operations) {producers, consumers, ops, window} = - start_producers(producers, ops, start_link, window, options) + start_producers(producers, ops, start_link, window, options, flowname) - {producers, start_stages(ops, window, consumers, start_link, type, options)} + {producers, start_stages(ops, window, consumers, start_link, type, options, flowname)} end ## Helpers @@ -40,7 +40,7 @@ defmodule Flow.Materialize do end end - defp start_stages(:none, window, producers, _start_link, _type, _options) do + defp start_stages(:none, window, producers, _start_link, _type, _options, _flowname) do if window != Flow.Window.global() do raise ArgumentError, "a window was set but no computation is happening on this partition" end @@ -50,12 +50,18 @@ defmodule Flow.Materialize do end end - defp start_stages({_mr, compiled_ops, _ops}, window, producers, start_link, type, opts) do + defp start_stages({_mr, compiled_ops, _ops}, window, producers, start_link, type, opts, flowname) do {acc, reducer, trigger} = window_ops(window, compiled_ops, opts) - {stages, opts} = Keyword.pop(opts, :stages) + {name, opts} = Keyword.pop(opts, :name) + {partition_id, opts} = Keyword.pop(opts, :partition_id) + {index, opts} = Keyword.pop(opts, :index) {supervisor_opts, opts} = Keyword.split(opts, @supervisor_opts) {init_opts, subscribe_opts} = Keyword.split(opts, @map_reducer_opts) + if name && ! flowname do + raise ArgumentError, "a partition is named '#{name}', this requires flow naming, either specific or using :auto" + end + partition_name = name || "p#{partition_id}" init_opts = case type do @@ -69,19 +75,28 @@ defmodule Flow.Materialize do opts = Keyword.merge(subscribe_opts, producer_opts) {producer, [partition: i, cancel: :transient] ++ opts} end - arg = {type, [subscribe_to: subscriptions] ++ init_opts, {i, stages}, trigger, acc, reducer} - {:ok, pid} = start_link.(map_reducer_spec(arg, supervisor_opts)) + {:ok, pid} = start_link.(map_reducer_spec(arg, child_opts(flowname, index, partition_name, i), supervisor_opts)) {pid, [cancel: :transient]} end end - defp map_reducer_spec(arg, supervisor_opts) do + defp child_opts(nil, _index, _partition_name, _stage), do: [] + defp child_opts(_flowname, _index, nil, _stage), do: [] + defp child_opts(flowname, index, partition_name, stage) do + [ name: [flowname, index, partition_name, stage] + |> Enum.filter(&(&1)) + |> Enum.join("_") + |> String.to_atom + ] + end + + defp map_reducer_spec(arg, child_opts, supervisor_opts) do shutdown = Keyword.get(supervisor_opts, :shutdown, 5000) %{ id: Flow.MapReducer, - start: {GenStage, :start_link, [Flow.MapReducer, arg, []]}, + start: {GenStage, :start_link, [Flow.MapReducer, arg, child_opts]}, modules: [Flow.MapReducer], shutdown: shutdown } @@ -94,13 +109,14 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flowname ) do partitions = Keyword.fetch!(options, :stages) - {left_producers, left_consumers} = start_join(:left, left, left_key, partitions, start_link) + {left_producers, left_consumers} = start_join(:left, left, left_key, partitions, start_link, flowname) {right_producers, right_consumers} = - start_join(:right, right, right_key, partitions, start_link) + start_join(:right, right, right_key, partitions, start_link, flowname) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) @@ -121,9 +137,10 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flowname ) do - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) stages = Keyword.fetch!(flow.options, :stages) @@ -134,19 +151,19 @@ defmodule Flow.Materialize do window} end - defp start_producers({:flows, flows}, ops, start_link, window, options) do + defp start_producers({:flows, flows}, ops, start_link, window, options, flowname) do options = partition(options) {producers, consumers} = Enum.reduce(flows, {[], []}, fn flow, {producers_acc, consumers_acc} -> - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) {producers ++ producers_acc, consumers ++ consumers_acc} end) {producers, consumers, ensure_ops(ops), window} end - defp start_producers({:from_stages, producers}, ops, start_link, window, options) do + defp start_producers({:from_stages, producers}, ops, start_link, window, options, _flowname) do producers = producers.(start_link) # If there are no ops and there is a need for a custom @@ -163,9 +180,10 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flowname ) do - {producers, intermediary} = materialize(flow, start_link, :producer_consumer, options) + {producers, intermediary} = materialize(flow, start_link, :producer_consumer, options, flowname) timeout = Keyword.get(options, :subscribe_timeout, 5_000) producers_consumers = producers_consumers.(start_link) @@ -182,7 +200,7 @@ defmodule Flow.Materialize do {producers, producers_consumers, ensure_ops(ops), window} end - defp start_producers({:enumerables, enumerables}, ops, start_link, window, options) do + defp start_producers({:enumerables, enumerables}, ops, start_link, window, options, _flowname) do # If there are no ops, just start the enumerables with the options. # Otherwise it is a regular producer consumer with demand dispatcher. # In this case, options is used by subsequent mapper/reducer stages. @@ -341,14 +359,14 @@ defmodule Flow.Materialize do ## Joins - defp start_join(side, flow, key_fun, stages, start_link) do + defp start_join(side, flow, key_fun, stages, start_link, flowname) do hash = fn event -> key = key_fun.(event) {{key, event}, :erlang.phash2(key, stages)} end opts = [dispatcher: {GenStage.PartitionDispatcher, partitions: 0..(stages - 1), hash: hash}] - {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts, flowname) consumers = for {consumer, consumer_opts} <- consumers do diff --git a/test/flow_test.exs b/test/flow_test.exs index 1c62559..f939682 100644 --- a/test/flow_test.exs +++ b/test/flow_test.exs @@ -1597,4 +1597,91 @@ defmodule FlowTest do assert_receive {:consumed, [1]} end end + + + describe "stage naming" do + test "start_link/2 with :name", config do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_p1_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil + end + + test "start_link/2 with name: :auto " do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: :auto) + flowname = "flow0" + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_p1_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil + end + + test "start_link/2 with :name and naming a partition", config do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2, name: :output) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_output_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil + end + + test "start_link/2 with :name and naming all partitions", config do + {:ok, pid} = + Stream.cycle([1,2,3]) + |> Flow.from_enumerable(stages: 1, name: :input) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2, name: :output) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_input_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_input_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_output_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil + end + end + end From 39b18239c77991c51883a592e697923246344a34 Mon Sep 17 00:00:00 2001 From: marc Date: Tue, 11 Sep 2018 21:49:36 +0200 Subject: [PATCH 4/4] Remove auto-generation of flow names (toplevel) apply proper code formatting --- lib/flow.ex | 31 +++++++-------- lib/flow/coordinator.ex | 28 +++----------- lib/flow/materialize.ex | 84 +++++++++++++++++++++++++++-------------- test/flow_test.exs | 28 ++------------ 4 files changed, 78 insertions(+), 93 deletions(-) diff --git a/lib/flow.ex b/lib/flow.ex index 7410a65..73b3f5e 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -367,12 +367,6 @@ defmodule Flow do [:myflow_p0_0 ] # First partition stage (input stage) [:myflow_p1_0] [:myflow_p1_1] # Second partition stages - - In applications using multiple flows, it is also possible to get automatic - unique names for the flows themselves by specifying [name: :auto] - resulting in [:flow0, :flow1, ...] - - When using named flows, partition names can be explicitly specified using the :name option on statements that effectively create partition processes like `partition/2`, `from_stages/2` and `from_enumerables/2` @@ -607,7 +601,7 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in this partition, requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` @@ -651,7 +645,7 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in this partition, requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` @@ -1051,9 +1045,7 @@ defmodule Flow do ## Options * `:name` - the name of the flow - When non-nill, all processes created within the flow get default names - in this namespace. - Specify [name: :auto] to get automatically generated flownames [:flow0, :flow1, ...] + When non-nil, all processes created within the flow get default names * `:demand` - configures the demand on the flow producers to `:forward` or `:accumulate`. The default is `:forward`. See `GenStage.demand/2` @@ -1306,7 +1298,7 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in the partition, requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1461,7 +1453,7 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in the partition, requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1484,7 +1476,7 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) - * `:name` - the name for the stages in the namespace of the flow, + * `:name` - the name for the stages in the partition, requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1508,17 +1500,22 @@ defmodule Flow do end defp index_flows(flows) when length(flows) == 1, do: flows + defp index_flows(flows) do - Enum.with_index(flows) |> Enum.map(fn {flow, index} -> %Flow{flow | options: Keyword.put(flow.options, :index, index)} end) + Enum.with_index(flows) + |> Enum.map(fn {flow, index} -> + %Flow{flow | options: Keyword.put(flow.options, :index, index)} + end) end defp partition_id(options) do options |> Keyword.put(:partition_id, 0) end - defp partition_id(options,[%Flow{options: opts} | _]) do + + defp partition_id(options, [%Flow{options: opts} | _]) do case opts[:partition_id] do nil -> options |> Keyword.put(:partition_id, 0) - id -> options |> Keyword.put(:partition_id, id + 1) + id -> options |> Keyword.put(:partition_id, id + 1) end end diff --git a/lib/flow/coordinator.ex b/lib/flow/coordinator.ex index d8086a1..a6b0271 100644 --- a/lib/flow/coordinator.ex +++ b/lib/flow/coordinator.ex @@ -3,12 +3,10 @@ defmodule Flow.Coordinator do use GenServer def start_link(flow, type, consumers, options) do - options = naming(options) GenServer.start_link(__MODULE__, {flow, type, consumers, options}, options) end def start(flow, type, consumers, options) do - options = naming(options) GenServer.start(__MODULE__, {flow, type, consumers, options}, options) end @@ -21,11 +19,13 @@ defmodule Flow.Coordinator do def init({flow, type, {inner_or_outer, consumers}, options}) do Process.flag(:trap_exit, true) type_options = Keyword.take(options, [:dispatcher]) - flowname = options[:name] - supervisor_name = flowname && String.to_atom("#{flowname}_sup") + flow_name = options[:name] + supervisor_name = flow_name && String.to_atom("#{flow_name}_sup") {:ok, supervisor} = start_supervisor(supervisor_name) start_link = &start_child(supervisor, &1, restart: :temporary) - {producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, type_options, flowname) + + {producers, intermediary} = + Flow.Materialize.materialize(flow, start_link, type, type_options, flow_name) demand = Keyword.get(options, :demand, :forward) timeout = Keyword.get(options, :subscribe_timeout, 5_000) @@ -58,23 +58,6 @@ defmodule Flow.Coordinator do {:ok, state} end - defp naming(options) do - case options[:name] do - nil -> options - :auto -> Keyword.put(options, :name, auto_flowname()) - _name -> options - end - - end - - defp auto_flowname(flow_index\\0) do - name = String.to_atom("flow#{flow_index}") - case Process.whereis(name) do - nil -> name - _pid -> auto_flowname(flow_index + 1) - end - end - # We have a supervisor for the whole flow. We always wait for an error # to propagate through the whole flow, and then we terminate. For this # to work all children are started as temporary, except the consumers @@ -83,6 +66,7 @@ defmodule Flow.Coordinator do defp start_supervisor(nil) do Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0) end + defp start_supervisor(name) do Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0, name: name) end diff --git a/lib/flow/materialize.ex b/lib/flow/materialize.ex index 91a1d91..7b11cb4 100644 --- a/lib/flow/materialize.ex +++ b/lib/flow/materialize.ex @@ -5,21 +5,21 @@ defmodule Flow.Materialize do @map_reducer_opts [:buffer_keep, :buffer_size, :dispatcher] @supervisor_opts [:shutdown] - def materialize(%Flow{producers: nil}, _, _, _,_) do + def materialize(%Flow{producers: nil}, _, _, _, _) do raise ArgumentError, "cannot execute a flow without producers, " <> "please call \"from_enumerable\", \"from_stages\" or \"from_specs\" accordingly" end - def materialize(%Flow{} = flow, start_link, type, type_options, flowname) do + def materialize(%Flow{} = flow, start_link, type, type_options, flow_name) do %{operations: operations, options: options, producers: producers, window: window} = flow options = Keyword.merge(type_options, options) ops = split_operations(operations) {producers, consumers, ops, window} = - start_producers(producers, ops, start_link, window, options, flowname) + start_producers(producers, ops, start_link, window, options, flow_name) - {producers, start_stages(ops, window, consumers, start_link, type, options, flowname)} + {producers, start_stages(ops, window, consumers, start_link, type, options, flow_name)} end ## Helpers @@ -40,7 +40,7 @@ defmodule Flow.Materialize do end end - defp start_stages(:none, window, producers, _start_link, _type, _options, _flowname) do + defp start_stages(:none, window, producers, _start_link, _type, _options, _flow_name) do if window != Flow.Window.global() do raise ArgumentError, "a window was set but no computation is happening on this partition" end @@ -50,7 +50,15 @@ defmodule Flow.Materialize do end end - defp start_stages({_mr, compiled_ops, _ops}, window, producers, start_link, type, opts, flowname) do + defp start_stages( + {_mr, compiled_ops, _ops}, + window, + producers, + start_link, + type, + opts, + flow_name + ) do {acc, reducer, trigger} = window_ops(window, compiled_ops, opts) {stages, opts} = Keyword.pop(opts, :stages) {name, opts} = Keyword.pop(opts, :name) @@ -58,9 +66,12 @@ defmodule Flow.Materialize do {index, opts} = Keyword.pop(opts, :index) {supervisor_opts, opts} = Keyword.split(opts, @supervisor_opts) {init_opts, subscribe_opts} = Keyword.split(opts, @map_reducer_opts) - if name && ! flowname do - raise ArgumentError, "a partition is named '#{name}', this requires flow naming, either specific or using :auto" + + if name && !flow_name do + raise ArgumentError, + "a partition is named '#{inspect(name)}', this requires the flow to be named" end + partition_name = name || "p#{partition_id}" init_opts = @@ -75,23 +86,32 @@ defmodule Flow.Materialize do opts = Keyword.merge(subscribe_opts, producer_opts) {producer, [partition: i, cancel: :transient] ++ opts} end + arg = {type, [subscribe_to: subscriptions] ++ init_opts, {i, stages}, trigger, acc, reducer} - {:ok, pid} = start_link.(map_reducer_spec(arg, child_opts(flowname, index, partition_name, i), supervisor_opts)) + + {:ok, pid} = + start_link.( + map_reducer_spec(arg, child_opts(flow_name, index, partition_name, i), supervisor_opts) + ) + {pid, [cancel: :transient]} end end defp child_opts(nil, _index, _partition_name, _stage), do: [] - defp child_opts(_flowname, _index, nil, _stage), do: [] - defp child_opts(flowname, index, partition_name, stage) do - [ name: [flowname, index, partition_name, stage] - |> Enum.filter(&(&1)) - |> Enum.join("_") - |> String.to_atom + defp child_opts(_flow_name, _index, nil, _stage), do: [] + + defp child_opts(flow_name, index, partition_name, stage) do + [ + name: + [flow_name, index, partition_name, stage] + |> Enum.filter(& &1) + |> Enum.join("_") + |> String.to_atom() ] end - defp map_reducer_spec(arg, child_opts, supervisor_opts) do + defp map_reducer_spec(arg, child_opts, supervisor_opts) do shutdown = Keyword.get(supervisor_opts, :shutdown, 5000) %{ @@ -110,13 +130,15 @@ defmodule Flow.Materialize do start_link, window, options, - flowname + flow_name ) do partitions = Keyword.fetch!(options, :stages) - {left_producers, left_consumers} = start_join(:left, left, left_key, partitions, start_link, flowname) + + {left_producers, left_consumers} = + start_join(:left, left, left_key, partitions, start_link, flow_name) {right_producers, right_consumers} = - start_join(:right, right, right_key, partitions, start_link, flowname) + start_join(:right, right, right_key, partitions, start_link, flow_name) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) @@ -138,9 +160,9 @@ defmodule Flow.Materialize do start_link, window, options, - flowname + flow_name ) do - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flow_name) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) stages = Keyword.fetch!(flow.options, :stages) @@ -151,19 +173,21 @@ defmodule Flow.Materialize do window} end - defp start_producers({:flows, flows}, ops, start_link, window, options, flowname) do + defp start_producers({:flows, flows}, ops, start_link, window, options, flow_name) do options = partition(options) {producers, consumers} = Enum.reduce(flows, {[], []}, fn flow, {producers_acc, consumers_acc} -> - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flowname) + {producers, consumers} = + materialize(flow, start_link, :producer_consumer, options, flow_name) + {producers ++ producers_acc, consumers ++ consumers_acc} end) {producers, consumers, ensure_ops(ops), window} end - defp start_producers({:from_stages, producers}, ops, start_link, window, options, _flowname) do + defp start_producers({:from_stages, producers}, ops, start_link, window, options, _flow_name) do producers = producers.(start_link) # If there are no ops and there is a need for a custom @@ -181,9 +205,11 @@ defmodule Flow.Materialize do start_link, window, options, - flowname + flow_name ) do - {producers, intermediary} = materialize(flow, start_link, :producer_consumer, options, flowname) + {producers, intermediary} = + materialize(flow, start_link, :producer_consumer, options, flow_name) + timeout = Keyword.get(options, :subscribe_timeout, 5_000) producers_consumers = producers_consumers.(start_link) @@ -200,7 +226,7 @@ defmodule Flow.Materialize do {producers, producers_consumers, ensure_ops(ops), window} end - defp start_producers({:enumerables, enumerables}, ops, start_link, window, options, _flowname) do + defp start_producers({:enumerables, enumerables}, ops, start_link, window, options, _flow_name) do # If there are no ops, just start the enumerables with the options. # Otherwise it is a regular producer consumer with demand dispatcher. # In this case, options is used by subsequent mapper/reducer stages. @@ -359,14 +385,14 @@ defmodule Flow.Materialize do ## Joins - defp start_join(side, flow, key_fun, stages, start_link, flowname) do + defp start_join(side, flow, key_fun, stages, start_link, flow_name) do hash = fn event -> key = key_fun.(event) {{key, event}, :erlang.phash2(key, stages)} end opts = [dispatcher: {GenStage.PartitionDispatcher, partitions: 0..(stages - 1), hash: hash}] - {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts, flowname) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts, flow_name) consumers = for {consumer, consumer_opts} <- consumers do diff --git a/test/flow_test.exs b/test/flow_test.exs index f939682..d54f123 100644 --- a/test/flow_test.exs +++ b/test/flow_test.exs @@ -1598,11 +1598,10 @@ defmodule FlowTest do end end - describe "stage naming" do test "start_link/2 with :name", config do {:ok, pid} = - Stream.cycle([1,2,3]) + Stream.cycle([1, 2, 3]) |> Flow.from_enumerable(stages: 1) |> Flow.map(&(&1 + 1)) |> Flow.partition(stages: 2) @@ -1621,29 +1620,9 @@ defmodule FlowTest do assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil end - test "start_link/2 with name: :auto " do - {:ok, pid} = - Stream.cycle([1,2,3]) - |> Flow.from_enumerable(stages: 1) - |> Flow.map(&(&1 + 1)) - |> Flow.partition(stages: 2) - |> Flow.each(fn _ -> Process.sleep(:infinity) end) - |> Flow.start_link(name: :auto) - flowname = "flow0" - assert Process.whereis(String.to_atom(flowname)) == pid - assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil - - assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil - assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil - - assert Process.whereis(String.to_atom(flowname <> "_p1_0")) != nil - assert Process.whereis(String.to_atom(flowname <> "_p1_1")) != nil - assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil - end - test "start_link/2 with :name and naming a partition", config do {:ok, pid} = - Stream.cycle([1,2,3]) + Stream.cycle([1, 2, 3]) |> Flow.from_enumerable(stages: 1) |> Flow.map(&(&1 + 1)) |> Flow.partition(stages: 2, name: :output) @@ -1664,7 +1643,7 @@ defmodule FlowTest do test "start_link/2 with :name and naming all partitions", config do {:ok, pid} = - Stream.cycle([1,2,3]) + Stream.cycle([1, 2, 3]) |> Flow.from_enumerable(stages: 1, name: :input) |> Flow.map(&(&1 + 1)) |> Flow.partition(stages: 2, name: :output) @@ -1683,5 +1662,4 @@ defmodule FlowTest do assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil end end - end