fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0
12.92k stars 1.34k forks source link

filter_parser: Transition from single record to stream #4620

Open Athishpranav2003 opened 2 months ago

Athishpranav2003 commented 2 months ago

Which issue(s) this PR fixes: Fixes #4100

What this PR does / why we need it: Transition filter_parser from using filter_with_time to filter_stream logic Docs Changes: N/A Release Note: N/A

Athishpranav2003 commented 2 months ago

@daipom i have raised this as a draft. This needs other changes like changing the tests around this, exception handling and etc.

For now lets have discussion on the base logic and in meantime i will add them once we fix it

daipom commented 2 months ago

Thanks! I will see this this weekend!

Athishpranav2003 commented 2 months ago

@daipom thank you. The tests needs to be revamped for filter parser since the tests involve few instance variables. Do check it in freetime and let me know

Athishpranav2003 commented 2 months ago

@daipom any opinion on this logic? Should we go ahead with this?

daipom commented 2 months ago

@Athishpranav2003 Sorry for my late response. I got COVID-19 last weekend and only got a little work done this week :cry: I will see this now, please wait a few more days :cry:

daipom commented 2 months ago

It appears that adding some inner methods would improve code readability. For example ...

I guess we need to think a little better about handling error events...

Athishpranav2003 commented 2 months ago

@daipom yah I wanted to make sure we are in same page. I will refactor the code and change the error part after that

daipom commented 2 months ago

@daipom yah I wanted to make sure we are in same page. I will refactor the code and change the error part after that

Thanks! This would be the right direction!

Athishpranav2003 commented 2 months ago

@daipom you can check this now

Athishpranav2003 commented 2 months ago

@daipom I guess the CI test fail is not related to this change

daipom commented 2 months ago

Thanks!! I will see this!

I guess the CI test fail is not related to this change

Yes, They are not related.

Athishpranav2003 commented 2 months ago

@daipom https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/test/plugin/test_filter_parser.rb#L659-L666 could you please point which emit_event is this call part of

Athishpranav2003 commented 2 months ago

After doing some verification https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/lib/fluent/plugin/filter.rb#L82-L104

The emit error event is in line 90/99. This implies that earlier we raise the exception in filter one record and capture in the filter stream. Now since we have implemented stream directly so the raise not needed

Failure: test_call_emit_error_event_when_parser_error(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"data"=>"{\"time\":[], \"f1\":\"v1\"}"}, #<Fluent::Plugin::Parser::ParserError: value must be a string or a number: [](Array)>)
  Defined expectations:
    should_receive(:emit_error_event).with("String", Integer, Hash, Fluent::Plugin::Parser::ParserError).once.
  <false> is not true.
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/test_unit_integration.rb:57:in `make_assertion'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/test_unit_integration.rb:65:in `check'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core_class_methods.rb:104:in `check'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/expectation_director.rb:41:in `call'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:150:in `block in method_missing'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:289:in `flexmock_wrap'
/home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/core.rb:146:in `method_missing'
(eval at /home/aggressive_racer1/projects/fluentd/vendor/bundle/ruby/3.3.0/gems/flexmock-2.3.8/lib/flexmock/partial_mock.rb:429):3:in `emit_error_event'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:90:in `rescue in block in filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:86:in `block in filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/event.rb:110:in `each'
/home/aggressive_racer1/projects/fluentd/lib/fluent/plugin/filter.rb:85:in `filter_stream'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/filter.rb:49:in `block (2 levels) in instance_hook_after_started'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:43:in `feed_to_plugin'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:87:in `feed'
/home/aggressive_racer1/projects/fluentd/test/plugin/test_filter_parser.rb:664:in `block in test_call_emit_error_event_when_parser_error'
     661:     flexmock(d.instance.router).should_receive(:emit_error_event).
     662:       with("String", Integer, Hash, ParserError).once
     663:     d.run do
  => 664:       d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
     665:     end
     666:   end
     667: 
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:204:in `block in run_actual'
/usr/share/ruby/timeout.rb:186:in `block in timeout'
/usr/share/ruby/timeout.rb:41:in `handle_timeout'
/usr/share/ruby/timeout.rb:195:in `timeout'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:203:in `run_actual'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base.rb:96:in `run'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/base_owner.rb:130:in `run'
/home/aggressive_racer1/projects/fluentd/lib/fluent/test/driver/event_feeder.rb:39:in `run'
/home/aggressive_racer1/projects/fluentd/test/plugin/test_filter_parser.rb:663:in `test_call_emit_error_event_when_parser_error'

I used the following jugad way to find the function call

daipom commented 2 months ago

Now since we have implemented stream directly so the raise not needed

Yes, we don't need raise in the new implementation. However, router.emit_error_event should be called as before.

daipom commented 2 months ago

https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/test/plugin/test_filter_parser.rb#L659-L666

could you please point which emit_event is this call part of

In the current implementation:

https://github.com/fluent/fluentd/blob/78a7972bfe6b421f08472701f04f00515ed24bee/lib/fluent/plugin/filter_parser.rb#L114-L119

In the new implementation:

https://github.com/fluent/fluentd/blob/9ec5a95c964cb2eb6340b7538e25c8547d4bbaec/lib/fluent/plugin/filter_parser.rb#L105-L108

