diff --git a/.travis.yml b/.travis.yml
index 31bb44c..56c24b8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,9 +1,8 @@
language: ruby
rvm:
- - 2.3.0
- - 2.2.0
- - 2.1.0
- - 2.0.0
- - 1.9.3
+ - 2.4.2
+ - 2.3.5
+ - 2.2
+ - 2.1
before_install:
- gem update --remote bundler
diff --git a/README.md b/README.md
index 5490e2f..9ab52e8 100644
--- a/README.md
+++ b/README.md
@@ -18,27 +18,33 @@ Or install it yourself as:
```
- type eval_filter
+ @type eval_filter
remove_tag_prefix raw
add_tag_prefix filtered
- config1 @hostname = `hostname -s`.chomp
-
- filter1 [[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'
+
+ config @hostname = `hostname -s`.chomp
+
+
+ filter "[[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'"
+
```
### require libraries
```
- type eval_filter
+ @type eval_filter
remove_tag_prefix raw
add_tag_prefix filtered
requires yaml # comma separated values
- config1 @hostname = YAML.load({'hostname' => 'web01'})['hostname']
-
- filter1 [[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'
+
+ config @hostname = YAML.load({'hostname' => 'web01'})['hostname']
+
+
+ filter "[[tag, @hostname].join('.'), time, record] if record['method'] == 'GET'"
+
```
@@ -53,24 +59,32 @@ Should return [time, record].
### filter:
- type eval
- filter1 "[time, record] if record['status'] == '404'"
- filter2 "[time, record] if record['status'] == 'POST'"
+ @type eval
+
+ filter "[time, record] if record['status'] == '404'"
+
+
+ filter "[time, record] if record['status'] == 'POST'"
+
### typecast(string to integer):
- type eval
- filter1 "record['status'] = record['status'].to_i; [time, record]"
+ @type eval
+
+ filter "record['status'] = record['status'].to_i; [time, record]"
+
### modify record(add value):
- type eval
- filter1 "record['user_id'] = record['message'].split(':').last.to_i; [time, record]"
+ @type eval
+
+ filter "record['user_id'] = record['message'].split(':').last.to_i; [time, record]"
+
#### input
@@ -94,13 +108,16 @@ Should return [time, record].
Can not be used expression substitution.
```
- filter1 "#{tag}"
+ @type eval_rule
+
+ filter "#{tag}"
+
```
'#' Is interpreted as the beginning of a comment.
```
- filter1 #=> "\""
+ filter #=> "\""
```
## Contributing
diff --git a/fluent-plugin-eval-filter.gemspec b/fluent-plugin-eval-filter.gemspec
index 89ab3ef..8bf7f48 100644
--- a/fluent-plugin-eval-filter.gemspec
+++ b/fluent-plugin-eval-filter.gemspec
@@ -17,7 +17,7 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]
- spec.add_dependency "fluentd", "~> 0"
+ spec.add_dependency "fluentd", [">= 0.14.0", "< 2"]
spec.add_development_dependency "rake", "~> 0"
spec.add_development_dependency "test-unit", "~> 3.1.0"
end
diff --git a/lib/fluent/plugin/filter_eval.rb b/lib/fluent/plugin/filter_eval.rb
index 7cbfe0d..9b03be8 100644
--- a/lib/fluent/plugin/filter_eval.rb
+++ b/lib/fluent/plugin/filter_eval.rb
@@ -1,8 +1,16 @@
-module Fluent
+require 'fluent/plugin/filter'
+
+module Fluent::Plugin
class EvalFilter < Filter
Fluent::Plugin.register_filter('eval', self)
- config_param :requires, :string, default: nil, :desc => "require libraries."
+ config_param :requires, :array, default: [], :desc => "require libraries."
+ config_section :rule, param_name: :filter_config, multi: true do
+ config_param :filter, :string
+ end
+ config_section :eval, param_name: :eval_config, multi: true do
+ config_param :config, :string
+ end
def initialize
super
@@ -12,7 +20,7 @@ def configure(conf)
super
if @requires
- @requires.split(',').each do |lib|
+ @requires.each do |lib|
begin
require lib
rescue Exception => e
@@ -21,20 +29,20 @@ def configure(conf)
end
end
- conf.keys.select { |key| key =~ /^config\d+$/ }.sort_by { |key| key.sub('config', '').to_i }.each do |key|
+ @eval_config.each do |conf|
begin
- instance_eval("#{conf[key]}")
+ instance_eval("#{conf.config}")
rescue Exception => e
- raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
+ raise Fluent::ConfigError, "#{key} #{conf.config}\n" + e.to_s
end
end
@filters = []
- conf.keys.select { |key| key =~ /^filter\d+$/ }.sort_by { |key| key.sub('filter', '').to_i }.each do |key|
+ @filter_config.each do |conf|
begin
- @filters << instance_eval("lambda do |tag, time, record| #{conf[key]} end")
+ @filters << instance_eval("lambda do |tag, time, record| #{conf.filter} end")
rescue Exception => e
- raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
+ raise Fluent::ConfigError, "#{key} #{conf.filter}\n" + e.to_s
end
end
@@ -43,26 +51,16 @@ def configure(conf)
end
end
- def filter_stream(tag, es)
- new_es = MultiEventStream.new
- es.each { |time, record|
- begin
- filtered_record = filter_record(tag, time, record)
- new_es.add(*filtered_record) if filtered_record
- rescue => e
- router.emit_error_event(tag, time, record, e)
+ def filter(tag, time, record)
+ begin
+ @filters.each do |filter|
+ filter_results = filter.call(tag, time, record)
+ return filter_results if filter_results
end
- }
- new_es
- end
-
- private
- def filter_record(tag, time, record)
- @filters.each do |filter|
- filter_results = filter.call(tag, time, record)
- return filter_results if filter_results
+ nil
+ rescue => e
+ router.emit_error_event(tag, time, record, e)
end
- nil
end
end
end
diff --git a/lib/fluent/plugin/out_eval_filter.rb b/lib/fluent/plugin/out_eval_filter.rb
index ec90e5f..5483b1f 100644
--- a/lib/fluent/plugin/out_eval_filter.rb
+++ b/lib/fluent/plugin/out_eval_filter.rb
@@ -1,19 +1,23 @@
-class Fluent::EvalFilterOutput < Fluent::Output
+require 'fluent/plugin/output'
+
+class Fluent::Plugin::EvalFilterOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('eval_filter', self)
- config_param :requires, :string, default: nil, :desc => "require libraries."
+ helpers :event_emitter
- # Define `router` method of v0.12 to support v0.10 or earlier
- unless method_defined?(:router)
- define_method("router") { Fluent::Engine }
+ config_param :requires, :array, default: [], :desc => "require libraries."
+ config_section :rule, param_name: :filter_config, multi: true do
+ config_param :filter, :string
+ end
+ config_section :eval, param_name: :eval_config, multi: true do
+ config_param :config, :string
end
-
def configure(conf)
super
if @requires
- @requires.split(',').each do |lib|
+ @requires.each do |lib|
begin
require lib
rescue Exception => e
@@ -31,20 +35,20 @@ def configure(conf)
@add_tag_prefix = conf['add_tag_prefix']
@add_tag_suffix = conf['add_tag_suffix']
- conf.keys.select { |key| key =~ /^config\d+$/ }.sort_by { |key| key.sub('config', '').to_i }.each do |key|
+ @eval_config.each do |conf|
begin
- instance_eval("#{conf[key]}")
+ instance_eval("#{conf.config}")
rescue Exception => e
- raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
+ raise Fluent::ConfigError, "#{key} #{conf.config}\n" + e.to_s
end
end
@filters = []
- conf.keys.select { |key| key =~ /^filter\d+$/ }.sort_by { |key| key.sub('filter', '').to_i }.each do |key|
+ @filter_config.each do |conf|
begin
- @filters << instance_eval("lambda do |tag, time, record| #{conf[key]} end")
+ @filters << instance_eval("lambda do |tag, time, record| #{conf.filter} end")
rescue Exception => e
- raise Fluent::ConfigError, "#{key} #{conf[key]}\n" + e.to_s
+ raise Fluent::ConfigError, "#{key} #{conf.filter}\n" + e.to_s
end
end
@@ -53,7 +57,7 @@ def configure(conf)
end
end
- def emit(tag, es, chain)
+ def process(tag, es)
tag = handle_tag(tag)
es.each do |time, record|
results = filter_record(tag, time, record)
@@ -63,7 +67,6 @@ def emit(tag, es, chain)
end
end
end
- chain.next
end
def handle_tag(tag)
diff --git a/test/helper.rb b/test/helper.rb
index 55cce87..a683ff9 100644
--- a/test/helper.rb
+++ b/test/helper.rb
@@ -8,6 +8,7 @@
exit e.status_code
end
require 'test/unit'
+require 'fluent/test/helpers'
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
@@ -26,4 +27,5 @@ def method_missing(method, *args)
require 'fluent/plugin/filter_eval'
class Test::Unit::TestCase
+ include Fluent::Test::Helpers
end
diff --git a/test/plugin/test_filter_eval.rb b/test/plugin/test_filter_eval.rb
index 8110665..4a9446a 100644
--- a/test/plugin/test_filter_eval.rb
+++ b/test/plugin/test_filter_eval.rb
@@ -1,32 +1,36 @@
+# coding: utf-8
require 'helper'
+require 'fluent/test/driver/filter'
class EvalFilterTest < Test::Unit::TestCase
include Fluent
setup do
Fluent::Test.setup
- @time = Fluent::Engine.now
+ @time = event_time
end
def create_driver(conf = '')
- Test::FilterTestDriver.new(EvalFilter).configure(conf)
+ Test::Driver::Filter.new(Plugin::EvalFilter).configure(conf)
end
def filter(config, msgs)
d = create_driver(config)
- d.run {
+ d.run(default_tag: 'test') {
msgs.each {|msg|
- d.filter(msg, @time) # Filterプラグインにメッセージを通す
+ d.feed(@time, msg) # Filterプラグインにメッセージを通す
}
}
- filtered = d.filtered_as_array # 結果を受け取る. [tag, time, record]の配列
+ filtered = d.filtered.map{|e| e.last} # 結果を受け取る. [record]の配列
filtered
end
sub_test_case 'configure' do
test 'check default' do
config = %[
- filter1 [time, record] if record['status'] == '404'
+
+ filter "[time, record] if record['status'] == '404'"
+
]
assert_nothing_raised {
create_driver(config)
@@ -48,13 +52,17 @@ def filter(config, msgs)
{'status' => '401', 'message' => 'message'}
]
config = %[
- filter1 [time, record] if record['status'] == '404'
- filter2 [time, record] if record['status'] == '503'
+
+ filter "[time, record] if record['status'] == '404'"
+
+
+ filter "[time, record] if record['status'] == '503'"
+
]
es = filter(config, msgs)
assert_equal(es.size, 2)
- assert_equal(es[0][2]['status'], '404')
- assert_equal(es[1][2]['status'], '503')
+ assert_equal(es[0][1]['status'], '404')
+ assert_equal(es[1][1]['status'], '503')
end
end
@@ -68,15 +76,17 @@ def filter(config, msgs)
{'status' => '401', 'message' => 'message'}
]
config = %[
- filter1 record['status'] = record['status'].to_i; [time, record]
+
+ filter "record['status'] = record['status'].to_i; [time, record]"
+
]
es = filter(config, msgs)
assert_equal(es.size, 5)
- assert_equal(es[0][2]['status'], 301)
- assert_equal(es[1][2]['status'], 302)
- assert_equal(es[2][2]['status'], 404)
- assert_equal(es[3][2]['status'], 503)
- assert_equal(es[4][2]['status'], 401)
+ assert_equal(es[0][1]['status'], 301)
+ assert_equal(es[1][1]['status'], 302)
+ assert_equal(es[2][1]['status'], 404)
+ assert_equal(es[3][1]['status'], 503)
+ assert_equal(es[4][1]['status'], 401)
end
end
@@ -90,15 +100,17 @@ def filter(config, msgs)
{'status' => '401', 'message' => 'user_id:5'}
]
config = %[
- filter1 record['user_id'] = record['message'].split(':').last.to_i; [time, record]
+
+ filter "record['user_id'] = record['message'].split(':').last.to_i; [time, record]"
+
]
es = filter(config, msgs)
assert_equal(es.size, 5)
- assert_equal(es[0][2]['user_id'], 1)
- assert_equal(es[1][2]['user_id'], 2)
- assert_equal(es[2][2]['user_id'], 3)
- assert_equal(es[3][2]['user_id'], 4)
- assert_equal(es[4][2]['user_id'], 5)
+ assert_equal(es[0][1]['user_id'], 1)
+ assert_equal(es[1][1]['user_id'], 2)
+ assert_equal(es[2][1]['user_id'], 3)
+ assert_equal(es[3][1]['user_id'], 4)
+ assert_equal(es[4][1]['user_id'], 5)
end
end
@@ -106,7 +118,9 @@ def filter(config, msgs)
test 'require yaml' do
config = %[
requires yaml
- filter1 record.to_yaml; [time, record]
+
+ filter "record.to_yaml; [time, record]"
+
]
assert_nothing_raised {
create_driver(config)
@@ -115,14 +129,28 @@ def filter(config, msgs)
}
end
+ test 'require libraries with whitespace' do
+ d = create_driver(%[
+ requires yaml, time
+
+ filter "record.to_yaml; [Time.now.to_i, record]"
+
+ ])
+ assert_nothing_raised {
+ d.run(default_tag: 'test') { d.feed({'key' => 'value'}) }
+ }
+ end
+
test 'require error' do
config = %[
requires hoge
- filter1 record.to_yaml; [time, record]
+
+ filter "record.to_yaml; [time, record]"
+
]
assert_raise(Fluent::ConfigError) do
create_driver(config)
end
end
end
-end
\ No newline at end of file
+end
diff --git a/test/plugin/test_out_eval_filter.rb b/test/plugin/test_out_eval_filter.rb
index 9e6f751..6422f72 100644
--- a/test/plugin/test_out_eval_filter.rb
+++ b/test/plugin/test_out_eval_filter.rb
@@ -1,4 +1,5 @@
require 'helper'
+require 'fluent/test/driver/output'
class EvalFilterOutputTest < Test::Unit::TestCase
@@ -6,8 +7,8 @@ def setup
Fluent::Test.setup
end
- def create_driver(conf, tag = 'test')
- Fluent::Test::OutputTestDriver.new(Fluent::EvalFilterOutput, tag).configure(conf)
+ def create_driver(conf)
+ Fluent::Test::Driver::Output.new(Fluent::Plugin::EvalFilterOutput).configure(conf)
end
def test_configure
@@ -15,66 +16,78 @@ def test_configure
create_driver('')
end
assert_raise(Fluent::ConfigError) do
- create_driver(%[config1 @test = "\#{self.to_s}"])
+ create_driver(%[
+
+ config @test = "\#{self.to_s}"
+
+ ])
end
- assert_raise(Fluent::ConfigError) do
- create_driver(%[filter1 "\#{tag}"])
+ assert_raise(NameError) do
+ create_driver(%[
+
+ filter "\#{tag}"
+
+ ])
end
end
def test_remove_tag_prefix
d = create_driver(%[
remove_tag_prefix t1
- filter1 tag
- ], 't1.t2.t3')
+
+ filter tag
+
+ ])
- d.run { d.emit({}) }
+ d.run(default_tag: 't1.t2.t3') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal 't2.t3', emits[0][0]
end
def test_remove_tag_suffix
d = create_driver(%[
remove_tag_suffix t3
- filter1 tag
- ], 't1.t2.t3')
+
+ filter tag
+
+ ])
- d.run { d.emit({}) }
+ d.run(default_tag: 't1.t2.t3') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal 't1.t2', emits[0][0]
end
def test_add_tag_prefix
d = create_driver(%[
add_tag_prefix t0
- filter1 tag
- ], 't1.t2.t3')
+
+ filter tag
+
+ ])
- d.run { d.emit({}) }
+ d.run(default_tag: 't1.t2.t3') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal 't0.t1.t2.t3', emits[0][0]
end
def test_add_tag_suffix
d = create_driver(%[
add_tag_suffix t4
- filter1 tag
- ], 't1.t2.t3')
+
+ filter tag
+
+ ])
- d.run { d.emit({}) }
+ d.run(default_tag: 't1.t2.t3') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal 't1.t2.t3.t4', emits[0][0]
end
@@ -84,39 +97,42 @@ def test_handle_tag_all
remove_tag_suffix t3
add_tag_prefix t4
add_tag_suffix t5
- filter1 tag
- ], 't1.t2.t3')
+
+ filter tag
+
+ ])
- d.run { d.emit({}) }
+ d.run(default_tag: 't1.t2.t3') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal 't4.t2.t5', emits[0][0]
end
def test_drop_all_filter
d = create_driver(%[
- filter1 nil
+
+ filter nil
+
])
- d.run { d.emit({}) }
+ d.run(default_tag: 't1.t2.t3') { d.feed({}) }
- emits = d.emits
- p emits
+ emits = d.events
assert_equal 0, emits.size
end
def test_modify_record_filter
d = create_driver(%[
- filter1 record.merge!({'key' => 'value'})
+
+ filter "record.merge!({'key' => 'value'})"
+
])
- d.run { d.emit({}) }
+ d.run(default_tag: 'test') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal 'test', emits[0][0]
assert_equal 1, emits[0][2].size
assert_equal true, emits[0][2].key?('key')
@@ -125,15 +141,18 @@ def test_modify_record_filter
def test_replace_all_filter
d = create_driver(%[
- filter1 nil
- filter2 ['tag', 0, {'key' => 'value'}]
+
+ filter nil
+
+
+ filter "['tag', 0, {'key' => 'value'}]"
+
])
- d.run { d.emit({}) }
+ d.run(default_tag: 'test') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal 'tag', emits[0][0]
assert_equal 0, emits[0][1]
assert_equal 1, emits[0][2].size
@@ -143,24 +162,26 @@ def test_replace_all_filter
def test_conditional_filter
d = create_driver(%[
- filter1 [['http', tag].join('.'), record] if /^http:/.match(record['url'])
- filter2 (record['secure'] = true; [['https', tag].join('.'), record]) if /^https:/.match(record['url'])
+
+ filter "[['http', tag].join('.'), record] if /^http:/.match(record['url'])"
+
+
+ filter "(record['secure'] = true; [['https', tag].join('.'), record]) if /^https:/.match(record['url'])"
+
])
- d.run do
- d.emit({'url' => 'http://example.com/'})
- d.emit({'url' => 'https://example.com/'})
- d.emit({'url' => 'ftp://example.com/'})
+ d.run(default_tag: 'test') do
+ d.feed({'url' => 'http://example.com/'})
+ d.feed({'url' => 'https://example.com/'})
+ d.feed({'url' => 'ftp://example.com/'})
end
- emits = d.emits
+ emits = d.events
assert_equal 2, emits.size
- p emits[0]
assert_equal 'http.test', emits[0][0]
assert_equal 1, emits[0][2].size
assert_equal true, emits[0][2].key?('url')
assert_equal 'http://example.com/', emits[0][2]['url']
- p emits[1]
assert_equal 'https.test', emits[1][0]
assert_equal 2, emits[1][2].size
assert_equal true, emits[1][2].key?('url')
@@ -172,74 +193,74 @@ def test_conditional_filter
def test_reference_to_an_instance_variable_filter
hostname = `hostname -s`.chomp
d = create_driver(%[
- config1 @hostname = `hostname -s`.chomp
- filter1 [tag, @hostname].join('.')
+
+ config @hostname = `hostname -s`.chomp
+
+
+ filter "[tag, @hostname].join('.')"
+
])
- d.run { d.emit({}) }
+ d.run(default_tag: 'test') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 1, emits.size
- p emits[0]
assert_equal "test.#{hostname}", emits[0][0]
end
def test_amplify_tag_filter
d = create_driver(%[
- filter1 (1..3).map { |n| tag + n.to_s }.to_enum
+
+ filter "(1..3).map { |n| tag + n.to_s }.to_enum"
+
])
- d.run { d.emit({}) }
+ d.run(default_tag: 'test') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 3, emits.size
- p emits[0]
assert_equal "test1", emits[0][0]
- p emits[1]
assert_equal "test2", emits[1][0]
- p emits[2]
assert_equal "test3", emits[2][0]
end
def test_amplify_time_filter
d = create_driver(%[
- filter1 (1..3).map { |n| time + n }.to_enum
+
+ filter "(1..3).map { |n| time + n }.to_enum"
+
])
- d.run { d.emit({}) }
+ d.run(default_tag: 'test') { d.feed({}) }
- emits = d.emits
+ emits = d.events
assert_equal 3, emits.size
- p emits[0]
assert emits[0][1] > 0
- p emits[1]
assert_equal emits[0][1] + 1, emits[1][1]
- p emits[2]
assert_equal emits[1][1] + 1, emits[2][1]
end
def test_amplify_record_filter
d = create_driver(%[
- filter1 (1..3).map { |n| record.merge({'n' => n}) }.to_enum
+
+ filter "(1..3).map { |n| record.merge({'n' => n}) }.to_enum"
+
])
- d.run { d.emit({'key' => 'value'}) }
+ d.run(default_tag: 'test') { d.feed({'key' => 'value'}) }
- emits = d.emits
+ emits = d.events
assert_equal 3, emits.size
- p emits[0]
assert_equal 2, emits[0][2].size
assert_equal true, emits[0][2].key?('key')
assert_equal 'value', emits[0][2]['key']
assert_equal true, emits[0][2].key?('n')
assert_equal 1, emits[0][2]['n']
- p emits[1]
assert_equal 2, emits[1][2].size
assert_equal true, emits[1][2].key?('key')
assert_equal 'value', emits[1][2]['key']
assert_equal true, emits[1][2].key?('n')
assert_equal 2, emits[1][2]['n']
- p emits[2]
assert_equal 2, emits[2][2].size
assert_equal true, emits[2][2].key?('key')
assert_equal 'value', emits[2][2]['key']
@@ -249,24 +270,23 @@ def test_amplify_record_filter
def test_split_record_filter
d = create_driver(%[
- filter1 record.map { |key, value| [[tag, key].join('.'), {'key' => value}] }.to_enum
+
+ filter "record.map { |key, value| [[tag, key].join('.'), {'key' => value}] }.to_enum"
+
])
- d.run { d.emit({'key1' => 'value1', 'key2' => 'value2', 'key3' => 'value3'}) }
+ d.run(default_tag: 'test') { d.feed({'key1' => 'value1', 'key2' => 'value2', 'key3' => 'value3'}) }
- emits = d.emits
+ emits = d.events
assert_equal 3, emits.size
- p emits[0]
assert_equal 'test.key1', emits[0][0]
assert_equal 1, emits[0][2].size
assert_equal true, emits[0][2].key?('key')
assert_equal 'value1', emits[0][2]['key']
- p emits[1]
assert_equal 'test.key2', emits[1][0]
assert_equal 1, emits[1][2].size
assert_equal true, emits[1][2].key?('key')
assert_equal 'value2', emits[1][2]['key']
- p emits[2]
assert_equal 'test.key3', emits[2][0]
assert_equal 1, emits[2][2].size
assert_equal true, emits[2][2].key?('key')
@@ -275,22 +295,21 @@ def test_split_record_filter
def test_split_array_in_record_filter
d = create_driver(%[
- filter1 record['array'].map { |v| {'value' => v} }.to_enum
+
+ filter "record['array'].map { |v| {'value' => v} }.to_enum"
+
])
- d.run { d.emit({'array' => ['test1', 'test2', 'test3']}) }
+ d.run(default_tag: 'test') { d.feed({'array' => ['test1', 'test2', 'test3']}) }
- emits = d.emits
+ emits = d.events
assert_equal 3, emits.size
- p emits[0]
assert_equal 1, emits[0][2].size
assert_equal true, emits[0][2].key?('value')
assert_equal 'test1', emits[0][2]['value']
- p emits[1]
assert_equal 1, emits[1][2].size
assert_equal true, emits[1][2].key?('value')
assert_equal 'test2', emits[1][2]['value']
- p emits[2]
assert_equal 1, emits[2][2].size
assert_equal true, emits[2][2].key?('value')
assert_equal 'test3', emits[2][2]['value']
@@ -299,10 +318,24 @@ def test_split_array_in_record_filter
def test_require_libraries
d = create_driver(%[
requires yaml
- filter1 record.to_yaml; ['tag', 0, record]
+
+ filter "record.to_yaml; ['tag', 0, record]"
+
+ ])
+ assert_nothing_raised {
+ d.run(default_tag: 'test') { d.feed({'key' => 'value'}) }
+ }
+ end
+
+ def test_require_libraries_with_whitespace
+ d = create_driver(%[
+ requires yaml, time
+
+ filter "record.to_yaml; ['tag', Time.now.to_i, record]"
+
])
assert_nothing_raised {
- d.run { d.emit({'key' => 'value'}) }
+ d.run(default_tag: 'test') { d.feed({'key' => 'value'}) }
}
end
@@ -310,7 +343,9 @@ def test_require_error
assert_raise(Fluent::ConfigError) do
d = create_driver(%[
requires hoge
- filter1 record.to_yaml; ['tag', 0, record]
+
+ filter "record.to_yaml; ['tag', 0, record]"
+
])
end
end