diff --git a/lib/flow.ex b/lib/flow.ex index 7fabc65..73b3f5e 100644 --- a/lib/flow.ex +++ b/lib/flow.ex @@ -343,6 +343,49 @@ defmodule Flow do consumers child specification. Similar to `start_link/1`, they return either `{:ok, pid}` or `{:error, reason}`. + ## Naming Flows, Partitions and Stages + + By default, all stage processes generated by `Flow` are started as + children of a flow supervisor process. + Those processes are not registered under a name and can only be identified by PID. + + When the `:name` option is given to `start_link/2`, all generated GenStage processes + are automatically registered under unique names. + For example: + + Flow.from_stages([:producer], stages: 1) + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.partition(stages: 2) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end) + |> Flow.start_link(name: :myflow) + + Results in the following named processes: + [:myflow] # flow top process + [:myflow_sup] # flow supervisor + [:myflow_p0_0] # First partition stage (input stage) + [:myflow_p1_0] [:myflow_p1_1] # Second partition stages + + When using named flows, partition names can be explicitly specified using + the `:name` option on functions that create partition processes, + such as `partition/2`, `from_stages/2` and `from_enumerables/2`. + For example:
+ + Flow.from_stages([:producer], stages: 1, name: :input) + |> Flow.flat_map(&String.split(&1, " ")) + |> Flow.partition(stages: 2, name: :output) + |> Flow.reduce(fn -> %{} end, fn word, acc -> + Map.update(acc, word, 1, & &1 + 1) + end) + |> Flow.start_link(name: :myflow) + + Results in the following named processes: + [:myflow] # flow top process + [:myflow_sup] # flow supervisor + [:myflow_input_0] # First partition stage (input stage) + [:myflow_output_0] [:myflow_output_1] # Second partition stages + ## Performance discussions In this section we will discuss points related to performance @@ -558,6 +601,8 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages + * `:name` - the name for the stages in this partition, + requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` * `:shutdown` - the shutdown time for this stage when the flow is shut down. @@ -577,7 +622,7 @@ defmodule Flow do def from_enumerables(enumerables, options \\ []) def from_enumerables([_ | _] = enumerables, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{producers: {:enumerables, enumerables}, options: options, window: window} end @@ -600,6 +645,8 @@ defmodule Flow do * `:window` - a window to run the next stages in, see `Flow.Window` * `:stages` - the number of stages + * `:name` - the name for the stages in this partition, + requires the flow to be 'named' * `:buffer_keep` - how the buffer should behave, see `c:GenStage.init/1` * `:buffer_size` - how many events to buffer, see `c:GenStage.init/1` * `:shutdown` - the shutdown time for this stage when the flow is shut down. 
@@ -639,7 +686,7 @@ defmodule Flow do def from_stages(producers, options \\ []) def from_stages([_ | _] = producers, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) producers = Enum.map(producers, &{&1, []}) @@ -682,7 +729,7 @@ defmodule Flow do def from_specs(producers, options \\ []) def from_specs([_ | _] = producers, options) do - options = stages(options) + options = stages(options) |> partition_id() {window, options} = Keyword.pop(options, :window, Flow.Window.global()) fun = fn start_link -> @@ -775,7 +822,7 @@ defmodule Flow do def through_stages(flow, producer_consumers, options \\ []) def through_stages(%Flow{} = flow, [_ | _] = producer_consumers, options) do - options = stages(options) + options = stages(options) |> partition_id([flow]) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{ @@ -824,7 +871,7 @@ defmodule Flow do def through_specs(flow, producer_consumers, options \\ []) def through_specs(%Flow{} = flow, [_ | _] = producer_consumers, options) do - options = stages(options) + options = stages(options) |> partition_id([flow]) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) %Flow{ @@ -998,6 +1045,7 @@ defmodule Flow do ## Options * `:name` - the name of the flow + When non-nil, all processes created within the flow get default names * `:demand` - configures the demand on the flow producers to `:forward` or `:accumulate`. The default is `:forward`. See `GenStage.demand/2` @@ -1250,6 +1298,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the partition, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. 
The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. * `:key` - the key to use when partitioning. It is a function @@ -1403,6 +1453,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the partition, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. @@ -1424,6 +1476,8 @@ defmodule Flow do * `:window` - a `Flow.Window` struct which controls how the reducing function behaves, see `Flow.Window` for more information. * `:stages` - the number of partitions (reducer stages) + * `:name` - the name for the stages in the partition, + requires the flow to be 'named' * `:shutdown` - the shutdown time for this stage when the flow is shut down. The same as the `:shutdown` value in a Supervisor, defaults to 5000 milliseconds. 
@@ -1435,9 +1489,9 @@ defmodule Flow do end def merge([%Flow{} | _] = flows, dispatcher, options) when is_list(options) do - options = options |> stages() |> put_dispatcher(dispatcher) + options = options |> stages() |> partition_id(flows) |> put_dispatcher(dispatcher) {window, options} = Keyword.pop(options, :window, Flow.Window.global()) - %Flow{producers: {:flows, flows}, options: options, window: window} + %Flow{producers: {:flows, index_flows(flows)}, options: options, window: window} end def merge(other, _dispatcher, options) when is_list(options) do @@ -1445,6 +1499,26 @@ defmodule Flow do "expected a flow or a non-empty list of flows as first argument, got: #{inspect(other)}" end + defp index_flows(flows) when length(flows) == 1, do: flows + + defp index_flows(flows) do + Enum.with_index(flows) + |> Enum.map(fn {flow, index} -> + %Flow{flow | options: Keyword.put(flow.options, :index, index)} + end) + end + + defp partition_id(options) do + options |> Keyword.put(:partition_id, 0) + end + + defp partition_id(options, [%Flow{options: opts} | _]) do + case opts[:partition_id] do + nil -> options |> Keyword.put(:partition_id, 0) + id -> options |> Keyword.put(:partition_id, id + 1) + end + end + defp stages(options) do case Keyword.fetch(options, :stages) do {:ok, _} -> diff --git a/lib/flow/coordinator.ex b/lib/flow/coordinator.ex index 074fe54..a6b0271 100644 --- a/lib/flow/coordinator.ex +++ b/lib/flow/coordinator.ex @@ -19,10 +19,13 @@ defmodule Flow.Coordinator do def init({flow, type, {inner_or_outer, consumers}, options}) do Process.flag(:trap_exit, true) type_options = Keyword.take(options, [:dispatcher]) - - {:ok, supervisor} = start_supervisor() + flow_name = options[:name] + supervisor_name = flow_name && String.to_atom("#{flow_name}_sup") + {:ok, supervisor} = start_supervisor(supervisor_name) start_link = &start_child(supervisor, &1, restart: :temporary) - {producers, intermediary} = Flow.Materialize.materialize(flow, start_link, type, 
type_options) + + {producers, intermediary} = + Flow.Materialize.materialize(flow, start_link, type, type_options, flow_name) demand = Keyword.get(options, :demand, :forward) timeout = Keyword.get(options, :subscribe_timeout, 5_000) @@ -60,10 +63,14 @@ defmodule Flow.Coordinator do # to work all children are started as temporary, except the consumers # given via into_specs. Once those crash, they terminate the whole # flow according to their restart type. - defp start_supervisor do + defp start_supervisor(nil) do Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0) end + defp start_supervisor(name) do + Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0, name: name) + end + defp start_child(supervisor, spec, opts) do spec = Supervisor.child_spec(spec, [id: make_ref()] ++ opts) Supervisor.start_child(supervisor, spec) diff --git a/lib/flow/materialize.ex b/lib/flow/materialize.ex index 9ee15af..7b11cb4 100644 --- a/lib/flow/materialize.ex +++ b/lib/flow/materialize.ex @@ -5,21 +5,21 @@ defmodule Flow.Materialize do @map_reducer_opts [:buffer_keep, :buffer_size, :dispatcher] @supervisor_opts [:shutdown] - def materialize(%Flow{producers: nil}, _, _, _) do + def materialize(%Flow{producers: nil}, _, _, _, _) do raise ArgumentError, "cannot execute a flow without producers, " <> "please call \"from_enumerable\", \"from_stages\" or \"from_specs\" accordingly" end - def materialize(%Flow{} = flow, start_link, type, type_options) do + def materialize(%Flow{} = flow, start_link, type, type_options, flow_name) do %{operations: operations, options: options, producers: producers, window: window} = flow options = Keyword.merge(type_options, options) ops = split_operations(operations) {producers, consumers, ops, window} = - start_producers(producers, ops, start_link, window, options) + start_producers(producers, ops, start_link, window, options, flow_name) - {producers, start_stages(ops, window, consumers, start_link, type, options)} + {producers, 
start_stages(ops, window, consumers, start_link, type, options, flow_name)} end ## Helpers @@ -40,7 +40,7 @@ defmodule Flow.Materialize do end end - defp start_stages(:none, window, producers, _start_link, _type, _options) do + defp start_stages(:none, window, producers, _start_link, _type, _options, _flow_name) do if window != Flow.Window.global() do raise ArgumentError, "a window was set but no computation is happening on this partition" end @@ -50,13 +50,30 @@ defmodule Flow.Materialize do end end - defp start_stages({_mr, compiled_ops, _ops}, window, producers, start_link, type, opts) do + defp start_stages( + {_mr, compiled_ops, _ops}, + window, + producers, + start_link, + type, + opts, + flow_name + ) do {acc, reducer, trigger} = window_ops(window, compiled_ops, opts) - {stages, opts} = Keyword.pop(opts, :stages) + {name, opts} = Keyword.pop(opts, :name) + {partition_id, opts} = Keyword.pop(opts, :partition_id) + {index, opts} = Keyword.pop(opts, :index) {supervisor_opts, opts} = Keyword.split(opts, @supervisor_opts) {init_opts, subscribe_opts} = Keyword.split(opts, @map_reducer_opts) + if name && !flow_name do + raise ArgumentError, + "a partition is named '#{inspect(name)}', this requires the flow to be named" + end + + partition_name = name || "p#{partition_id}" + init_opts = case type do :consumer -> Keyword.drop(init_opts, [:dispatcher]) @@ -71,17 +88,35 @@ defmodule Flow.Materialize do end arg = {type, [subscribe_to: subscriptions] ++ init_opts, {i, stages}, trigger, acc, reducer} - {:ok, pid} = start_link.(map_reducer_spec(arg, supervisor_opts)) + + {:ok, pid} = + start_link.( + map_reducer_spec(arg, child_opts(flow_name, index, partition_name, i), supervisor_opts) + ) + {pid, [cancel: :transient]} end end - defp map_reducer_spec(arg, supervisor_opts) do + defp child_opts(nil, _index, _partition_name, _stage), do: [] + defp child_opts(_flow_name, _index, nil, _stage), do: [] + + defp child_opts(flow_name, index, partition_name, stage) do + [ + name: + 
[flow_name, index, partition_name, stage] + |> Enum.filter(& &1) + |> Enum.join("_") + |> String.to_atom() + ] + end + + defp map_reducer_spec(arg, child_opts, supervisor_opts) do shutdown = Keyword.get(supervisor_opts, :shutdown, 5000) %{ id: Flow.MapReducer, - start: {GenStage, :start_link, [Flow.MapReducer, arg, []]}, + start: {GenStage, :start_link, [Flow.MapReducer, arg, child_opts]}, modules: [Flow.MapReducer], shutdown: shutdown } @@ -94,13 +129,16 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flow_name ) do partitions = Keyword.fetch!(options, :stages) - {left_producers, left_consumers} = start_join(:left, left, left_key, partitions, start_link) + + {left_producers, left_consumers} = + start_join(:left, left, left_key, partitions, start_link, flow_name) {right_producers, right_consumers} = - start_join(:right, right, right_key, partitions, start_link) + start_join(:right, right, right_key, partitions, start_link, flow_name) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) @@ -121,9 +159,10 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flow_name ) do - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, options, flow_name) {type, {acc, fun, trigger}, ops} = ensure_ops(ops) stages = Keyword.fetch!(flow.options, :stages) @@ -134,19 +173,21 @@ defmodule Flow.Materialize do window} end - defp start_producers({:flows, flows}, ops, start_link, window, options) do + defp start_producers({:flows, flows}, ops, start_link, window, options, flow_name) do options = partition(options) {producers, consumers} = Enum.reduce(flows, {[], []}, fn flow, {producers_acc, consumers_acc} -> - {producers, consumers} = materialize(flow, start_link, :producer_consumer, options) + {producers, consumers} = + materialize(flow, start_link, :producer_consumer, options, flow_name) + {producers ++ producers_acc, 
consumers ++ consumers_acc} end) {producers, consumers, ensure_ops(ops), window} end - defp start_producers({:from_stages, producers}, ops, start_link, window, options) do + defp start_producers({:from_stages, producers}, ops, start_link, window, options, _flow_name) do producers = producers.(start_link) # If there are no ops and there is a need for a custom @@ -163,9 +204,12 @@ defmodule Flow.Materialize do ops, start_link, window, - options + options, + flow_name ) do - {producers, intermediary} = materialize(flow, start_link, :producer_consumer, options) + {producers, intermediary} = + materialize(flow, start_link, :producer_consumer, options, flow_name) + timeout = Keyword.get(options, :subscribe_timeout, 5_000) producers_consumers = producers_consumers.(start_link) @@ -182,7 +226,7 @@ defmodule Flow.Materialize do {producers, producers_consumers, ensure_ops(ops), window} end - defp start_producers({:enumerables, enumerables}, ops, start_link, window, options) do + defp start_producers({:enumerables, enumerables}, ops, start_link, window, options, _flow_name) do # If there are no ops, just start the enumerables with the options. # Otherwise it is a regular producer consumer with demand dispatcher. # In this case, options is used by subsequent mapper/reducer stages. 
@@ -341,14 +385,14 @@ defmodule Flow.Materialize do ## Joins - defp start_join(side, flow, key_fun, stages, start_link) do + defp start_join(side, flow, key_fun, stages, start_link, flow_name) do hash = fn event -> key = key_fun.(event) {{key, event}, :erlang.phash2(key, stages)} end opts = [dispatcher: {GenStage.PartitionDispatcher, partitions: 0..(stages - 1), hash: hash}] - {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts) + {producers, consumers} = materialize(flow, start_link, :producer_consumer, opts, flow_name) consumers = for {consumer, consumer_opts} <- consumers do diff --git a/test/flow_test.exs b/test/flow_test.exs index 1c62559..d54f123 100644 --- a/test/flow_test.exs +++ b/test/flow_test.exs @@ -1597,4 +1597,69 @@ defmodule FlowTest do assert_receive {:consumed, [1]} end end + + describe "stage naming" do + test "start_link/2 with :name", config do + {:ok, pid} = + Stream.cycle([1, 2, 3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_p1_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p1_2")) == nil + end + + test "start_link/2 with :name and naming a partition", config do + {:ok, pid} = + Stream.cycle([1, 2, 3]) + |> Flow.from_enumerable(stages: 1) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2, name: :output) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = 
Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_p0_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_p0_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_output_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil + end + + test "start_link/2 with :name and naming all partitions", config do + {:ok, pid} = + Stream.cycle([1, 2, 3]) + |> Flow.from_enumerable(stages: 1, name: :input) + |> Flow.map(&(&1 + 1)) + |> Flow.partition(stages: 2, name: :output) + |> Flow.each(fn _ -> Process.sleep(:infinity) end) + |> Flow.start_link(name: config.test) + + flowname = Atom.to_string(config.test) + assert Process.whereis(String.to_atom(flowname)) == pid + assert Process.whereis(String.to_atom(flowname <> "_sup")) != nil + + assert Process.whereis(String.to_atom(flowname <> "_input_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_input_1")) == nil + + assert Process.whereis(String.to_atom(flowname <> "_output_0")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_1")) != nil + assert Process.whereis(String.to_atom(flowname <> "_output_2")) == nil + end + end end