Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ Clustering:


Refactoring
- chunk transformer
- hanging streams after delete component
- empty_gen_mix at the end of composite. delete, insert, and replace the last component

Bugs:
- Topology.draw source, when there are a lot of sources at the beginning. Also check the SplitterTree test

New features:
- Splitter tree (copy, random, round-robin)
4 changes: 2 additions & 2 deletions lib/composite/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Strom.Composite.Topology do
{streams, index} ->
{inputs, outputs} =
case component do
%Source{} -> {[], Map.keys(outputs)}
%Source{} -> {Map.keys(outputs), Map.keys(outputs)}
%Sink{} -> {inputs, []}
_ -> {inputs, Map.keys(outputs)}
end
Expand Down Expand Up @@ -91,7 +91,7 @@ defmodule Strom.Composite.Topology do

case nils do
[] ->
[output | acc]
acc ++ [output]

nils when is_list(nils) ->
{nil, closest_to_average} =
Expand Down
5 changes: 3 additions & 2 deletions lib/mixer_tree.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Strom.MixerTree do
list()
) :: Composite.t()

@parts 10
@parts 2

def new(inputs, output, opts \\ [])
when is_list(inputs) or (is_map(inputs) and map_size(inputs) > 0 and is_list(opts)) do
Expand All @@ -25,12 +25,13 @@ defmodule Strom.MixerTree do
inputs
|> Enum.chunk_every(parts)
|> Enum.reduce({[], [], 0}, fn stream_names, {acc, outputs, counter} ->
output = String.to_atom("mixer_tree_#{level}_#{counter}")
output = String.to_atom("_mt_#{level}#{counter}")
mixer = Mixer.new(stream_names, output, opts)
{[mixer | acc], [output | outputs], counter + 1}
end)

mixers = Enum.reverse(mixers)
outputs = Enum.reverse(outputs)

if count > parts do
mixers ++ build_mixers(outputs, level + 1, parts, final_output, opts)
Expand Down
69 changes: 69 additions & 0 deletions lib/splitter_tree.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
defmodule Strom.SplitterTree do
@moduledoc "Composite of mixers, use it when you need mixing a lot of streams"
alias Strom.Composite
alias Strom.Splitter

@type event() :: any()
@parts 2
@modes [:copy, :hash]
@default_mode :copy

@spec new(Strom.stream_name(), [Strom.stream_name()], list()) :: Composite.t()
def new(input, outputs, opts \\ [])
when is_atom(input) and is_list(outputs) and is_list(opts) do
{mode, opts} = define_mode(opts)
{parts, opts} = Keyword.pop(opts, :parts, @parts)
splitters = build_splitters(input, outputs, 0, parts, opts, mode)
Composite.new(splitters)
end

defp build_splitters(input, outputs, level, parts, opts, mode) do
{splitters, local_inputs, count} =
outputs
|> Enum.chunk_every(parts)
|> Enum.reduce({[], [], 0}, fn stream_names, {acc, local_inputs, counter} ->
local_input = String.to_atom("_st_#{level}#{counter}")
local_outputs = build_local_outputs(stream_names, mode)
splitter = Splitter.new(local_input, local_outputs, opts)
{[splitter | acc], [local_input | local_inputs], counter + 1}
end)

local_inputs = Enum.reverse(local_inputs)
splitters = Enum.reverse(splitters)

if count > parts do
build_splitters(input, local_inputs, level + 1, parts, opts, mode) ++ splitters
else
local_outputs = build_local_outputs(local_inputs, mode)
[Splitter.new(input, local_outputs, opts) | splitters]
end
end

defp define_mode(opts) do
{mode, opts} = Keyword.pop(opts, :mode, @default_mode)

if mode not in @modes do
raise "Mode #{mode} must be in #{@modes}"
end

{mode, opts}
end

defp build_local_outputs(stream_names, :copy) do
stream_names
end

