Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bin/pipely
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ require 'pipely'

options = Pipely::Options.parse

if options.pipeline_id
if options.pipeline_id && options.list_log_paths
Pipely::Actions::ListLogPaths.new(options).execute

elsif options.pipeline_id
Pipely::Actions::GraphLivePipeline.new(options).execute

elsif options.input_path
Expand Down
1 change: 1 addition & 0 deletions lib/pipely/actions.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'pipely/actions/graph_live_pipeline'
require 'pipely/actions/graph_file_pipeline'
require 'pipely/actions/list_live_pipelines'
require 'pipely/actions/list_log_paths'

module Pipely
module Actions
Expand Down
47 changes: 47 additions & 0 deletions lib/pipely/actions/list_log_paths.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require 'pp'
require 'pipely/aws/data_pipeline/pipeline'

module Pipely
module Actions

# List currently deployed pipelines
#
class ListLogPaths

def initialize(options)
@options = options
end

def execute
if @options.object_id
$stdout.puts "\nLog paths for object:"
$stdout.puts PP.pp(log_paths_for_component, "")
$stdout.puts "\nEMR steps for object:"
$stdout.puts PP.pp(emr_steps_for_component, "")
else
$stdout.puts PP.pp(log_paths, "")
end
end

private

def emr_steps_for_component
data_pipeline.emr_steps_for_component(@options.object_id)
end

def log_paths_for_component
data_pipeline.log_paths_for_component(@options.object_id)
end

def log_paths
data_pipeline.log_paths
end

def data_pipeline
Pipely::DataPipeline::Pipeline.new(@options.pipeline_id)
end

end

end
end
34 changes: 34 additions & 0 deletions lib/pipely/aws/configure_aws.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require 'aws-sdk'

module Pipely

# Use AWS SDK to get information about a pipeline
module ConfigureAws

class PipelyConfigNotFound < StandardError; end

def configure
config = load_config

AWS.config(
access_key_id: config[:access_key_id],
secret_access_key: config[:secret_access_key],
region: config[:region]
)
end

private
def load_config
path = File.expand_path('~/.pipely')

if File.exist?(path) && data = File.open(path)
YAML.load(data)
else
raise PipelyConfigNotFound,
'Need a .pipely file in home directory'
end
end

end

end
27 changes: 27 additions & 0 deletions lib/pipely/aws/data_pipeline/api.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
require_relative '../configure_aws'
require 'aws-sdk'
require 'singleton'

module Pipely

module DataPipeline

class Api
include Pipely::ConfigureAws
include Singleton

attr_accessor :client

def initialize
super()

configure

@client = AWS::DataPipeline.new.client
end

end

end

end
84 changes: 84 additions & 0 deletions lib/pipely/aws/data_pipeline/attempt.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
require 'pipely/aws/data_pipeline/api'
require 'pipely/aws/emr/api'

module Pipely

module DataPipeline

class Attempt

attr_accessor :id

def initialize(pipeline_id, attempt_id)
@api = Pipely::DataPipeline::Api.instance.client
@emr_api = Pipely::Emr::Api.instance

@id = attempt_id
@pipeline_id = pipeline_id
end

def details
@details ||= @api.describe_objects(
object_ids: [@id],
pipeline_id: @pipeline_id,
)[:pipeline_objects].first
end

def emr_step_logs
steps = @emr_api.describe_all_steps(emr_cluster[:id])
log_uri = emr_cluster[:log_uri]
step_id_for_this_attempt = emr_step[:id]

steps.reverse.each_with_index do |step, i|
log_suffix = "#{emr_cluster[:id]}/steps/#{i+1}/"

if step[:id] == step_id_for_this_attempt
return log_uri + log_suffix
end
end

nil
end

def emr_step
steps = @emr_api.find_emr_steps(emr_cluster[:id], hadoop_call)

return steps.first if steps.size == 1
steps.find { |step| error_message.include?(step[:name]) }
end

def hadoop_call
evaluate_expression('#{step}')

rescue AWS::DataPipeline::Errors::InvalidRequestException
$stderr.puts "No hadoop step call for attempt #{@id}"
nil
end

def emr_cluster
@emr_cluster ||= (
cluster_name = @pipeline_id + '_' + resource_name
@emr_api.find_cluster_by_name(cluster_name)
)
end

def error_message
details[:fields].find { |field| field[:key] == 'errorMessage' }[:string_value]
end

def resource_name
details[:fields].find { |field| field[:key] == '@resource' }[:ref_value]
end

def evaluate_expression(expression)
@api.evaluate_expression({
expression: expression,
object_id: @id,
pipeline_id: @pipeline_id,
})[:evaluated_expression]
end


end
end
end
34 changes: 34 additions & 0 deletions lib/pipely/aws/data_pipeline/component.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require 'pipely/aws/data_pipeline/instance'
require 'pipely/aws/data_pipeline/attempt'
require 'pipely/aws/data_pipeline/api'

module Pipely

module DataPipeline

class Component

def initialize(pipeline_id, component_id)
@api = Pipely::DataPipeline::Api.instance.client
@id = component_id
@pipeline_id = pipeline_id
end

def active_instances
Pipely::DataPipeline::Instance.new(
@pipeline_id,
evaluate_expression('#{@activeInstances}')
)
end

def evaluate_expression(expression)
@api.evaluate_expression({
expression: expression,
object_id: @id,
pipeline_id: @pipeline_id,
})[:evaluated_expression]
end

end
end
end
78 changes: 78 additions & 0 deletions lib/pipely/aws/data_pipeline/instance.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
require 'pipely/aws/data_pipeline/api'
require 'pipely/aws/data_pipeline/attempt'

module Pipely

module DataPipeline

class Instance

attr_accessor :id

def initialize(pipeline_id, instance_id)
@api = Pipely::DataPipeline::Api.instance.client

@id = instance_id
@pipeline_id = pipeline_id
end

def log_paths
if type == 'ShellCommandActivity'
stderr_and_stdout
elsif type == 'EmrActivity'
# sleep inbetween attempts to avoid a throttling exception from AWS API
attempts.map { |attempt| logs = attempt.emr_step_logs; sleep 1 ; logs }
end
end

def stderr_and_stdout
stderr, stdout = evaluate_expression(
'#{stderr + "," + stdout}',
).split(',')

{ stderr: stderr, stdout: stdout }

rescue AWS::DataPipeline::Errors::InvalidRequestException => ex
$stderr.puts ex.inspect
$stderr.puts "No stderr and stdout fields for ShellCommandActivity #{@id}"
nil
end

def type
@type ||= evaluate_expression('#{type}')
end

def emr_steps
return if type != 'EmrActivity'
attempts.map { |attempt| { attempt: attempt.id, emr_step: attempt.emr_step } }
end

def attempts
query = {
selectors: [ {
field_name: '@instanceParent',
operator: {
type: 'REF_EQ',
values: [@id]
}
} ]
}

@api.query_objects(
pipeline_id: @pipeline_id,
sphere: 'ATTEMPT',
query: query
)[:ids].map { |id| Pipely::DataPipeline::Attempt.new(@pipeline_id, id) }
end

def evaluate_expression(expression)
@api.evaluate_expression({
expression: expression,
object_id: @id,
pipeline_id: @pipeline_id,
})[:evaluated_expression]
end

end
end
end
Loading