Closed — cosmo0920 closed this 5 years ago
We can send events to different Kinesis streams with the following configuration:
```
<source>
  @type tail
  path "data.json"
  tag "in.json"
  read_from_head true
  <parse>
    @type "json"
  </parse>
</source>

<match in.json>
  @type kinesis_streams
  stream_name "${$.key.nested}"
  aws_key_id xxxxxx
  aws_sec_key xxxxxx
  <buffer $.key.nested>
    @type "memory"
  </buffer>
</match>
```
Sending data:

```
{"key":{"nested":"cosmo0920-fluentd-test1","message":"hoge"}}
{"key":{"nested":"cosmo0920-fluentd-test2","message":"huga"}}
{"key":{"nested":"cosmo0920-fluentd-test1","message":"hoo"}}
{"key":{"nested":"cosmo0920-fluentd-test2","message":"bar"}}
{"key":{"nested":"cosmo0920-fluentd-test2","message":"baz"}}
```
Result:

```
cosmo0920-fluentd-test1 #=> 2 events
cosmo0920-fluentd-test2 #=> 3 events
```
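The routing above can be sketched in plain Ruby (a hypothetical illustration, not the plugin's actual code): grouping records by the `$.key.nested` value mirrors how the chunk key in `<buffer $.key.nested>` splits events into per-stream chunks before each chunk is flushed to the stream named by `"${$.key.nested}"`.

```ruby
# Sketch (not the plugin's code): show how chunk key $.key.nested
# splits the sample events into per-stream groups.
require 'json'

data = <<~EVENTS
  {"key":{"nested":"cosmo0920-fluentd-test1","message":"hoge"}}
  {"key":{"nested":"cosmo0920-fluentd-test2","message":"huga"}}
  {"key":{"nested":"cosmo0920-fluentd-test1","message":"hoo"}}
  {"key":{"nested":"cosmo0920-fluentd-test2","message":"bar"}}
  {"key":{"nested":"cosmo0920-fluentd-test2","message":"baz"}}
EVENTS

records = data.each_line.map { |line| JSON.parse(line) }

# Group by the nested key, as the buffer does with chunk key $.key.nested;
# each group would be flushed to the stream named "${$.key.nested}".
by_stream = records.group_by { |r| r.dig('key', 'nested') }
by_stream.each do |stream_name, events|
  puts "#{stream_name} #=> #{events.size} events"
end
# Prints:
#   cosmo0920-fluentd-test1 #=> 2 events
#   cosmo0920-fluentd-test2 #=> 3 events
```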
I've confirmed that this PR works in a real AWS environment (ap-northeast-1) and added test cases for built-in placeholders.
@riywo @simukappu Could you kindly take a look?
Any update on this feature? 🤔
@mathetake Could you try this PR's patch in your AWS environment? I guess the plugin developers are waiting for user feedback.
Just built this and gave it a try. Got this...
```
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/seahorse/client/plugins/raise_response_errors.rb:15:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/jsonvalue_converter.rb:20:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/idempotency_token.rb:17:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/param_converter.rb:24:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/response_paging.rb:10:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/seahorse/client/plugins/response_target.rb:23:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/seahorse/client/request.rb:70:in `send_request'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-kinesis-1.19.0/lib/aws-sdk-kinesis/client.rb:1731:in `put_records'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/out_kinesis_streams.rb:49:in `block in write'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis_helper/api.rb:91:in `batch_request_with_retry'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis.rb:142:in `block (2 levels) in write_records_batch'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis_helper/api.rb:86:in `split_to_batches'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis.rb:140:in `block in write_records_batch'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis.rb:138:in `write_records_batch'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/out_kinesis_streams.rb:45:in `write'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/output.rb:1122:in `try_flush'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/output.rb:1428:in `flush_thread_run'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/output.rb:458:in `block (2 levels) in start'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2019-10-01 13:55:42 +0000 [warn]: #0 [out_kinesis_streams] failed to flush the buffer. retry_time=12 next_retry_seconds=2019-10-01 14:26:44 +0000 chunk="593d9a91629510cc781f68360c8c5119" error_class=Aws::Kinesis::Errors::ValidationException error="1 validation error detected: Value '${$.kubernetes.annotations.kinesisstream}' at 'streamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9.-]+"
```
@roscoecairney Could you share your configuration? I suspect the built-in placeholder is not working as expected.
Hi, sure thing. I'm tailing Docker logs, enriching them with the kubernetes metadata filter, filtering out the records that lack a given annotation, and then using that annotation as the Kinesis stream name.
```
<source>
  @type tail
  @id in_tail_container_logs
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
  annotation_match [".*"]
</filter>

<filter kubernetes.**>
  @type grep
  <regexp>
    key $.kubernetes.annotations.kinesis_stream
    pattern /.+/
  </regexp>
</filter>

<match kubernetes.**>
  @type kinesis_streams
  @id out_kinesis_streams
  region "#{ENV['FLUENT_KINESIS_STREAMS_REGION'] || nil}"
  aws_key_id "#{ENV['FLUENT_KINESIS_AWS_KEY_ID'] || nil}"
  aws_sec_key "#{ENV['FLUENT_KINESIS_AWS_SEC_KEY'] || nil}"
  stream_name "${$.kubernetes.annotations.kinesis_stream}"
  <buffer>
    flush_interval 1
    chunk_limit_size "#{ENV['FLUENT_KINESIS_STREAMS_CHUNK_LIMIT_SIZE'] || '1m'}"
    flush_thread_interval 0.1
    flush_thread_burst_interval 0.01
    flush_thread_count 15
  </buffer>
</match>
```
Aha! Got it. Could you try the following buffer configuration?
```
<buffer $.kubernetes.annotations.kinesis_stream>
  flush_interval 1
  chunk_limit_size "#{ENV['FLUENT_KINESIS_STREAMS_CHUNK_LIMIT_SIZE'] || '1m'}"
  flush_thread_interval 0.1
  flush_thread_burst_interval 0.01
  flush_thread_count 15
</buffer>
```
Fluentd's built-in placeholders require the target key to be listed as a buffer chunk key, i.e. the key must appear in the `<buffer ...>` attributes.
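Why the chunk key matters can be sketched in a few lines of Ruby (a hypothetical illustration; `resolve_stream_name` is my name, not the plugin's API): a `${...}` placeholder can only be substituted when its key is one of the chunk keys, which is why `stream_name` stayed literal and triggered the `ValidationException` above until the key was added to `<buffer>`.

```ruby
# Hypothetical sketch: resolve "${$.path.to.key}" placeholders from a record,
# but only for keys declared as buffer chunk keys. Undeclared keys are left
# unresolved, mimicking the ValidationException seen in the log above.
def resolve_stream_name(template, chunk_keys, record)
  template.gsub(/\$\{([^}]+)\}/) do
    key = Regexp.last_match(1)
    if chunk_keys.include?(key)
      path = key.sub(/\A\$\./, '').split('.')
      record.dig(*path).to_s
    else
      "${#{key}}" # not a chunk key: the placeholder survives verbatim
    end
  end
end

record = { 'kubernetes' => { 'annotations' => { 'kinesis_stream' => 'my-stream' } } }
template = '${$.kubernetes.annotations.kinesis_stream}'

# Without the chunk key, the literal placeholder reaches Kinesis and fails:
puts resolve_stream_name(template, [], record)
# With <buffer $.kubernetes.annotations.kinesis_stream>, it resolves:
puts resolve_stream_name(template, ['$.kubernetes.annotations.kinesis_stream'], record)
```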
Thanks @cosmo0920 - that's working perfectly. Hope this gets merged in soon.
@roscoecairney Thank you for your testing! @cosmo0920 Could you add these configuration examples to the README to help others avoid this configuration issue?
Thanks for pointing that out. I've added notes on built-in placeholder usage.
@cosmo0920 Could you confirm to the formal statement below?
"By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license."
We appreciate your understanding and strong contribution!
Yep. I've written these patches under the terms of the Apache License 2.0. They are my original work and do not incorporate any GPL code, so they can be licensed under the Apache License as well. I make no claim that my patches are licensed under GPL 3.0.
Great, thank you! Finally, could you squash your commits into one or a few?
Sure. That said, this documentation on squash merging may also help: https://help.github.com/en/articles/configuring-commit-squashing-for-pull-requests
I've squashed this PR into one commit.
Thank you! We'll proceed with our review. Please wait a moment.
Related to https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/165.
We can send events to streams with different names via the extract_placeholders API, which is the brand-new Fluentd v1 API.
BTW, how should we write test code for this feature?
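One possible testing approach (a hedged sketch, not the repository's actual test code; `FakeKinesisClient` and `flush_chunks` are hypothetical names) is to stub the Kinesis client with a recorder and assert which stream each chunk was routed to:

```ruby
# Hypothetical sketch of a test double: record put_records calls instead of
# hitting AWS, then assert on the recorded stream names and event counts.
class FakeKinesisClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def put_records(stream_name:, records:)
    @calls << [stream_name, records.size]
  end
end

# Simulated flush: one put_records call per chunk, with the stream name
# taken from the chunk's resolved placeholder.
def flush_chunks(client, chunks)
  chunks.each { |stream, recs| client.put_records(stream_name: stream, records: recs) }
end

client = FakeKinesisClient.new
flush_chunks(client, {
  'cosmo0920-fluentd-test1' => %w[hoge hoo],
  'cosmo0920-fluentd-test2' => %w[huga bar baz],
})
p client.calls
# [["cosmo0920-fluentd-test1", 2], ["cosmo0920-fluentd-test2", 3]]
```

The same idea works inside Fluentd's output-plugin test driver by injecting the fake client into the plugin under test.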