Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
language: ruby
rvm:
- 2.3.0
- 2.2.0
- 2.1.0
- 2.0.0
- 1.9.3
- 2.4.2
- 2.3.5
- 2.2
- 2.1
before_install:
- gem update --remote bundler
51 changes: 34 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,33 @@ Or install it yourself as:

```
<match raw.apache.access>
type eval_filter
@type eval_filter
remove_tag_prefix raw
add_tag_prefix filtered

config1 @hostname = `hostname -s`.chomp

filter1 [[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'
<eval>
config @hostname = `hostname -s`.chomp
</eval>
<rule>
filter "[[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'"
</rule>
</match>
```

### require libraries
```
<match raw.apache.access>
type eval_filter
@type eval_filter
remove_tag_prefix raw
add_tag_prefix filtered
requires yaml # comma separated values

config1 @hostname = YAML.load({'hostname' => 'web01'})['hostname']

filter1 [[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'
<eval>
config @hostname = YAML.load({'hostname' => 'web01'})['hostname']
</eval>
<rule>
filter "[[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'"
</rule>
</match>
```

Expand All @@ -53,24 +59,32 @@ Should return [time, record].
### filter:

<filter **>
type eval
filter1 "[time, record] if record['status'] == '404'"
filter2 "[time, record] if record['status'] == 'POST'"
@type eval
<rule>
filter "[time, record] if record['status'] == '404'"
</rule>
<rule>
filter "[time, record] if record['status'] == 'POST'"
</rule>
</filter>


### typecast(string to integer):

<filter **>
type eval
filter1 "record['status'] = record['status'].to_i; [time, record]"
@type eval
<rule>
filter "record['status'] = record['status'].to_i; [time, record]"
</rule>
</filter>

### modify record(add value):

<filter **>
type eval
filter1 "record['user_id'] = record['message'].split(':').last.to_i; [time, record]"
@type eval
<rule>
filter "record['user_id'] = record['message'].split(':').last.to_i; [time, record]"
</rule>
</filter>

#### input
Expand All @@ -94,13 +108,16 @@ Should return [time, record].
Can not be used expression substitution.
```
<match raw.apache.access>
filter1 "#{tag}"
@type eval_rule
<rule>
filter "#{tag}"
</rule>
</match>
```

'#' Is interpreted as the beginning of a comment.
```
filter1 #=> "\""
filter #=> "\""
```

## Contributing
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-eval-filter.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]

spec.add_dependency "fluentd", "~> 0"
spec.add_dependency "fluentd", [">= 0.14.0", "< 2"]
spec.add_development_dependency "rake", "~> 0"
spec.add_development_dependency "test-unit", "~> 3.1.0"
end
52 changes: 25 additions & 27 deletions lib/fluent/plugin/filter_eval.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
module Fluent
require 'fluent/plugin/filter'

module Fluent::Plugin
class EvalFilter < Filter
Fluent::Plugin.register_filter('eval', self)

config_param :requires, :string, default: nil, :desc => "require libraries."
config_param :requires, :array, default: [], :desc => "require libraries."
config_section :rule, param_name: :filter_config, multi: true do
config_param :filter, :string
end
config_section :eval, param_name: :eval_config, multi: true do
config_param :config, :string
end

def initialize
super
Expand All @@ -12,7 +20,7 @@ def configure(conf)
super

if @requires
@requires.split(',').each do |lib|
@requires.each do |lib|
begin
require lib
rescue Exception => e
Expand All @@ -21,20 +29,20 @@ def configure(conf)
end
end

conf.keys.select { |key| key =~ /^config\d+$/ }.sort_by { |key| key.sub('config', '').to_i }.each do |key|
@eval_config.each do |conf|
begin
instance_eval("#{conf[key]}")
instance_eval("#{conf.config}")
rescue Exception => e
raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
raise Fluent::ConfigError, "#{key} #{conf.config}\n" + e.to_s
end
end

@filters = []
conf.keys.select { |key| key =~ /^filter\d+$/ }.sort_by { |key| key.sub('filter', '').to_i }.each do |key|
@filter_config.each do |conf|
begin
@filters << instance_eval("lambda do |tag, time, record| #{conf[key]} end")
@filters << instance_eval("lambda do |tag, time, record| #{conf.filter} end")
rescue Exception => e
raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
raise Fluent::ConfigError, "#{key} #{conf.filter}\n" + e.to_s
end
end

Expand All @@ -43,26 +51,16 @@ def configure(conf)
end
end

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each { |time, record|
begin
filtered_record = filter_record(tag, time, record)
new_es.add(*filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
def filter(tag, time, record)
begin
@filters.each do |filter|
filter_results = filter.call(tag, time, record)
return filter_results if filter_results
end
}
new_es
end

private
def filter_record(tag, time, record)
@filters.each do |filter|
filter_results = filter.call(tag, time, record)
return filter_results if filter_results
nil
rescue => e
router.emit_error_event(tag, time, record, e)
end
nil
end
end
end
33 changes: 18 additions & 15 deletions lib/fluent/plugin/out_eval_filter.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
class Fluent::EvalFilterOutput < Fluent::Output
require 'fluent/plugin/output'

class Fluent::Plugin::EvalFilterOutput < Fluent::Plugin::Output

Fluent::Plugin.register_output('eval_filter', self)

config_param :requires, :string, default: nil, :desc => "require libraries."
helpers :event_emitter

# Define `router` method of v0.12 to support v0.10 or earlier
unless method_defined?(:router)
define_method("router") { Fluent::Engine }
config_param :requires, :array, default: [], :desc => "require libraries."
config_section :rule, param_name: :filter_config, multi: true do
config_param :filter, :string
end
config_section :eval, param_name: :eval_config, multi: true do
config_param :config, :string
end

def configure(conf)
super

if @requires
@requires.split(',').each do |lib|
@requires.each do |lib|
begin
require lib
rescue Exception => e
Expand All @@ -31,20 +35,20 @@ def configure(conf)
@add_tag_prefix = conf['add_tag_prefix']
@add_tag_suffix = conf['add_tag_suffix']

conf.keys.select { |key| key =~ /^config\d+$/ }.sort_by { |key| key.sub('config', '').to_i }.each do |key|
@eval_config.each do |conf|
begin
instance_eval("#{conf[key]}")
instance_eval("#{conf.config}")
rescue Exception => e
raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
raise Fluent::ConfigError, "#{key} #{conf.config}\n" + e.to_s
end
end

@filters = []
conf.keys.select { |key| key =~ /^filter\d+$/ }.sort_by { |key| key.sub('filter', '').to_i }.each do |key|
@filter_config.each do |conf|
begin
@filters << instance_eval("lambda do |tag, time, record| #{conf[key]} end")
@filters << instance_eval("lambda do |tag, time, record| #{conf.filter} end")
rescue Exception => e
raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
raise Fluent::ConfigError, "#{key} #{conf.filter}\n" + e.to_s
end
end

Expand All @@ -53,7 +57,7 @@ def configure(conf)
end
end

def emit(tag, es, chain)
def process(tag, es)
tag = handle_tag(tag)
es.each do |time, record|
results = filter_record(tag, time, record)
Expand All @@ -63,7 +67,6 @@ def emit(tag, es, chain)
end
end
end
chain.next
end

def handle_tag(tag)
Expand Down
2 changes: 2 additions & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
exit e.status_code
end
require 'test/unit'
require 'fluent/test/helpers'

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
Expand All @@ -26,4 +27,5 @@ def method_missing(method, *args)
require 'fluent/plugin/filter_eval'

class Test::Unit::TestCase
include Fluent::Test::Helpers
end
Loading