defp build_local_outputs(stream_names, :hash) do
{local_outputs, _} =
Enum.reduce(stream_names, {%{}, 0}, fn output, {acc, index} ->
acc =
Map.put(acc, output, fn event ->
:erlang.phash2(event, length(stream_names)) == index
end)

{acc, index + 1}
end)

local_outputs
end
end
30 changes: 28 additions & 2 deletions test/composite/topology/draw_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Strom.Composite.Topology.DrawTest do
use ExUnit.Case, async: false
alias Strom.{Composite, Mixer, MixerTree, Transformer, Sink, Source, Splitter}
alias Strom.{Composite, Mixer, Transformer, Sink, Source, Splitter}
alias Strom.{MixerTree, SplitterTree}
alias Strom.Composite.Topology

test "draw example 1" do
Expand Down Expand Up @@ -35,7 +36,7 @@ defmodule Strom.Composite.Topology.DrawTest do
end

test "draw mixer tree" do
mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 2)
mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 3)
transformer = Transformer.new(:stream, & &1)

composite =
Expand All @@ -56,4 +57,29 @@ defmodule Strom.Composite.Topology.DrawTest do

Topology.draw(composite)
end

test "draw several sources and sinks" do
source1 = Source.new(:s1, [])
source2 = Source.new(:s2, [])
source3 = Source.new(:s3, [])
sink1 = Sink.new(:s1, %Sink{})
sink2 = Sink.new(:s2, %Sink{})
sink3 = Sink.new(:s3, %Sink{})

composite =
[source1, source2, source3, sink1, sink2, sink3]
|> Composite.new()

Topology.draw(composite)
end

test "draw mixer_tree (order of streams is important)" do
mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 3)
Topology.draw(mixer_tree)
end

test "draw splitter_tree" do
splitter_tree = SplitterTree.new(:stream, [:s1, :s2, :s3, :s4, :s5, :s6, :s7], parts: 3)
Topology.draw(splitter_tree)
end
end
111 changes: 4 additions & 107 deletions test/data/output.csv
Original file line number Diff line number Diff line change
@@ -1,107 +1,4 @@
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3
ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2
ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2
1
3
4
5
50 changes: 34 additions & 16 deletions test/mixer_tree_test.exs
Original file line number Diff line number Diff line change
@@ -1,29 +1,47 @@
defmodule Strom.MixerTreeTest do
use ExUnit.Case, async: true

alias Strom.{Composite, MixerTree, Source}
alias Strom.{Composite, MixerTree}

@tag timeout: :infinity
test "messages" do
count = :rand.uniform(100)
test "mixes 5 streams with 2 parts" do
count = 5
parts = 2
names = Enum.map(1..count, &String.to_atom("s#{&1}"))

mixer_tree =
names
|> MixerTree.new(:stream, parts: parts)
|> Composite.start()

assert length(Composite.components(mixer_tree)) == 6

names = Enum.map(1..count, &String.to_atom("tick#{&1}"))
flow =
names
|> Enum.reduce(%{}, fn name, acc -> Map.put(acc, name, Enum.to_list(1..10)) end)
|> Composite.call(mixer_tree)

assert length(Enum.to_list(flow[:stream])) == count * 10
Composite.stop(mixer_tree)
end

test "mixes random number of streams" do
count = :rand.uniform(100)

sources =
Enum.map(names, fn name ->
Source.new(name, [:tick])
end)
names = Enum.map(1..count, &String.to_atom("s#{&1}"))

mixer = MixerTree.new(names, :stream, parts: 5 + :rand.uniform(5))
parts = 5 + :rand.uniform(5)

composite =
[sources, mixer]
|> Composite.new()
mixer_tree =
names
|> MixerTree.new(:stream, parts: parts)
|> Composite.start()

flow = Composite.call(%{}, composite)
flow =
names
|> Enum.reduce(%{}, fn name, acc -> Map.put(acc, name, Enum.to_list(1..10)) end)
|> Composite.call(mixer_tree)

assert length(Enum.to_list(flow[:stream])) == count
Composite.stop(composite)
assert length(Enum.to_list(flow[:stream])) == count * 10
Composite.stop(mixer_tree)
end
end
Loading