This change is OK because emit_error_event is called similarly after all.

Athishpranav2003 commented 2 months ago

@daipom i am not sure what is this issue

Failure: test_filter_key_not_exist(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"foo"=>"bar"}, #<ArgumentError: data does not exist>)
  Defined expectations:
    should_receive(:emit_error_event).with(String, Integer, Hash, #<ArgumentError: data does not exist>).once.
  <false> is not true.

For now i have pushed the alternative code. Now the functionality looks visibly same as before. Could you please let me know which part is still ambigious and needs tests?

daipom commented 2 months ago

@daipom i am not sure what is this issue

Failure: test_filter_key_not_exist(ParserFilterTest):
  in mock 'flexmock(Fluent::Test::Driver::TestEventRouter)': no matching handler found for emit_error_event("test", 1272942121, {"foo"=>"bar"}, #<ArgumentError: data does not exist>)
  Defined expectations:
    should_receive(:emit_error_event).with(String, Integer, Hash, #<ArgumentError: data does not exist>).once.
  <false> is not true.

I see. This test code specifies the content of expected arguments too strictly. It appears that this requires all values of the instance match, but the stacktrace data can be different. We should soften the conditions.

diff --git a/test/plugin/test_filter_parser.rb b/test/plugin/test_filter_parser.rb
index ec36c836..4201cec8 100644
--- a/test/plugin/test_filter_parser.rb
+++ b/test/plugin/test_filter_parser.rb
@@ -650,7 +650,7 @@ class ParserFilterTest < Test::Unit::TestCase
   def test_filter_key_not_exist
     d = create_driver(CONFIG_NOT_IGNORE)
     flexmock(d.instance.router).should_receive(:emit_error_event).
-      with(String, Integer, Hash, ArgumentError.new("data does not exist")).once
+      with(String, Integer, Hash, ArgumentError).once
     assert_nothing_raised {
       d.run do
         d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'})
Athishpranav2003 commented 2 months ago

Got it @daipom Will make the changes in sometime

daipom commented 2 months ago

Thanks!

Athishpranav2003 commented 2 months ago

@daipom i have addressed the comments. Additionally i felt there was a redundant rescue in the prev code so i have removed that as well. Is it fine now?

daipom commented 2 months ago

I made another PR to add some test cases for abnormal cases.

Let's rebase after it is merged.

I confirmed the test results on my local for this PR. Basically, there is no problem, but a few tests fail. It is because this PR removes the following rescue, and it changes the error message a little bit.

https://github.com/fluent/fluentd/blob/51b860b1be2eb59076d706fda12d55657a614c8f/lib/fluent/plugin/filter_parser.rb#L114-L119

With this, I believe we should stop removing this rescue, even if this is redundant. It is preferable to keep the error message as unchanged as possible because it could cause a disadvantage to existing users.

The result of #4638 tests applied to this PR:

Failure: test_parser_error[invalid format with default](ParserFilterTest::abnormal cases)
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:546:in `run_and_assert'
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:691:in `test_parser_error'
     688:         </parse>
     689:       EOC
     690: 
  => 691:       run_and_assert(driver, **data.except(:additional_config))
     692:     end
     693: 
     694:     data(
<[[],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "value must be a string or a number: [](Array)"]]]> expected but was
<[[],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "parse failed value must be a string or a number: [](Array)"]]]>

diff:
  [[],
   [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
   [[Fluent::Plugin::Parser::ParserError,
?    "parse failed value must be a string or a number: [](Array)"]]]
======================================================================================================
F
======================================================================================================
Failure: test_parser_error[mixed valid and invalid with default](ParserFilterTest::abnormal cases)
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:546:in `run_and_assert'
/home/daipom/work/fluentd/fluentd/test/plugin/test_filter_parser.rb:691:in `test_parser_error'
     688:         </parse>
     689:       EOC
     690: 
  => 691:       run_and_assert(driver, **data.except(:additional_config))
     692:     end
     693: 
     694:     data(
<[[{"f1"=>"v1"}],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "value must be a string or a number: [](Array)"]]]> expected but was
<[[{"f1"=>"v1"}],
 [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
 [[Fluent::Plugin::Parser::ParserError,
   "parse failed value must be a string or a number: [](Array)"]]]>

diff:
  [[{"f1"=>"v1"}],
   [{"data"=>"{\"time\":[], \"f1\":\"v1\"}"}],
   [[Fluent::Plugin::Parser::ParserError,
?    "parse failed value must be a string or a number: [](Array)"]]]
Athishpranav2003 commented 2 months ago

@daipom thanks for this help. I had difficulty in figuring out the edgecases. Thanks for doing it along with this to make it move ahead

Athishpranav2003 commented 2 months ago

@daipom i have reverted the change with redundant rescue. Can you point out the change in the error messages if its still present in this pr?

daipom commented 2 months ago

@ashie @kenhys Which version should we release this on?

This solves the limitation of #4474, which is released on v1.17.0. It's ambiguous whether it's a bug fix or an enhancement.

I think it is possible to release this on v1.17. On the other hand, since this is a large fix for filter_parser, 1.18 may be safer, given the risk of regression.