-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel.ex
More file actions
129 lines (107 loc) · 4.04 KB
/
parallel.ex
File metadata and controls
129 lines (107 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
defmodule Parallel do
@moduledoc """
This module shows how you can hide the synchronous and asynchronous behaviour
behind an API.
"""
@doc """
This shows a common technique in Elixir/Erlang. If something is fast, do
it synchronous (either in process or multi-process), and it looks like
a big problem, then do it asynchronous, which lets the caller do other
work and check back later for the result.
Runs a parallel map by either a) in current process, b) synchronous multi-process or
c) fully asynchronous multi-process, and you need to wait for result.
It chooses which one to run based on the size of the collection.
Only works with ranges currently.
## Parameters
- range: a collection that can be enumerated
- function: an anonymous function to apply to each element in collection
## Examples
iex> Parallel.map_or_pmap_or_apmap((0..4), fn(x) -> x * x end)
[0, 1, 4, 9, 16]
"""
def map_or_pmap_or_apmap(%Range{} = range, function) do
me = self
first..last = range
result = case last - first do
x when x > 1000 ->
IO.puts("<delayed_pmap>")
pid = spawn_link( fn -> pmap_wrapper(me, range, function) end)
{:delayed, pid}
x when x > 100 ->
IO.puts("<pmap>")
{:ok, pmap(range, fn(x) -> x * x end) }
_ ->
IO.puts("<normal map>")
{:ok, Enum.map(range, fn(x) -> x * x end) }
end
result
end
def map_or_pmap(%Range{} = range, function) do
first..last = range
result = case last - first > 1000 do
true -> pmap(range, function)
_ -> Enum.map((0..100000), fn(x) -> x * x end)
end
result
end
@doc """
runs a parallel map by spawning a bunch of processes and returns array of results
## Parameters
- collection: a collection that can be enumerated
- function: an anonymous function to apply to each element in collection
## Examples
iex> Parallel.pmap((0..4), fn(x) -> x * x end)
[0, 1, 4, 9, 16]
"""
def pmap(collection, function) do
me = self
pid_list = Enum.map(collection, fn (elem) ->
spawn_link fn -> (send me, { self, function.(elem) }) end
end)
Enum.map(pid_list, fn (pid) ->
receive do { ^pid, result } -> result end
end)
end
def pmap_wrapper(me, collection, function) do
result = pmap(collection, function)
send me, {self, result}
end
def map(collection, function) do
Enum.map(collection, function)
end
def handle_result(result = {:delayed, pid}) do
IO.puts(">> Running asynchronously, I am not going to wait for results:")
waiting(pid)
end
def handle_result(result = {:ok, arr}) do
IO.puts(">> Ran synchronously, here are the results:")
IO.inspect(arr)
end
def run_fail_test do
[0, 1, 3, 9, 16 ] = Parallel.pmap((0..4), fn(x) -> x * x end) # multi-process
end
def run_tests do
IO.puts "------map:"
IO.inspect Parallel.map((0..100000), fn(x) -> x * x end) # single-process
IO.puts "------pmap"
IO.inspect Parallel.pmap((0..4), fn(x) -> x * x end) # multi-process
IO.puts "------map_or_pmap use multi-process"
IO.inspect Parallel.map_or_pmap((0..100000), fn(x) -> x * x end) # use multi-process
IO.puts "------map_or_pmap use this (one) process"
IO.inspect Parallel.map_or_pmap((0..100), fn(x) -> x * x end) # use single-process
IO.puts "checking the 3 versions"
# But what if you only want to wait if it is a BIG one...
Parallel.handle_result(Parallel.map_or_pmap_or_apmap((0..10), fn(x) -> x * x end))
Parallel.handle_result(Parallel.map_or_pmap_or_apmap((0..150), fn(x) -> x * x end))
Parallel.handle_result(Parallel.map_or_pmap_or_apmap((0..1000000), fn(x) -> x * x end))
end
def waiting(pid) do
receive do
{ ^pid, result } -> IO.puts("Results have ready. Yay!:")
IO.inspect(result)
after
1_000 -> IO.puts "still waiting... more hamsters needed..."
waiting(pid)
end
end
end