From 4db17d8e8a8213007a1f7ae51fff4481d13ff47b Mon Sep 17 00:00:00 2001 From: Matt Gillooly Date: Tue, 24 Jun 2014 16:27:40 -0400 Subject: [PATCH] start of a refac to streaming_hadoop_step to enable better hooks for testing --- lib/pipely/build/template_helpers.rb | 62 +++++++++++++++++----------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/lib/pipely/build/template_helpers.rb b/lib/pipely/build/template_helpers.rb index f93386f..729d1ea 100644 --- a/lib/pipely/build/template_helpers.rb +++ b/lib/pipely/build/template_helpers.rb @@ -17,42 +17,56 @@ def s3n_step_path(path) "#{s3n_step_prefix if '/' == path[0]}#{path}" end - def streaming_hadoop_step(options) - parts = [ '/home/hadoop/contrib/streaming/hadoop-streaming.jar' ] + # Renders command for a streaming Hadoop step + # + class StreamingHadoopStep - if jars = options[:lib_jars] - parts += Array(jars).map { |jar| ['-libjars', "#{jar}"] }.flatten - end + def initialize + @parts = [ '/home/hadoop/contrib/streaming/hadoop-streaming.jar' ] - (options[:defs] || {}).each do |name, value| - parts += ['-D', "#{name}=#{value}"] + yield self if block_given? end - Array(options[:input]).each do |input| - parts += [ '-input', s3n_asset_path(input) ] + def apply_part(key, values) + values.each do |value| + @parts += [key, value] + end end - Array(options[:output]).each do |output| - parts += ['-output', s3_asset_path(output) ] + def to_s + @parts.join(',') end - Array(options[:mapper]).each do |mapper| - parts += ['-mapper', s3n_step_path(mapper) ] - end + end - Array(options[:reducer]).each do |reducer| - parts += ['-reducer', s3n_step_path(reducer) ] - end + def s3n_asset_paths(paths) + Array(paths).map { |p| s3n_asset_path(p) } + end - Array(options[:cache_file]).each do |cache_file| - parts += ['-cacheFile', s3n_asset_path(cache_file)] - end + def s3_asset_paths(paths) + Array(paths).map { |p| s3_asset_path(p) } + end - (options[:env] || {}).each do |name, value| - parts += ['-cmdenv', "#{name}=#{value}"] - end + def s3n_step_paths(paths) + Array(paths).map { |p| s3n_step_path(p) } + end - parts.join(',') + def env_vars(options) + return [] unless options + options.map { |k, v| "#{ k }=#{ v }" } + end + + def streaming_hadoop_step(options) + StreamingHadoopStep.new do |s| + s.apply_part('-libjars', Array(options[:lib_jars])) + s.apply_part('-D', env_vars(options[:defs])) + s.apply_part('-input', s3n_asset_paths(options[:input])) + s.apply_part('-output', s3_asset_paths(options[:output])) + s.apply_part('-mapper', s3n_step_paths(options[:mapper])) + s.apply_part('-reducer', s3n_step_paths(options[:reducer])) + s.apply_part('-cacheFile', s3n_asset_paths(options[:cache_file])) + s.apply_part('-cmdenv', env_vars(options[:env])) + end.to_s end end