diff --git a/bin/pipely b/bin/pipely index 343559a..4c5d0c0 100755 --- a/bin/pipely +++ b/bin/pipely @@ -4,7 +4,10 @@ require 'pipely' options = Pipely::Options.parse -if options.pipeline_id +if options.pipeline_id && options.list_log_paths + Pipely::Actions::ListLogPaths.new(options).execute + +elsif options.pipeline_id Pipely::Actions::GraphLivePipeline.new(options).execute elsif options.input_path diff --git a/lib/pipely/actions.rb b/lib/pipely/actions.rb index 588dd26..c343521 100644 --- a/lib/pipely/actions.rb +++ b/lib/pipely/actions.rb @@ -1,6 +1,7 @@ require 'pipely/actions/graph_live_pipeline' require 'pipely/actions/graph_file_pipeline' require 'pipely/actions/list_live_pipelines' +require 'pipely/actions/list_log_paths' module Pipely module Actions diff --git a/lib/pipely/actions/list_log_paths.rb b/lib/pipely/actions/list_log_paths.rb new file mode 100644 index 0000000..39c25df --- /dev/null +++ b/lib/pipely/actions/list_log_paths.rb @@ -0,0 +1,47 @@ +require 'pp' +require 'pipely/aws/data_pipeline/pipeline' + +module Pipely + module Actions + + # List currently deployed pipelines + # + class ListLogPaths + + def initialize(options) + @options = options + end + + def execute + if @options.object_id + $stdout.puts "\nLog paths for object:" + $stdout.puts PP.pp(log_paths_for_component, "") + $stdout.puts "\nEMR steps for object:" + $stdout.puts PP.pp(emr_steps_for_component, "") + else + $stdout.puts PP.pp(log_paths, "") + end + end + + private + + def emr_steps_for_component + data_pipeline.emr_steps_for_component(@options.object_id) + end + + def log_paths_for_component + data_pipeline.log_paths_for_component(@options.object_id) + end + + def log_paths + data_pipeline.log_paths + end + + def data_pipeline + Pipely::DataPipeline::Pipeline.new(@options.pipeline_id) + end + + end + + end +end diff --git a/lib/pipely/aws/configure_aws.rb b/lib/pipely/aws/configure_aws.rb new file mode 100644 index 0000000..991c54f --- /dev/null +++ b/lib/pipely/aws/configure_aws.rb @@ -0,0 +1,34 @@ +require 'aws-sdk' + +module Pipely + + # Use AWS SDK to get information about a pipeline + module ConfigureAws + + class PipelyConfigNotFound < StandardError; end + + def configure + config = load_config + + AWS.config( + access_key_id: config[:access_key_id], + secret_access_key: config[:secret_access_key], + region: config[:region] + ) + end + + private + def load_config + path = File.expand_path('~/.pipely') + + if File.exist?(path) && data = File.open(path) + YAML.load(data) + else + raise PipelyConfigNotFound, + 'Need a .pipely file in home directory' + end + end + + end + +end diff --git a/lib/pipely/aws/data_pipeline/api.rb b/lib/pipely/aws/data_pipeline/api.rb new file mode 100644 index 0000000..0f50aed --- /dev/null +++ b/lib/pipely/aws/data_pipeline/api.rb @@ -0,0 +1,27 @@ +require_relative '../configure_aws' +require 'aws-sdk' +require 'singleton' + +module Pipely + + module DataPipeline + + class Api + include Pipely::ConfigureAws + include Singleton + + attr_accessor :client + + def initialize + super() + + configure + + @client = AWS::DataPipeline.new.client + end + + end + + end + +end diff --git a/lib/pipely/aws/data_pipeline/attempt.rb b/lib/pipely/aws/data_pipeline/attempt.rb new file mode 100644 index 0000000..6f6b4e9 --- /dev/null +++ b/lib/pipely/aws/data_pipeline/attempt.rb @@ -0,0 +1,84 @@ +require 'pipely/aws/data_pipeline/api' +require 'pipely/aws/emr/api' + +module Pipely + + module DataPipeline + + class Attempt + + attr_accessor :id + + def initialize(pipeline_id, attempt_id) + @api = Pipely::DataPipeline::Api.instance.client + @emr_api = Pipely::Emr::Api.instance + + @id = attempt_id + @pipeline_id = pipeline_id + end + + def details + @details ||= @api.describe_objects( + object_ids: [@id], + pipeline_id: @pipeline_id, + )[:pipeline_objects].first + end + + def emr_step_logs + steps = @emr_api.describe_all_steps(emr_cluster[:id]) + log_uri = emr_cluster[:log_uri] + step_id_for_this_attempt = emr_step[:id] + + steps.reverse.each_with_index do |step, i| + log_suffix = "#{emr_cluster[:id]}/steps/#{i+1}/" + + if step[:id] == step_id_for_this_attempt + return log_uri + log_suffix + end + end + + nil + end + + def emr_step + steps = @emr_api.find_emr_steps(emr_cluster[:id], hadoop_call) + + return steps.first if steps.size == 1 + steps.find { |step| error_message.include?(step[:name]) } + end + + def hadoop_call + evaluate_expression('#{step}') + + rescue AWS::DataPipeline::Errors::InvalidRequestException + $stderr.puts "No hadoop step call for attempt #{@id}" + nil + end + + def emr_cluster + @emr_cluster ||= ( + cluster_name = @pipeline_id + '_' + resource_name + @emr_api.find_cluster_by_name(cluster_name) + ) + end + + def error_message + details[:fields].find { |field| field[:key] == 'errorMessage' }[:string_value] + end + + def resource_name + details[:fields].find { |field| field[:key] == '@resource' }[:ref_value] + end + + def evaluate_expression(expression) + @api.evaluate_expression({ + expression: expression, + object_id: @id, + pipeline_id: @pipeline_id, + })[:evaluated_expression] + end + + + end + end +end diff --git a/lib/pipely/aws/data_pipeline/component.rb b/lib/pipely/aws/data_pipeline/component.rb new file mode 100644 index 0000000..8218d4d --- /dev/null +++ b/lib/pipely/aws/data_pipeline/component.rb @@ -0,0 +1,34 @@ +require 'pipely/aws/data_pipeline/instance' +require 'pipely/aws/data_pipeline/attempt' +require 'pipely/aws/data_pipeline/api' + +module Pipely + + module DataPipeline + + class Component + + def initialize(pipeline_id, component_id) + @api = Pipely::DataPipeline::Api.instance.client + @id = component_id + @pipeline_id = pipeline_id + end + + def active_instances + Pipely::DataPipeline::Instance.new( + @pipeline_id, + evaluate_expression('#{@activeInstances}') + ) + end + + def evaluate_expression(expression) + @api.evaluate_expression({ + expression: expression, + object_id: @id, + pipeline_id: @pipeline_id, + })[:evaluated_expression] + end + + end + end +end diff --git a/lib/pipely/aws/data_pipeline/instance.rb b/lib/pipely/aws/data_pipeline/instance.rb new file mode 100644 index 0000000..0906559 --- /dev/null +++ b/lib/pipely/aws/data_pipeline/instance.rb @@ -0,0 +1,78 @@ +require 'pipely/aws/data_pipeline/api' +require 'pipely/aws/data_pipeline/attempt' + +module Pipely + + module DataPipeline + + class Instance + + attr_accessor :id + + def initialize(pipeline_id, instance_id) + @api = Pipely::DataPipeline::Api.instance.client + + @id = instance_id + @pipeline_id = pipeline_id + end + + def log_paths + if type == 'ShellCommandActivity' + stderr_and_stdout + elsif type == 'EmrActivity' + # sleep inbetween attempts to avoid a throttling exception from AWS API + attempts.map { |attempt| logs = attempt.emr_step_logs; sleep 1 ; logs } + end + end + + def stderr_and_stdout + stderr, stdout = evaluate_expression( + '#{stderr + "," + stdout}', + ).split(',') + + { stderr: stderr, stdout: stdout } + + rescue AWS::DataPipeline::Errors::InvalidRequestException => ex + $stderr.puts ex.inspect + $stderr.puts "No stderr and stdout fields for ShellCommandActivity #{@id}" + nil + end + + def type + @type ||= evaluate_expression('#{type}') + end + + def emr_steps + return if type != 'EmrActivity' + attempts.map { |attempt| { attempt: attempt.id, emr_step: attempt.emr_step } } + end + + def attempts + query = { + selectors: [ { + field_name: '@instanceParent', + operator: { + type: 'REF_EQ', + values: [@id] + } + } ] + } + + @api.query_objects( + pipeline_id: @pipeline_id, + sphere: 'ATTEMPT', + query: query + )[:ids].map { |id| Pipely::DataPipeline::Attempt.new(@pipeline_id, id) } + end + + def evaluate_expression(expression) + @api.evaluate_expression({ + expression: expression, + object_id: @id, + pipeline_id: @pipeline_id, + })[:evaluated_expression] + end + + end + end +end diff --git a/lib/pipely/aws/data_pipeline/pipeline.rb b/lib/pipely/aws/data_pipeline/pipeline.rb new file mode 100644 index 0000000..3aadb2b --- /dev/null +++ b/lib/pipely/aws/data_pipeline/pipeline.rb @@ -0,0 +1,71 @@ +require 'pipely/aws/data_pipeline/api' +require 'pipely/aws/emr/api' +require 'pipely/aws/data_pipeline/component' + +module Pipely + + module DataPipeline + + class Pipeline + + def initialize(pipeline_id) + @api = Pipely::DataPipeline::Api.instance.client + @emr_api = Pipely::Emr::Api.instance + @id = pipeline_id + end + + def emr_steps_for_component(component_id) + Component.new(@id, component_id).active_instances.emr_steps + end + + def log_paths + ids = get_components_of_type('ShellCommandActivity') + + ids.inject({}) do |memo, component_id| + logs = log_paths_for_component(component_id) + memo[component_id] = logs if logs + + memo + end + end + + def log_paths_for_component(component_id) + Pipely::DataPipeline::Component.new( + @id, + component_id + ).active_instances.log_paths + end + + def get_components_of_type(type) + query_components( + selectors: [ + { + field_name: 'type', + operator: { + type: 'EQ', + values: [type], + } + } + ] + ) + end + + def query_components(query) + ids = @api.query_objects({ + pipeline_id: @id, + query: query, + sphere: 'COMPONENT', + })[:ids] + end + + def evaluate_expression(expression, object_id) + @api.evaluate_expression({ + expression: expression, + object_id: object_id, + pipeline_id: @id, + })[:evaluated_expression] + end + + end + end +end diff --git a/lib/pipely/aws/emr/api.rb b/lib/pipely/aws/emr/api.rb new file mode 100644 index 0000000..b875835 --- /dev/null +++ b/lib/pipely/aws/emr/api.rb @@ -0,0 +1,71 @@ +require_relative '../configure_aws' +require 'aws-sdk' +require 'singleton' + +module Pipely + + module Emr + + class Api + include Pipely::ConfigureAws + include Singleton + + attr_accessor :client + + def initialize + super() + + configure + + @client = AWS::EMR.new.client + @cluster_steps = {} + @clusters = {} + end + + # return info about the emr steps that ran in this cluster + # with a particular hadoop jar file and arguments + def find_emr_steps(cluster_id, hadoop_call) + steps = describe_all_steps(cluster_id).find_all do |step| + cfg = step[:config] + step_hadoop_call = cfg[:jar] + ',' + cfg[:args].join(',') + step_hadoop_call == hadoop_call + end + + steps.map do |step| + { + id: step[:id], + name: step[:name], + status: step[:status], + } + end + end + + def find_cluster_by_name(name) + return @clusters[name] if @clusters[name] + + @cluster_list ||= client.list_clusters + + cluster_id = @cluster_list.data[:clusters].find do |cluster| + cluster[:name] == name + end[:id] + + @clusters[name] = client.describe_cluster(cluster_id: cluster_id)[:cluster] + end + + # gets details on all steps for a cluster + # memoizes by cluster_id + def describe_all_steps(cluster_id) + return @cluster_steps[cluster_id] if @cluster_steps[cluster_id] + + @cluster_steps[cluster_id] = ( + client.list_steps(cluster_id: cluster_id).data[:steps].map do |step| + client.describe_step(cluster_id: cluster_id, step_id: step[:id]).data[:step] + end + ) + end + + end + + end + +end diff --git a/lib/pipely/options.rb b/lib/pipely/options.rb index 7268e0d..332202e 100644 --- a/lib/pipely/options.rb +++ b/lib/pipely/options.rb @@ -7,7 +7,8 @@ module Pipely class Options attr_accessor :pipeline_id, :input_path, :output_path, - :verbose, :automatic_open, :json_output, :latest_run + :verbose, :automatic_open, :json_output, :latest_run, + :object_id, :list_log_paths def self.parse options = Pipely::Options.new @@ -37,6 +38,18 @@ def self.parse opts.on("-j", "--json", "Write STDOUT formatted as JSON") do |json| options.json_output = json end + + opts.on("-s", "--logs [OBJECT_ID]", + "Print s3 log paths for an object") do |obj_id| + options.object_id = obj_id + options.list_log_paths = true + end + + opts.on_tail("-h", "--help", "Show this message") do + puts opts + exit + end + end.parse! options diff --git a/pipely.gemspec b/pipely.gemspec index e0871e8..f627cc3 100644 --- a/pipely.gemspec +++ b/pipely.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |s| s.add_dependency "unf" s.add_dependency "uuidtools" s.add_dependency "activesupport" + s.add_dependency "aws-sdk", "~> 1.38" s.add_development_dependency "rspec" s.add_development_dependency "cane" s.add_development_dependency "timecop"