Open Athishpranav2003 opened 1 month ago
@daipom @ashie need some comments on this since its a very new compression method. I tried to make sure the existing support is not broken and added this feature additionally. Need to perform some more work but wanted to get some comments from you guys
Thanks! I will see this this weekend!
@daipom did u get chance to look at this?
Sorry, I haven't made time for this. :cry: I'll make time for it this week.
Its fine i guess, Even i was very busy in last 2 weeks :+1:
@Athishpranav2003 I'm sorry for my late response. :cry:
I have confirmed the entire direction! It's great! Thanks for starting this enhancement!
Sorry I haven't made time to see the detailed implementation, such as Compressable
module,
but the overall design of Fluentd would be essential for us now.
So, now, I comment on the overall direction of further modification.
It looks basically good as in_forward
support!
All that is left is to support Output/Buffer/Chunk logic for output plugins. (Currently, these logic assumes gzip
only)
To do so, it would be a good idea to support out_forward
first.
The core logic would be the following.
EventStream#to_compressed_msgpack_stream
Output#generate_format_proc
Chunk::Decompressable
It is complicated, but the compression of Buffer and Chunk and the flush process of output plugins are closely related. The following existing implementations may be helpful for how output plugins behaves:
@daipom thanks for the review.
Yah, this will kick a lot more changes in the overall events pipeline.
For now i have just added the compression support as a module in Compressible
. And used this in in-forward
. I will try to look at Buffer as well in meantime(might need more time since i got busy with a project). I guess maybe splitting it into multiple PRs will be easy since i can finish small chunks one by one(seperate functionality) and easy to review
@daipom can you please check the change in the Compression module as well. Once if we merge this we can proceed with the other development around this
@Athishpranav2003
can you please check the change in the Compression module as well
Sure! I will review the Compressable
module!
Once if we merge this we can proceed with the other development around this
To judge if we can merge this only with the support of the module and in_forward
, we need to reach an agreement on updating Forward Protocol Specification.
It would be necessary to update CompressedPackedForward Mode
.
By allowing zstd
as the value for the key compressed
, it would be possible to add Zstd support while keeping compatibility, but I'm not sure yet.
Sure @daipom
If it's needed then I can split the PR into 2 where this will only focus on zstd
and other one for in-forward
@daipom how to proceed in this PR?
Sorry, please give me a few more days 😢
bundle exec rake test TEST=test/plugin/test_compressable.rb 2856ms  Sun 03 Nov 2024 11:29:54 AM IST
/usr/bin/ruby -w -I"lib:test" -Eascii-8bit:ascii-8bit /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb "test/plugin/test_compressable.rb"
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/rubygems_ext.rb:250: warning: method redefined; discarding old encode_with
/usr/local/share/ruby/site_ruby/rubygems/dependency.rb:341: warning: previous definition of encode_with was here
/usr/share/ruby/bundled_gems.rb:75:in `require': libruby.so.3.2: cannot open shared object file: No such file or directory - /usr/lib64/gems/ruby/yajl-ruby-1.4.3/yajl/yajl.so (LoadError)
from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
from /usr/share/gems/gems/yajl-ruby-1.4.3/lib/yajl.rb:1:in `<top (required)>'
from /usr/share/ruby/bundled_gems.rb:75:in `require'
from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
from /home/aggressive_racer1/projects/fluentd/lib/fluent/config/literal_parser.rb:20:in `<top (required)>'
from /usr/share/ruby/bundled_gems.rb:75:in `require'
from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
from /home/aggressive_racer1/projects/fluentd/lib/fluent/config/element.rb:18:in `<top (required)>'
from /usr/share/ruby/bundled_gems.rb:75:in `require'
from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
from /home/aggressive_racer1/projects/fluentd/test/helper.rb:42:in `<top (required)>'
from /home/aggressive_racer1/projects/fluentd/test/plugin/test_compressable.rb:1:in `require_relative'
from /home/aggressive_racer1/projects/fluentd/test/plugin/test_compressable.rb:1:in `<top (required)>'
from /usr/share/ruby/bundled_gems.rb:75:in `require'
from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
from /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb:21:in `block in <main>'
from /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb:6:in `select'
from /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb:6:in `<main>'
rake aborted!
Command failed with status (1): [ruby -w -I"lib:test" -Eascii-8bit:ascii-8bit /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb "test/plugin/test_compressable.rb" ]
/usr/share/gems/gems/rake-13.2.1/exe/rake:27:in `<top (required)>'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli/exec.rb:58:in `load'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli/exec.rb:58:in `kernel_load'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli/exec.rb:23:in `run'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli.rb:455:in `exec'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor/command.rb:28:in `run'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor/invocation.rb:127:in `invoke_command'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor.rb:527:in `dispatch'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli.rb:35:in `dispatch'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor/base.rb:584:in `start'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli.rb:29:in `start'
/usr/share/gems/gems/bundler-2.5.16/exe/bundle:28:in `block in <top (required)>'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/friendly_errors.rb:117:in `with_friendly_errors'
/usr/share/gems/gems/bundler-2.5.16/exe/bundle:20:in `<top (required)>'
/usr/bin/bundle:25:in `load'
/usr/bin/bundle:25:in `<main>'
Tasks: TOP => test => base_test
(See full trace by running task with --trace)
@daipom i addressed your comments but couldnt test it locally due to some issue.
yajl-ruby
is already installed in my system but some other issue
I guess there is a small careless Problem is local testing is not working at all. Could help me with that later
I will check after 18th Got packed with exams now :(
Hmm, these changes would be necessary, but looks like the error in your environment is caused by another reason.
https://github.com/fluent/fluentd/pull/4657#issuecomment-2453298305
/usr/share/ruby/bundled_gems.rb:75:in `require': libruby.so.3.2: cannot open shared object file: No such file or directory - /usr/lib64/gems/ruby/yajl-ruby-1.4.3/yajl/yajl.so (LoadError)
I will check after 18th Got packed with exams now :(
OK! I'm sorry my response was so slow, and it took so long. Good luck with the exams!
@daipom the gem issue got resolved after update,prolly some mismatched versions,
All tests are passing now
@daipom some doubts here
in chuck open method for decompress we dont have any type identification. how to identify the chunk type while open/read method is called https://github.com/fluent/fluentd/blob/78a7972bfe6b421f08472701f04f00515ed24bee/lib/fluent/plugin/buffer/chunk.rb#L201-L226
@daipom some doubts here
in chuck open method for decompress we dont have any type identification. how to identify the chunk type while open/read method is called
I see! The current implementation is based on the following assumption.
Decompressable
moduleIf we add a compression format, we need to change something here. I discussed the direction with @Watson1978 and @kenhys. We believe the following direction seems to be a good one.
Decompressable
module for each compression format
GzipDecompressable
(Rename the current Decompressable
)ZstdDecompressable
The current implementation of Decompressable
looks a little weird to me.
I don't see the point of specifying the format in argument compressed
I don't want to change the signature for compatibility, but I believe a parameter like raw
would be more correct...
Anyway, we only need to consider the possibility that gzip
is specified for compressed
for GzipDecompressable
, and zstd
for ZstdDecompressable
.
(If a different compression format is specified to compressed
, the chunk should ignore it and decompress itself.)
Oh ok Let me again go through and check it
But what is that parameter compress
referring to?
@daipom seems like buffer
is working :)
Will go ahead and do other stuff
@daipom Can you check this out_file
once. Also how to go about UTs for it. Which all UTs should we write for zstd.
@daipom can you check the PR now?
@daipom added UTs as well. PR seems to be complete now let me know if i missed anything
Which issue(s) this PR fixes: Fixes #4162
What this PR does / why we need it: Adds new compression method support to handle messages Docs Changes: TODO Release Note: N/A