Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 38 additions & 24 deletions lib/pipely/build/template_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,56 @@ def s3n_step_path(path)
"#{s3n_step_prefix if '/' == path[0]}#{path}"
end

def streaming_hadoop_step(options)
parts = [ '/home/hadoop/contrib/streaming/hadoop-streaming.jar' ]
# Renders command for a streaming Hadoop step
#
class StreamingHadoopStep

if jars = options[:lib_jars]
parts += Array(jars).map { |jar| ['-libjars', "#{jar}"] }.flatten
end
def initialize
@parts = [ '/home/hadoop/contrib/streaming/hadoop-streaming.jar' ]

(options[:defs] || {}).each do |name, value|
parts += ['-D', "#{name}=#{value}"]
yield self if block_given?
end

Array(options[:input]).each do |input|
parts += [ '-input', s3n_asset_path(input) ]
def apply_part(key, values)
values.each do |value|
@parts += [key, value]
end
end

Array(options[:output]).each do |output|
parts += ['-output', s3_asset_path(output) ]
def to_s
@parts.join(',')
end

Array(options[:mapper]).each do |mapper|
parts += ['-mapper', s3n_step_path(mapper) ]
end
end

Array(options[:reducer]).each do |reducer|
parts += ['-reducer', s3n_step_path(reducer) ]
end
def s3n_asset_paths(paths)
Array(paths).map { |p| s3n_asset_path(p) }
end

Array(options[:cache_file]).each do |cache_file|
parts += ['-cacheFile', s3n_asset_path(cache_file)]
end
def s3_asset_paths(paths)
Array(paths).map { |p| s3_asset_path(p) }
end

(options[:env] || {}).each do |name, value|
parts += ['-cmdenv', "#{name}=#{value}"]
end
def s3n_step_paths(paths)
Array(paths).map { |p| s3n_step_path(p) }
end

parts.join(',')
def env_vars(options)
return [] unless options
options.map { |k, v| "#{ k }=#{ v }" }
end

def streaming_hadoop_step(options)
StreamingHadoopStep.new do |s|
s.apply_part('-libjars', Array(options[:lib_jars]))
s.apply_part('-D', env_vars(options[:defs]))
s.apply_part('-input', s3n_asset_paths(options[:input]))
s.apply_part('-output', s3_asset_paths(options[:output]))
s.apply_part('-mapper', s3n_step_paths(options[:mapper]))
s.apply_part('-reducer', s3n_step_paths(options[:reducer]))
s.apply_part('-cacheFile', s3n_asset_paths(options[:cache_file]))
s.apply_part('-cmdenv', env_vars(options[:env]))
end.to_s
end

end
Expand Down