oleewere / fluent-plugin-azurestorage-gen2

Fluentd output plugin for Azure Datalake Storage Gen2 (append support)
MIT License

need help to Store fluentd logs in azure datalake as parquet.gz format #19

Open wasifshareef opened 4 months ago

wasifshareef commented 4 months ago

Hi,

I would appreciate it if you could help me confirm whether fluent-plugin-azurestorage-gen2 supports shipping fluentd logs to Azure Data Lake in parquet.gz format.

I tried the store configuration below, but it does not store files as parquet.gz. I believe the parquet.gz format is supported by the fluent-plugin-s3 plugin.

    <match **>
      @type azurestorage_gen2
      time_as_integer true
      ##################################################################
      azure_oauth_tenant_id xxxxxxxx
      azure_oauth_app_id xxxxxxxxx
      azure_oauth_secret xxxxxxxxx
      azure_oauth_identity_authority https://login.microsoftonline.com
      azure_storage_account xxxxx
      azure_container xxxxx
      auto_create_container true
      # store_as gzip
      store_as parquet
      compress parquet
      parquet_compression_codec gzip
      format single_value
      azure_object_key_format %{path}%{time_slice}/%{uuid_flush}.%{file_extension}
      path xxxxxx
      ##################################################################
      check_object false
      <format>
        @type json
      </format>
      <buffer time>
        @type file
        path /var/log/td-agent/azure-bro
        timekey 2m
        timekey_wait 1m
        timekey_use_utc true
        chunk_limit_size 10m
        flush_thread_count 8
        flush_interval 10s
        queue_limit_length 32
        flush_mode interval
        retry_max_interval 30
        retry_forever true
        total_limit_size 15GB
      </buffer>
      time_slice_format %Y/%m/%d/%H
    </match>
wasifshareef commented 4 months ago

@oleewere, I would appreciate it if you could provide some help here. Thanks!

oleewere commented 4 months ago

@wasifshareef - s3 has that implementation; this plugin does not yet, but I will try to add that feature on the weekend

wasifshareef commented 4 months ago

@oleewere , Thanks very much. I appreciate your help and support here

oleewere commented 4 months ago

you can check this change: https://github.com/oleewere/fluent-plugin-azurestorage-gen2/pull/20 - maybe test the file against your setup if you like

wasifshareef commented 4 months ago

Hi @oleewere, I was able to place the azurestorage_gen2_compressor_parquet.rb file you shared in the location below. Can you please share a sample conf file so I can see how you set the store_as value there?

    root@3982bba4b341:/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-azurestorage-gen2-0.3.5/lib/fluent/plugin# ls -lrt
    total 52
    -rw-r--r-- 1 root root 33258 Jun 10 18:09 out_azurestorage_gen2.rb
    -rw-r--r-- 1 root root   792 Jun 10 18:09 azurestorage_gen2_compressor_lzo.rb
    -rw-r--r-- 1 root root   796 Jun 10 18:09 azurestorage_gen2_compressor_lzma2.rb
    -rw-r--r-- 1 root root  1280 Jun 10 18:09 azurestorage_gen2_compressor_gzip_command.rb
    -rw-rw-r-- 1 1000 1000  2932 Jun 23 05:33 azurestorage_gen2_compressor_parquet.rb
    root@3982bba4b341:/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-azurestorage-gen2-0.3.5/lib/fluent/plugin#

wasifshareef commented 4 months ago

Does this look good to you? Do we need to define the schema type and schema file as well? I tried with these conf parameters, but unfortunately it still saves files in Azure Data Lake as .txt. I would appreciate your input here. Thanks!

     store_as                         parquet
     <compress>
             parquet_compression_codec gzip
             record_type json
           #   schema_type avro
           # schema_file /path/to/log.avsc
     </compress>
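
For reference, the parquet conversion presumably needs a schema to map records to columns, so a complete `<compress>` section would likely have the schema lines uncommented. A hedged sketch, modeled on the fluent-plugin-s3 parquet compressor conventions (the schema path is a placeholder, and `record_type jsonl` rather than `json` is an assumption based on the record types columnify accepts):

```
store_as parquet
<compress>
  # codec applied to the parquet column chunks
  parquet_compression_codec gzip
  # input record encoding handed to columnify (assumed: jsonl)
  record_type jsonl
  # schema that defines the parquet columns (path is a placeholder)
  schema_type avro
  schema_file /etc/td-agent/schema/log.avsc
</compress>
```

Without a schema file, a schema-driven converter such as columnify has nothing to build the column layout from, which would be consistent with the output falling back to plain text.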
oleewere commented 4 months ago

did you see any logs like "Use 'text' instead"? (Only the text compressor should use that extension.) I won't have much free time on the weekend, so I will probably validate this properly next week.

wasifshareef commented 4 months ago

I have tried, but it is still sending logs to Azure as normal txt files. Sure, it would be really helpful if you can test and update here. I appreciate your support. Thanks.

wasifshareef commented 4 months ago

Hi @oleewere, I would appreciate your help in fixing the parquet format store issue.

oleewere commented 4 months ago

hi @wasifshareef - in the next few days i will have more time so i think i can take a look

wasifshareef commented 4 months ago

@oleewere , Thanks very much, I appreciate your support here !

oleewere commented 4 months ago

@wasifshareef - moved the changes to out_azurestorage_gen2.rb on the PR, can you check that implementation again? Also note it requires columnify to be installed (based on the s3 implementation)
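
columnify is a Go CLI (reproio/columnify), so one way to install it is via the Go toolchain. A sketch, assuming Go is available on the host running td-agent (the `@latest` tag and PATH handling are assumptions, not from the thread):

```sh
# install the columnify CLI that the parquet compressor shells out to
go install github.com/reproio/columnify/cmd/columnify@latest

# make sure the binary is visible to the td-agent process
export PATH=$PATH:$(go env GOPATH)/bin
columnify -h
```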

wasifshareef commented 4 months ago

@oleewere, can you please confirm the syntax for enabling the parquet format in the fluentd.conf file? I tried to use the example fluentd.conf file you made available:

    <source>
      @type dummy
      dummy {"hello":"world"}
      tag mytag
    </source>

    <match **>
      @type azurestorage_gen2
      azure_storage_account mystorageabfs
      azure_container mycontainer
      azure_instance_msi /subscriptions/mysubscriptionid
      azure_client_id
      azure_object_key_format %{path}-%{index}.%{file_extension}
      azure_oauth_refresh_interval 3600
      time_slice_format %Y%m%d-%H
      file_extension log # only used with store_as none
      path "/cluster-logs/myfolder/${tag[1]}-#{Socket.gethostname}-%M"
      auto_create_container true
      store_as parquet
      format single_value
      local_testing true
      local_testing_folder /fluentd/test
      <buffer tag,time>
        @type file
        path /var/log/fluent/azurestorage-buffer
        timekey 1m
        timekey_wait 0s
        timekey_use_utc true
        chunk_limit_size 64m
      </buffer>
    </match>

However I am getting this error:

    /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-azurestorage-gen2-0.3.5/lib/fluent/plugin/out_azurestorage_gen2.rb:801:in `configure': undefined method `parquet_compression_codec' for nil:NilClass (NoMethodError)

Can you please help here?

Thanks.

oleewere commented 4 months ago

@wasifshareef - updated the PR to provide an example with simple json (similar configs as for the s3 plugin)

wasifshareef commented 4 months ago

Hi @oleewere, thanks for providing an example. I am getting the error below when I use the example you mentioned:

    2024-07-07 15:29:50 +0000 [warn]: #0 bad chunk is moved to /tmp/fluent/backup/worker0/object_8ac/61ca9f6fde3786d894f458f6fe3b0b49.log
    2024-07-07 15:29:55 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=Fluent::UnrecoverableError error="failed to execute columnify command. stdout= stderr=2024/07/07 15:29:55 Failed to write: reflect: call of reflect.Value.Type on zero Value\n status=#<Process::Status: pid 22805 exit 1>"
    2024-07-07 15:29:55 +0000 [warn]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-azurestorage-gen2-0.3.5/lib/fluent/plugin/out_azurestorage_gen2.rb:834:in `compress'
    2024-07-07 15:29:55 +0000 [warn]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-azurestorage-gen2-0.3.5/lib/fluent/plugin/out_azurestorage_gen2.rb:165:in `write'
    2024-07-07 15:29:55 +0000 [warn]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluentd-1.16.3/lib/fluent/plugin/output.rb:1225:in `try_flush'
    2024-07-07 15:29:55 +0000 [warn]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluentd-1.16.3/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
    2024-07-07 15:29:55 +0000 [warn]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluentd-1.16.3/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
    2024-07-07 15:29:55 +0000 [warn]: #0 /opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-azurestorage-gen2-0.3.5/../fluentd-1.16.3/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
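
Since the `reflect: call of reflect.Value.Type on zero Value` message comes from columnify itself, it can presumably be reproduced outside Fluentd by running columnify by hand on a sample chunk. A hedged sketch (flag names as documented in the columnify README; the schema and data files are placeholders):

```sh
# hypothetical one-record JSONL input matching a placeholder Avro schema
echo '{"hello":"world"}' > records.jsonl

# convert to parquet the same way the plugin shells out to columnify;
# errors here would surface the same "Failed to write" message
columnify -schemaType avro -schemaFile log.avsc \
          -recordType jsonl records.jsonl > records.parquet
```

If the manual run fails too, the schema/record mismatch is in the input rather than in the plugin.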

oleewere commented 4 months ago

@wasifshareef with the sample (+ installed columnify) it's looking good for me:

    2024-07-07 15:57:04 +0000 [info]: #0 starting fluentd worker pid=36 ppid=8 worker=0
    2024-07-07 15:57:04 +0000 [info]: #0 fluentd worker is now running worker=0
    2024-07-07 15:57:11 +0000 [info]: #0 Compressed data written to local file: /fluentd/test/fluentd_output_1720367831.parquet
    2024-07-07 15:57:21 +0000 [info]: #0 Compressed data written to local file: /fluentd/test/fluentd_output_1720367841.parquet

Wondering if any version is different on your env that could matter(?). The error seems to come from columnify when it is called on the local tmp file.