From 2833d041da445f970abe0cbab266d6698d9458bd Mon Sep 17 00:00:00 2001 From: Bart Flaherty Date: Fri, 18 Apr 2014 10:48:48 -0400 Subject: [PATCH 1/7] Pipely::AWSClient uses aws sdk to query pipeline logs --- lib/pipely/aws_client.rb | 89 ++++++++++++++++++++++++++++++++++++++++ pipely.gemspec | 1 + 2 files changed, 90 insertions(+) create mode 100644 lib/pipely/aws_client.rb diff --git a/lib/pipely/aws_client.rb b/lib/pipely/aws_client.rb new file mode 100644 index 0000000..e5cb771 --- /dev/null +++ b/lib/pipely/aws_client.rb @@ -0,0 +1,89 @@ +require 'aws-sdk' + +module Pipely + + # Use AWS SDK to get information about a pipeline + class AWSClient + def initialize(pipeline_id) + configure + @data_pipeline = AWS::DataPipeline.new + @pipeline_id = pipeline_id + end + + def get_log_paths + ids = get_object_ids_of_type('ShellCommandActivity') + ids.inject({}) do |memo, id| + if logs = get_log_paths_for_object(id) + memo[id] = logs + end + memo + end + end + + def get_object_ids_of_type(type) + query_objects( + selectors: [ + { + field_name: 'type', + operator: { + type: 'EQ', + values: [type], + } + } + ] + ) + end + + def get_log_paths_for_object(object_id) + instance = get_object_instance(object_id) + stderr, stdout = evaluate_expression( + '#{stderr + "," + stdout}', + instance + ).split(',') + + { stderr: stderr, stdout: stdout } + + rescue AWS::DataPipeline::Errors::InvalidRequestException + $stderr.puts "Missing stderr and/or stdout fields for #{object_id}" + nil + end + + def get_object_instance(object_id) + evaluate_expression( + '#{@activeInstances}', + object_id + ) + end + + def evaluate_expression(expression, object_id) + @data_pipeline.client.evaluate_expression({ + expression: expression, + object_id: object_id, + pipeline_id: @pipeline_id, + })[:evaluated_expression] + end + + def query_objects(query, sphere='COMPONENT') + @data_pipeline.client.query_objects({ + pipeline_id: @pipeline_id, + query: query, + sphere: sphere, + })[:ids] + end + + private + 
def configure + if data = File.open(File.expand_path('~/.aws-sdk')).read + + config = YAML.load(data)['default'] + + AWS.config( + access_key_id: config[:access_key_id], + secret_access_key: config[:secret_access_key], + region: config[:region] + ) + end + end + + end +end diff --git a/pipely.gemspec b/pipely.gemspec index e0871e8..f627cc3 100644 --- a/pipely.gemspec +++ b/pipely.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |s| s.add_dependency "unf" s.add_dependency "uuidtools" s.add_dependency "activesupport" + s.add_dependency "aws-sdk", "~> 1.38" s.add_development_dependency "rspec" s.add_development_dependency "cane" s.add_development_dependency "timecop" From 3f8dbe234b11bd0930e9a219221b33f71c0070aa Mon Sep 17 00:00:00 2001 From: Bart Flaherty Date: Fri, 18 Apr 2014 10:50:05 -0400 Subject: [PATCH 2/7] pipely has a -s/--logs command line option prints s3 logs for a shell command activity or given no argument prints all s3 logs --- bin/pipely | 5 +++- lib/pipely/actions.rb | 1 + lib/pipely/actions/list_log_paths.rb | 40 ++++++++++++++++++++++++++++ lib/pipely/options.rb | 15 ++++++++++- 4 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 lib/pipely/actions/list_log_paths.rb diff --git a/bin/pipely b/bin/pipely index 343559a..4c5d0c0 100755 --- a/bin/pipely +++ b/bin/pipely @@ -4,7 +4,10 @@ require 'pipely' options = Pipely::Options.parse -if options.pipeline_id +if options.pipeline_id && options.list_log_paths + Pipely::Actions::ListLogPaths.new(options).execute + +elsif options.pipeline_id Pipely::Actions::GraphLivePipeline.new(options).execute elsif options.input_path diff --git a/lib/pipely/actions.rb b/lib/pipely/actions.rb index 588dd26..c343521 100644 --- a/lib/pipely/actions.rb +++ b/lib/pipely/actions.rb @@ -1,6 +1,7 @@ require 'pipely/actions/graph_live_pipeline' require 'pipely/actions/graph_file_pipeline' require 'pipely/actions/list_live_pipelines' +require 'pipely/actions/list_log_paths' module Pipely module Actions diff 
--git a/lib/pipely/actions/list_log_paths.rb b/lib/pipely/actions/list_log_paths.rb new file mode 100644 index 0000000..bba99b5 --- /dev/null +++ b/lib/pipely/actions/list_log_paths.rb @@ -0,0 +1,40 @@ +require 'pp' +require 'pipely/aws_client' + +module Pipely + module Actions + + # List S3 log paths for the objects of a deployed pipeline + # + class ListLogPaths + + def initialize(options) + @options = options + end + + def execute + if @options.object_id + $stdout.puts PP.pp(log_paths_for_object, "") + else + $stdout.puts PP.pp(log_paths, "") + end + end + + private + + def log_paths + data_pipeline.get_log_paths + end + + def log_paths_for_object + data_pipeline.get_log_paths_for_object(@options.object_id) + end + + def data_pipeline + Pipely::AWSClient.new(@options.pipeline_id) + end + + end + + end +end diff --git a/lib/pipely/options.rb b/lib/pipely/options.rb index 7268e0d..332202e 100644 --- a/lib/pipely/options.rb +++ b/lib/pipely/options.rb @@ -7,7 +7,8 @@ module Pipely class Options attr_accessor :pipeline_id, :input_path, :output_path, - :verbose, :automatic_open, :json_output, :latest_run + :verbose, :automatic_open, :json_output, :latest_run, + :object_id, :list_log_paths def self.parse options = Pipely::Options.new @@ -37,6 +38,18 @@ def self.parse opts.on("-j", "--json", "Write STDOUT formatted as JSON") do |json| options.json_output = json end + + opts.on("-s", "--logs [OBJECT_ID]", + "Print s3 log paths for an object") do |obj_id| + options.object_id = obj_id + options.list_log_paths = true + end + + opts.on_tail("-h", "--help", "Show this message") do + puts opts + exit + end + end.parse! 
options From ea53438ac8b364e8f13e2a4654068811466e0ad1 Mon Sep 17 00:00:00 2001 From: Bart Flaherty Date: Mon, 21 Apr 2014 15:22:04 -0400 Subject: [PATCH 3/7] start to pull pipeline logic out of aws_client and into pipeline-specific objects --- lib/pipely/actions/list_log_paths.rb | 14 ++-- lib/pipely/aws/data_pipeline/component.rb | 35 +++++++++ lib/pipely/aws/data_pipeline/instance.rb | 40 +++++++++++ lib/pipely/aws/data_pipeline/pipeline.rb | 66 +++++++++++++++++ lib/pipely/aws_client.rb | 87 +++++------------------ 5 files changed, 166 insertions(+), 76 deletions(-) create mode 100644 lib/pipely/aws/data_pipeline/component.rb create mode 100644 lib/pipely/aws/data_pipeline/instance.rb create mode 100644 lib/pipely/aws/data_pipeline/pipeline.rb diff --git a/lib/pipely/actions/list_log_paths.rb b/lib/pipely/actions/list_log_paths.rb index bba99b5..c1815bd 100644 --- a/lib/pipely/actions/list_log_paths.rb +++ b/lib/pipely/actions/list_log_paths.rb @@ -1,5 +1,5 @@ require 'pp' -require 'pipely/aws_client' +require 'pipely/aws/data_pipeline/pipeline' module Pipely module Actions @@ -14,7 +14,7 @@ def initialize(options) def execute if @options.object_id - $stdout.puts PP.pp(log_paths_for_object, "") + $stdout.puts PP.pp(log_paths_for_component, "") else $stdout.puts PP.pp(log_paths, "") end @@ -22,16 +22,16 @@ def execute private - def log_paths - data_pipeline.get_log_paths + def log_paths_for_component + data_pipeline.log_paths_for_component(@options.object_id) end - def log_paths_for_object - data_pipeline.get_log_paths_for_object(@options.object_id) + def log_paths + data_pipeline.log_paths end def data_pipeline - Pipely::AWSClient.new(@options.pipeline_id) + Pipely::DataPipeline::Pipeline.new(@options.pipeline_id) end end diff --git a/lib/pipely/aws/data_pipeline/component.rb b/lib/pipely/aws/data_pipeline/component.rb new file mode 100644 index 0000000..63231ce --- /dev/null +++ b/lib/pipely/aws/data_pipeline/component.rb @@ -0,0 +1,35 @@ +require 
'pipely/aws_client' +require 'pipely/aws/data_pipeline/instance' + +module Pipely + + module DataPipeline + + class Component < Pipely::AWSClient + + def initialize(pipeline_id, component_id) + super() + + @id = component_id + @pipeline_id = pipeline_id + end + + def active_instances + Pipely::DataPipeline::Instance.new( + @pipeline_id, + @id, + evaluate_expression('#{@activeInstances}') + ) + end + + def evaluate_expression(expression) + @api.evaluate_expression({ + expression: expression, + object_id: @id, + pipeline_id: @pipeline_id, + })[:evaluated_expression] + end + + end + end +end diff --git a/lib/pipely/aws/data_pipeline/instance.rb b/lib/pipely/aws/data_pipeline/instance.rb new file mode 100644 index 0000000..9f75a77 --- /dev/null +++ b/lib/pipely/aws/data_pipeline/instance.rb @@ -0,0 +1,40 @@ +require 'pipely/aws_client' + +module Pipely + + module DataPipeline + + class Instance < Pipely::AWSClient + + def initialize(pipeline_id, component_id, instance_id) + super() + + @id = instance_id + @component_id = component_id + @pipeline_id = pipeline_id + end + + def log_paths + stderr, stdout = evaluate_expression( + '#{stderr + "," + stdout}', + ).split(',') + + { stderr: stderr, stdout: stdout } + + rescue AWS::DataPipeline::Errors::InvalidRequestException => ex + $stderr.puts ex.inspect + $stderr.puts "Can't find log paths for #{@id}" + nil + end + + def evaluate_expression(expression) + @api.evaluate_expression({ + expression: expression, + object_id: @id, + pipeline_id: @pipeline_id, + })[:evaluated_expression] + end + + end + end +end diff --git a/lib/pipely/aws/data_pipeline/pipeline.rb b/lib/pipely/aws/data_pipeline/pipeline.rb new file mode 100644 index 0000000..ad1bd51 --- /dev/null +++ b/lib/pipely/aws/data_pipeline/pipeline.rb @@ -0,0 +1,66 @@ +require 'pipely/aws_client' +require 'pipely/aws/data_pipeline/component' + +module Pipely + + module DataPipeline + + class Pipeline < Pipely::AWSClient + + def initialize(pipeline_id) + super() + + @id = 
pipeline_id + end + + def log_paths + ids = get_components_of_type('ShellCommandActivity') + + ids.inject({}) do |memo, component_id| + logs = log_paths_for_component(component_id) + memo[component_id] = logs if logs + + memo + end + end + + def log_paths_for_component(component_id) + Pipely::DataPipeline::Component.new( + @id, + component_id + ).active_instances.log_paths + end + + def get_components_of_type(type) + query_components( + selectors: [ + { + field_name: 'type', + operator: { + type: 'EQ', + values: [type], + } + } + ] + ) + end + + def query_components(query) + ids = @api.query_objects({ + pipeline_id: @id, + query: query, + sphere: 'COMPONENT', + })[:ids] + end + + def evaluate_expression(expression, object_id) + @api.evaluate_expression({ + expression: expression, + object_id: object_id, + pipeline_id: @id, + })[:evaluated_expression] + end + + end + end +end diff --git a/lib/pipely/aws_client.rb b/lib/pipely/aws_client.rb index e5cb771..a12ca67 100644 --- a/lib/pipely/aws_client.rb +++ b/lib/pipely/aws_client.rb @@ -4,84 +4,33 @@ module Pipely # Use AWS SDK to get information about a pipeline class AWSClient - def initialize(pipeline_id) - configure - @data_pipeline = AWS::DataPipeline.new - @pipeline_id = pipeline_id - end - def get_log_paths - ids = get_object_ids_of_type('ShellCommandActivity') - ids.inject({}) do |memo, id| - if logs = get_log_paths_for_object(id) - memo[id] = logs - end - memo - end - end + class PipelyConfigNotFound < StandardError; end - def get_object_ids_of_type(type) - query_objects( - selectors: [ - { - field_name: 'type', - operator: { - type: 'EQ', - values: [type], - } - } - ] - ) + def initialize + configure + @api = AWS::DataPipeline.new.client end - def get_log_paths_for_object(object_id) - instance = get_object_instance(object_id) - stderr, stdout = evaluate_expression( - '#{stderr + "," + stdout}', - instance - ).split(',') - - { stderr: stderr, stdout: stdout } - - rescue 
AWS::DataPipeline::Errors::InvalidRequestException - $stderr.puts "Missing stderr and/or stdout fields for #{object_id}" - nil - end + def configure + config = load_config - def get_object_instance(object_id) - evaluate_expression( - '#{@activeInstances}', - object_id + AWS.config( + access_key_id: config[:access_key_id], + secret_access_key: config[:secret_access_key], + region: config[:region] ) end - def evaluate_expression(expression, object_id) - @data_pipeline.client.evaluate_expression({ - expression: expression, - object_id: object_id, - pipeline_id: @pipeline_id, - })[:evaluated_expression] - end - - def query_objects(query, sphere='COMPONENT') - @data_pipeline.client.query_objects({ - pipeline_id: @pipeline_id, - query: query, - sphere: sphere, - })[:ids] - end - private - def configure - if data = File.open(File.expand_path('~/.aws-sdk')).read - - config = YAML.load(data)['default'] - - AWS.config( - access_key_id: config[:access_key_id], - secret_access_key: config[:secret_access_key], - region: config[:region] - ) + def load_config + path = File.expand_path('~/.pipely') + + if File.exist?(path) && data = File.open(path) + YAML.load(data) + else + raise PipelyConfigNotFound, + 'Need a .pipely file in home directory' end end From 8c20cda15451179f2187925e029a9dd74886c1d8 Mon Sep 17 00:00:00 2001 From: Bart Flaherty Date: Tue, 22 Apr 2014 12:06:20 -0400 Subject: [PATCH 4/7] Splits responsibilities of aws client now ConfigureAws looks at the .pipely file The Api classes are singletons that mixin ConfigureAws so we aren't making repeated calls to configure/create the aws-sdk client objects Component, Instance, and Pipeline, don't inherit from anything --- .../{aws_client.rb => aws/configure_aws.rb} | 8 ++---- lib/pipely/aws/data_pipeline/api.rb | 26 +++++++++++++++++++ lib/pipely/aws/data_pipeline/component.rb | 7 +++-- lib/pipely/aws/data_pipeline/instance.rb | 6 ++--- lib/pipely/aws/data_pipeline/pipeline.rb | 7 +++-- lib/pipely/aws/emr/api.rb | 26 
+++++++++++++++++++ lib/pipely/aws/emr/cluster.rb | 20 ++++++++++++++ 7 files changed, 83 insertions(+), 17 deletions(-) rename lib/pipely/{aws_client.rb => aws/configure_aws.rb} (86%) create mode 100644 lib/pipely/aws/data_pipeline/api.rb create mode 100644 lib/pipely/aws/emr/api.rb create mode 100644 lib/pipely/aws/emr/cluster.rb diff --git a/lib/pipely/aws_client.rb b/lib/pipely/aws/configure_aws.rb similarity index 86% rename from lib/pipely/aws_client.rb rename to lib/pipely/aws/configure_aws.rb index a12ca67..991c54f 100644 --- a/lib/pipely/aws_client.rb +++ b/lib/pipely/aws/configure_aws.rb @@ -3,15 +3,10 @@ module Pipely # Use AWS SDK to get information about a pipeline - class AWSClient + module ConfigureAws class PipelyConfigNotFound < StandardError; end - def initialize - configure - @api = AWS::DataPipeline.new.client - end - def configure config = load_config @@ -35,4 +30,5 @@ def load_config end end + end diff --git a/lib/pipely/aws/data_pipeline/api.rb b/lib/pipely/aws/data_pipeline/api.rb new file mode 100644 index 0000000..4924b24 --- /dev/null +++ b/lib/pipely/aws/data_pipeline/api.rb @@ -0,0 +1,26 @@ +require_relative '../configure_aws' +require 'aws-sdk' +require 'singleton' + +module Pipely + + module DataPipeline + + class Api + include Pipely::ConfigureAws + include Singleton + + attr_accessor :client + + def initialize + super() + + configure + + @client = AWS::DataPipeline.new.client + end + end + + end + +end diff --git a/lib/pipely/aws/data_pipeline/component.rb b/lib/pipely/aws/data_pipeline/component.rb index 63231ce..1166e63 100644 --- a/lib/pipely/aws/data_pipeline/component.rb +++ b/lib/pipely/aws/data_pipeline/component.rb @@ -1,15 +1,14 @@ -require 'pipely/aws_client' require 'pipely/aws/data_pipeline/instance' +require 'pipely/aws/data_pipeline/api' module Pipely module DataPipeline - class Component < Pipely::AWSClient + class Component def initialize(pipeline_id, component_id) - super() - + @api = 
Pipely::DataPipeline::Api.instance.client @id = component_id @pipeline_id = pipeline_id end diff --git a/lib/pipely/aws/data_pipeline/instance.rb b/lib/pipely/aws/data_pipeline/instance.rb index 9f75a77..0d12338 100644 --- a/lib/pipely/aws/data_pipeline/instance.rb +++ b/lib/pipely/aws/data_pipeline/instance.rb @@ -1,13 +1,13 @@ -require 'pipely/aws_client' +require 'pipely/aws/data_pipeline/api' module Pipely module DataPipeline - class Instance < Pipely::AWSClient + class Instance def initialize(pipeline_id, component_id, instance_id) - super() + @api = Pipely::DataPipeline::Api.instance.client @id = instance_id @component_id = component_id diff --git a/lib/pipely/aws/data_pipeline/pipeline.rb b/lib/pipely/aws/data_pipeline/pipeline.rb index ad1bd51..09dd327 100644 --- a/lib/pipely/aws/data_pipeline/pipeline.rb +++ b/lib/pipely/aws/data_pipeline/pipeline.rb @@ -1,15 +1,14 @@ -require 'pipely/aws_client' +require 'pipely/aws/data_pipeline/api' require 'pipely/aws/data_pipeline/component' module Pipely module DataPipeline - class Pipeline < Pipely::AWSClient + class Pipeline def initialize(pipeline_id) - super() - + @api = Pipely::DataPipeline::Api.instance.client @id = pipeline_id end diff --git a/lib/pipely/aws/emr/api.rb b/lib/pipely/aws/emr/api.rb new file mode 100644 index 0000000..d929adb --- /dev/null +++ b/lib/pipely/aws/emr/api.rb @@ -0,0 +1,26 @@ +require_relative '../configure_aws' +require 'aws-sdk' +require 'singleton' + +module Pipely + + module Emr + + class Api + include Pipely::ConfigureAws + include Singleton + + attr_accessor :client + + def initialize + super() + + configure + + @client = AWS::EMR.new.client + end + end + + end + +end diff --git a/lib/pipely/aws/emr/cluster.rb b/lib/pipely/aws/emr/cluster.rb new file mode 100644 index 0000000..c345cb5 --- /dev/null +++ b/lib/pipely/aws/emr/cluster.rb @@ -0,0 +1,20 @@ +require 'pipely/aws_client' + +module Pipely + + module Emr + + class InstanceGroup < Pipely::AWSClient + + def 
initialize(instance_group_id) + super() + + @id = instance_group_id + end + + def log_paths + end + + end + end +end From c7b262651212c53c610d507fe7b18fd9e4421eee Mon Sep 17 00:00:00 2001 From: Bart Flaherty Date: Tue, 22 Apr 2014 17:40:11 -0400 Subject: [PATCH 5/7] list log paths will also give you the emr step if possible --- lib/pipely/actions/list_log_paths.rb | 7 +++++ lib/pipely/aws/data_pipeline/attempt.rb | 18 ++++++++++++ lib/pipely/aws/data_pipeline/component.rb | 24 ++++++++++++++++ lib/pipely/aws/data_pipeline/instance.rb | 2 ++ lib/pipely/aws/data_pipeline/pipeline.rb | 35 +++++++++++++++++++++++ lib/pipely/aws/emr/api.rb | 6 ++++ lib/pipely/aws/emr/cluster.rb | 20 ------------- 7 files changed, 92 insertions(+), 20 deletions(-) create mode 100644 lib/pipely/aws/data_pipeline/attempt.rb delete mode 100644 lib/pipely/aws/emr/cluster.rb diff --git a/lib/pipely/actions/list_log_paths.rb b/lib/pipely/actions/list_log_paths.rb index c1815bd..1c2149c 100644 --- a/lib/pipely/actions/list_log_paths.rb +++ b/lib/pipely/actions/list_log_paths.rb @@ -14,7 +14,10 @@ def initialize(options) def execute if @options.object_id + $stdout.puts "\nLog paths for object:" $stdout.puts PP.pp(log_paths_for_component, "") + $stdout.puts "\nEMR step for object:" + $stdout.puts PP.pp(emr_step_for_component, "") else $stdout.puts PP.pp(log_paths, "") end @@ -22,6 +25,10 @@ def execute private + def emr_step_for_component + data_pipeline.emr_step_for_component(@options.object_id) + end + def log_paths_for_component data_pipeline.log_paths_for_component(@options.object_id) end diff --git a/lib/pipely/aws/data_pipeline/attempt.rb b/lib/pipely/aws/data_pipeline/attempt.rb new file mode 100644 index 0000000..eb2515c --- /dev/null +++ b/lib/pipely/aws/data_pipeline/attempt.rb @@ -0,0 +1,18 @@ +require 'pipely/aws/data_pipeline/api' + +module Pipely + + module DataPipeline + + class Attempt + + def initialize(pipeline_id, attempt_id) + @api = Pipely::DataPipeline::Api.instance.client + 
+ @id = attempt_id + @pipeline_id = pipeline_id + end + + end + end +end diff --git a/lib/pipely/aws/data_pipeline/component.rb b/lib/pipely/aws/data_pipeline/component.rb index 1166e63..be4681a 100644 --- a/lib/pipely/aws/data_pipeline/component.rb +++ b/lib/pipely/aws/data_pipeline/component.rb @@ -1,4 +1,5 @@ require 'pipely/aws/data_pipeline/instance' +require 'pipely/aws/data_pipeline/attempt' require 'pipely/aws/data_pipeline/api' module Pipely @@ -13,6 +14,29 @@ def initialize(pipeline_id, component_id) @pipeline_id = pipeline_id end + def attempts + query = { + selectors: [ { + field_name: '@componentParent', + operator: { + type: 'REF_EQ', + values: [@id] + } + } ] + } + + @api.query_objects( + pipeline_id: @pipeline_id, + sphere: 'ATTEMPT', + query: query + )[:ids].map do |id| + Pipely::DataPipeline::Attempt.new( + @pipeline_id, + id + ) + end + end + def active_instances Pipely::DataPipeline::Instance.new( @pipeline_id, diff --git a/lib/pipely/aws/data_pipeline/instance.rb b/lib/pipely/aws/data_pipeline/instance.rb index 0d12338..759a0fa 100644 --- a/lib/pipely/aws/data_pipeline/instance.rb +++ b/lib/pipely/aws/data_pipeline/instance.rb @@ -6,6 +6,8 @@ module DataPipeline class Instance + attr_accessor :id + def initialize(pipeline_id, component_id, instance_id) @api = Pipely::DataPipeline::Api.instance.client diff --git a/lib/pipely/aws/data_pipeline/pipeline.rb b/lib/pipely/aws/data_pipeline/pipeline.rb index 09dd327..2a8521e 100644 --- a/lib/pipely/aws/data_pipeline/pipeline.rb +++ b/lib/pipely/aws/data_pipeline/pipeline.rb @@ -1,4 +1,5 @@ require 'pipely/aws/data_pipeline/api' +require 'pipely/aws/emr/api' require 'pipely/aws/data_pipeline/component' module Pipely @@ -9,9 +10,43 @@ class Pipeline def initialize(pipeline_id) @api = Pipely::DataPipeline::Api.instance.client + @emr_api = Pipely::Emr::Api.instance @id = pipeline_id end + def emr_step_for_component(component_id) + instance = Component.new(@id, component_id).active_instances + 
component_hadoop_call = evaluate_expression('#{step}', instance.id) + + emr_step = @emr_api.step_details_for_cluster( + emr_cluster[:id] + ).find do |step| + cfg = step[:step][:config] + step_hadoop_call = cfg[:jar] + ',' + cfg[:args].join(',') + step_hadoop_call == component_hadoop_call + end.data[:step] + + { + id: emr_step[:id], + name: emr_step[:name], + status: emr_step[:status] + } + + rescue AWS::DataPipeline::Errors::InvalidRequestException + $stderr.puts "Couldn't find a corresponding EMR step for that component" + end + + def emr_cluster + cluster_id = Pipely::DataPipeline::Component.new( + @id, + get_components_of_type('EmrCluster').first + ).active_instances.id + + @emr_api.client.list_clusters.data[:clusters].find do |cluster| + cluster[:name] == @id + '_' + cluster_id + end + end + def log_paths ids = get_components_of_type('ShellCommandActivity') diff --git a/lib/pipely/aws/emr/api.rb b/lib/pipely/aws/emr/api.rb index d929adb..7b2a11c 100644 --- a/lib/pipely/aws/emr/api.rb +++ b/lib/pipely/aws/emr/api.rb @@ -19,6 +19,12 @@ def initialize @client = AWS::EMR.new.client end + + def step_details_for_cluster(cluster_id) + client.list_steps(cluster_id: cluster_id).data[:steps].map do |step| + client.describe_step(cluster_id: cluster_id, step_id: step[:id]) + end + end end end diff --git a/lib/pipely/aws/emr/cluster.rb b/lib/pipely/aws/emr/cluster.rb deleted file mode 100644 index c345cb5..0000000 --- a/lib/pipely/aws/emr/cluster.rb +++ /dev/null @@ -1,20 +0,0 @@ -require 'pipely/aws_client' - -module Pipely - - module Emr - - class InstanceGroup < Pipely::AWSClient - - def initialize(instance_group_id) - super() - - @id = instance_group_id - end - - def log_paths - end - - end - end -end From 0473619a17593a5bbcaa9c1014fc90fdb9b5d694 Mon Sep 17 00:00:00 2001 From: Bart Flaherty Date: Thu, 24 Apr 2014 16:02:52 -0400 Subject: [PATCH 6/7] link attempts to emr steps --- lib/pipely/actions/list_log_paths.rb | 8 ++-- lib/pipely/aws/data_pipeline/api.rb | 1 + 
lib/pipely/aws/data_pipeline/attempt.rb | 49 +++++++++++++++++++++++ lib/pipely/aws/data_pipeline/component.rb | 24 ----------- lib/pipely/aws/data_pipeline/instance.rb | 22 +++++++++- lib/pipely/aws/data_pipeline/pipeline.rb | 33 ++------------- lib/pipely/aws/emr/api.rb | 39 ++++++++++++++++-- 7 files changed, 114 insertions(+), 62 deletions(-) diff --git a/lib/pipely/actions/list_log_paths.rb b/lib/pipely/actions/list_log_paths.rb index 1c2149c..39c25df 100644 --- a/lib/pipely/actions/list_log_paths.rb +++ b/lib/pipely/actions/list_log_paths.rb @@ -16,8 +16,8 @@ def execute if @options.object_id $stdout.puts "\nLog paths for object:" $stdout.puts PP.pp(log_paths_for_component, "") - $stdout.puts "\nEMR step for object:" - $stdout.puts PP.pp(emr_step_for_component, "") + $stdout.puts "\nEMR steps for object:" + $stdout.puts PP.pp(emr_steps_for_component, "") else $stdout.puts PP.pp(log_paths, "") end @@ -25,8 +25,8 @@ def execute private - def emr_step_for_component - data_pipeline.emr_step_for_component(@options.object_id) + def emr_steps_for_component + data_pipeline.emr_steps_for_component(@options.object_id) end def log_paths_for_component diff --git a/lib/pipely/aws/data_pipeline/api.rb b/lib/pipely/aws/data_pipeline/api.rb index 4924b24..0f50aed 100644 --- a/lib/pipely/aws/data_pipeline/api.rb +++ b/lib/pipely/aws/data_pipeline/api.rb @@ -19,6 +19,7 @@ def initialize @client = AWS::DataPipeline.new.client end + end end diff --git a/lib/pipely/aws/data_pipeline/attempt.rb b/lib/pipely/aws/data_pipeline/attempt.rb index eb2515c..11578ca 100644 --- a/lib/pipely/aws/data_pipeline/attempt.rb +++ b/lib/pipely/aws/data_pipeline/attempt.rb @@ -1,4 +1,5 @@ require 'pipely/aws/data_pipeline/api' +require 'pipely/aws/emr/api' module Pipely @@ -6,13 +7,61 @@ module DataPipeline class Attempt + attr_accessor :id + def initialize(pipeline_id, attempt_id) @api = Pipely::DataPipeline::Api.instance.client + @emr_api = Pipely::Emr::Api.instance @id = attempt_id @pipeline_id 
= pipeline_id end + def details + @details ||= @api.describe_objects( + object_ids: [@id], + pipeline_id: @pipeline_id, + )[:pipeline_objects].first + end + + def emr_step + steps = @emr_api.find_emr_steps(emr_cluster[:id], hadoop_call) + + return steps.first if steps.size == 1 + + steps.find { |step| error_message.include?(step[:name]) } + end + + def hadoop_call + evaluate_expression('#{step}') + + rescue AWS::DataPipeline::Errors::InvalidRequestException + $stderr.puts "No hadoop step call for attempt #{@id}" + nil + end + + def emr_cluster + cluster_name = @pipeline_id + '_' + resource_name + @emr_api.find_cluster_by_name(cluster_name) + end + + def error_message + details[:fields].find { |field| field[:key] == 'errorMessage' }[:string_value] + end + + def resource_name + details[:fields].find { |field| field[:key] == '@resource' }[:ref_value] + end + + def evaluate_expression(expression) + @api.evaluate_expression({ + expression: expression, + object_id: @id, + pipeline_id: @pipeline_id, + })[:evaluated_expression] + end + + end end end diff --git a/lib/pipely/aws/data_pipeline/component.rb b/lib/pipely/aws/data_pipeline/component.rb index be4681a..8218d4d 100644 --- a/lib/pipely/aws/data_pipeline/component.rb +++ b/lib/pipely/aws/data_pipeline/component.rb @@ -14,33 +14,9 @@ def initialize(pipeline_id, component_id) @pipeline_id = pipeline_id end - def attempts - query = { - selectors: [ { - field_name: '@componentParent', - operator: { - type: 'REF_EQ', - values: [@id] - } - } ] - } - - @api.query_objects( - pipeline_id: @pipeline_id, - sphere: 'ATTEMPT', - query: query - )[:ids].map do |id| - Pipely::DataPipeline::Attempt.new( - @pipeline_id, - id - ) - end - end - def active_instances Pipely::DataPipeline::Instance.new( @pipeline_id, - @id, evaluate_expression('#{@activeInstances}') ) end diff --git a/lib/pipely/aws/data_pipeline/instance.rb b/lib/pipely/aws/data_pipeline/instance.rb index 759a0fa..c3b4d9f 100644 --- 
a/lib/pipely/aws/data_pipeline/instance.rb +++ b/lib/pipely/aws/data_pipeline/instance.rb @@ -1,4 +1,5 @@ require 'pipely/aws/data_pipeline/api' +require 'pipely/aws/data_pipeline/attempt' module Pipely @@ -8,11 +9,10 @@ class Instance attr_accessor :id - def initialize(pipeline_id, component_id, instance_id) + def initialize(pipeline_id, instance_id) @api = Pipely::DataPipeline::Api.instance.client @id = instance_id - @component_id = component_id @pipeline_id = pipeline_id end @@ -29,6 +29,24 @@ def log_paths nil end + def attempts + query = { + selectors: [ { + field_name: '@instanceParent', + operator: { + type: 'REF_EQ', + values: [@id] + } + } ] + } + + @api.query_objects( + pipeline_id: @pipeline_id, + sphere: 'ATTEMPT', + query: query + )[:ids].map { |id| Pipely::DataPipeline::Attempt.new(@pipeline_id, id) } + end + def evaluate_expression(expression) @api.evaluate_expression({ expression: expression, diff --git a/lib/pipely/aws/data_pipeline/pipeline.rb b/lib/pipely/aws/data_pipeline/pipeline.rb index 2a8521e..119d7a1 100644 --- a/lib/pipely/aws/data_pipeline/pipeline.rb +++ b/lib/pipely/aws/data_pipeline/pipeline.rb @@ -14,36 +14,11 @@ def initialize(pipeline_id) @id = pipeline_id end - def emr_step_for_component(component_id) - instance = Component.new(@id, component_id).active_instances - component_hadoop_call = evaluate_expression('#{step}', instance.id) + def emr_steps_for_component(component_id) + attempts = Component.new(@id, component_id).active_instances.attempts - emr_step = @emr_api.step_details_for_cluster( - emr_cluster[:id] - ).find do |step| - cfg = step[:step][:config] - step_hadoop_call = cfg[:jar] + ',' + cfg[:args].join(',') - step_hadoop_call == component_hadoop_call - end.data[:step] - - { - id: emr_step[:id], - name: emr_step[:name], - status: emr_step[:status] - } - - rescue AWS::DataPipeline::Errors::InvalidRequestException - $stderr.puts "Couldn't find a corresponding EMR step for that component" - end - - def emr_cluster - 
cluster_id = Pipely::DataPipeline::Component.new( - @id, - get_components_of_type('EmrCluster').first - ).active_instances.id - - @emr_api.client.list_clusters.data[:clusters].find do |cluster| - cluster[:name] == @id + '_' + cluster_id + attempts.map do |attempt| + { attempt: attempt.id, emr_step: attempt.emr_step } end end diff --git a/lib/pipely/aws/emr/api.rb b/lib/pipely/aws/emr/api.rb index 7b2a11c..e6f8721 100644 --- a/lib/pipely/aws/emr/api.rb +++ b/lib/pipely/aws/emr/api.rb @@ -18,12 +18,45 @@ def initialize configure @client = AWS::EMR.new.client + @cluster_steps = {} end - def step_details_for_cluster(cluster_id) - client.list_steps(cluster_id: cluster_id).data[:steps].map do |step| - client.describe_step(cluster_id: cluster_id, step_id: step[:id]) + # return info about the emr steps that ran in this cluster + # with a particular hadoop jar file and arguments + def find_emr_steps(cluster_id, hadoop_call) + steps = describe_all_steps(cluster_id).find_all do |step| + cfg = step[:step][:config] + step_hadoop_call = cfg[:jar] + ',' + cfg[:args].join(',') + step_hadoop_call == hadoop_call end + + steps.map do |step| + { + id: step.data[:step][:id], + name: step.data[:step][:name], + status: step.data[:step][:status], + } + end + end + + def find_cluster_by_name(name) + @cluster_list ||= client.list_clusters + + @cluster_list.data[:clusters].find do |cluster| + cluster[:name] == name + end + end + + # gets details on all steps for a cluster + # memoizes by cluster_id + def describe_all_steps(cluster_id) + return @cluster_steps[cluster_id] if @cluster_steps[cluster_id] + + @cluster_steps[cluster_id] = ( + client.list_steps(cluster_id: cluster_id).data[:steps].map do |step| + client.describe_step(cluster_id: cluster_id, step_id: step[:id]) + end + ) end end From 308090acecc1179c0f9ff0cde2e205a732f73f52 Mon Sep 17 00:00:00 2001 From: Bart Flaherty Date: Fri, 25 Apr 2014 17:23:13 -0400 Subject: [PATCH 7/7] -s option, given an EmrActivity, finds a log path for each 
attempt --- lib/pipely/aws/data_pipeline/attempt.rb | 23 ++++++++++++++++++++--- lib/pipely/aws/data_pipeline/instance.rb | 20 +++++++++++++++++++- lib/pipely/aws/data_pipeline/pipeline.rb | 6 +----- lib/pipely/aws/emr/api.rb | 20 +++++++++++++------- 4 files changed, 53 insertions(+), 16 deletions(-) diff --git a/lib/pipely/aws/data_pipeline/attempt.rb b/lib/pipely/aws/data_pipeline/attempt.rb index 11578ca..6f6b4e9 100644 --- a/lib/pipely/aws/data_pipeline/attempt.rb +++ b/lib/pipely/aws/data_pipeline/attempt.rb @@ -24,11 +24,26 @@ def details )[:pipeline_objects].first end + def emr_step_logs + steps = @emr_api.describe_all_steps(emr_cluster[:id]) + log_uri = emr_cluster[:log_uri] + step_id_for_this_attempt = emr_step[:id] + + steps.reverse.each_with_index do |step, i| + log_suffix = "#{emr_cluster[:id]}/steps/#{i+1}/" + + if step[:id] == step_id_for_this_attempt + return log_uri + log_suffix + end + end + + nil + end + def emr_step steps = @emr_api.find_emr_steps(emr_cluster[:id], hadoop_call) return steps.first if steps.size == 1 - steps.find { |step| error_message.include?(step[:name]) } end @@ -41,8 +56,10 @@ def hadoop_call end def emr_cluster - cluster_name = @pipeline_id + '_' + resource_name - @emr_api.find_cluster_by_name(cluster_name) + @emr_cluster ||= ( + cluster_name = @pipeline_id + '_' + resource_name + @emr_api.find_cluster_by_name(cluster_name) + ) end def error_message diff --git a/lib/pipely/aws/data_pipeline/instance.rb b/lib/pipely/aws/data_pipeline/instance.rb index c3b4d9f..0906559 100644 --- a/lib/pipely/aws/data_pipeline/instance.rb +++ b/lib/pipely/aws/data_pipeline/instance.rb @@ -17,6 +17,15 @@ def initialize(pipeline_id, instance_id) end def log_paths + if type == 'ShellCommandActivity' + stderr_and_stdout + elsif type == 'EmrActivity' + # sleep in between attempts to avoid a throttling exception from AWS API + attempts.map { |attempt| logs = attempt.emr_step_logs; sleep 1 ; logs } + end + end + + def stderr_and_stdout stderr, stdout 
= evaluate_expression( '#{stderr + "," + stdout}', ).split(',') @@ -25,10 +34,19 @@ def log_paths rescue AWS::DataPipeline::Errors::InvalidRequestException => ex $stderr.puts ex.inspect - $stderr.puts "Can't find log paths for #{@id}" + $stderr.puts "No stderr and stdout fields for ShellCommandActivity #{@id}" nil end + def type + @type ||= evaluate_expression('#{type}') + end + + def emr_steps + return if type != 'EmrActivity' + attempts.map { |attempt| { attempt: attempt.id, emr_step: attempt.emr_step } } + end + def attempts query = { selectors: [ { diff --git a/lib/pipely/aws/data_pipeline/pipeline.rb b/lib/pipely/aws/data_pipeline/pipeline.rb index 119d7a1..3aadb2b 100644 --- a/lib/pipely/aws/data_pipeline/pipeline.rb +++ b/lib/pipely/aws/data_pipeline/pipeline.rb @@ -15,11 +15,7 @@ def initialize(pipeline_id) end def emr_steps_for_component(component_id) - attempts = Component.new(@id, component_id).active_instances.attempts - - attempts.map do |attempt| - { attempt: attempt.id, emr_step: attempt.emr_step } - end + Component.new(@id, component_id).active_instances.emr_steps end def log_paths diff --git a/lib/pipely/aws/emr/api.rb b/lib/pipely/aws/emr/api.rb index e6f8721..b875835 100644 --- a/lib/pipely/aws/emr/api.rb +++ b/lib/pipely/aws/emr/api.rb @@ -19,32 +19,37 @@ def initialize @client = AWS::EMR.new.client @cluster_steps = {} + @clusters = {} end # return info about the emr steps that ran in this cluster # with a particular hadoop jar file and arguments def find_emr_steps(cluster_id, hadoop_call) steps = describe_all_steps(cluster_id).find_all do |step| - cfg = step[:step][:config] + cfg = step[:config] step_hadoop_call = cfg[:jar] + ',' + cfg[:args].join(',') step_hadoop_call == hadoop_call end steps.map do |step| { - id: step.data[:step][:id], - name: step.data[:step][:name], - status: step.data[:step][:status], + id: step[:id], + name: step[:name], + status: step[:status], } end end def find_cluster_by_name(name) + return @clusters[name] if 
@clusters[name] + @cluster_list ||= client.list_clusters - @cluster_list.data[:clusters].find do |cluster| + cluster_id = @cluster_list.data[:clusters].find do |cluster| cluster[:name] == name - end + end[:id] + + @clusters[name] = client.describe_cluster(cluster_id: cluster_id)[:cluster] end # gets details on all steps for a cluster @@ -54,10 +59,11 @@ def describe_all_steps(cluster_id) @cluster_steps[cluster_id] = ( client.list_steps(cluster_id: cluster_id).data[:steps].map do |step| - client.describe_step(cluster_id: cluster_id, step_id: step[:id]) + client.describe_step(cluster_id: cluster_id, step_id: step[:id]).data[:step] end ) end + end end