giraffi / fluent-plugin-amqp

Use AMQP broker to send or receive messages via FluentD
MIT License

Failed to write data into buffer by buffer overflow #30

Open stachdude opened 8 years ago

stachdude commented 8 years ago

This issue continues a closed but unresolved issue on the fluentd project, which can be viewed here: https://github.com/fluent/fluentd/issues/1218

To summarize: a forward output streams around 3k eps (events per second) to a receiving fluentd instance, which is configured to accept the events and send them to a RabbitMQ cluster. On the receiving end I get:

2016-09-12 11:06:27 +0000 [info]: reading config file path="/fluentd/etc/fluent.conf"
2016-09-12 11:06:27 +0000 [info]: starting fluentd-0.14.6
2016-09-12 11:06:27 +0000 [info]: spawn command to main: /usr/bin/ruby2.3 -Eascii-8bit:ascii-8bit /usr/local/bin/fluentd -c /fluentd/etc/fluent.conf -p /fluentd/plugins --under-supervisor
2016-09-12 11:06:27 +0000 [info]: reading config file path="/fluentd/etc/fluent.conf"
2016-09-12 11:06:27 +0000 [info]: starting fluentd-0.14.6 without supervision
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-amqp' version '0.9.1'
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-filter-geoip' version '0.3.0'
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-kafka' version '0.3.1'
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-multiprocess' version '0.2.1'
2016-09-12 11:06:27 +0000 [info]: gem 'fluentd' version '0.14.6'
2016-09-12 11:06:27 +0000 [info]: gem 'fluentd' version '0.14.3'
2016-09-12 11:06:27 +0000 [info]: gem 'fluentd' version '0.12.29'
2016-09-12 11:06:27 +0000 [info]: adding match pattern="**.**" type="amqp"
2016-09-12 11:06:27 +0000 [info]: adding source type="forward"
2016-09-12 11:06:27 +0000 [info]: using configuration file: <ROOT>
  <source>
    @type forward
    port 24224
  </source>
  <match **.**>
    @type amqp
    host "amqp.host"
    port 5672
    user "fluent"
    pass xxxxxx
    vhost "/"
    exchange "fluent.receiver"
    exchange_type "topic"
    durable true
    tag_key true
    tag_header "Tag"
    time_header "ts"
    tls true
    tls_key ""
    tls_cert ""
    tls_ca_certificates [""]
    tls_verify_peer true
    num_threads 4
    @log_level "warn"
    <buffer>
      flush_mode interval
      retry_type exponential_backoff
      flush_thread_count 4
    </buffer>
    <inject>
      tag_key true
    </inject>
  </match>
</ROOT>
2016-09-12 11:06:27 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2016-09-12 11:09:43 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 11:09:43 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="<tag_here>"
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/buffer.rb:186:in `write'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:369:in `block in handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:610:in `write_guard'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:368:in `handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:600:in `execute_chunking'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:544:in `emit_buffered'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:90:in `emit_stream'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:344:in `on_message'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:231:in `block in handle_connection'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:272:in `block (3 levels) in read_messages'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:271:in `feed_each'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:271:in `block (2 levels) in read_messages'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:280:in `block in read_messages'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:559:in `on_read'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/io.rb:123:in `on_readable'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/io.rb:186:in `on_readable'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/loop.rb:88:in `run_once'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/loop.rb:88:in `run'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:190:in `run'
2016-09-12 11:09:43 +0000 [error]: unexpected error on reading data from client address="IP" error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
  2016-09-12 11:09:43 +0000 [error]: suppressed same stacktrace
2016-09-12 11:09:43 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 11:09:43 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="fluent.warn"
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/buffer.rb:186:in `write'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:369:in `block in handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:610:in `write_guard'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:368:in `handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:600:in `execute_chunking'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:544:in `emit_buffered'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:90:in `emit_stream'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:81:in `emit'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:165:in `block in log_event_loop'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:163:in `each'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:163:in `log_event_loop'
2016-09-12 11:09:43 +0000 [error]: failed to emit fluentd's log event tag="fluent.warn" event={"message"=>"[Fluent::AMQPOutput] failed to write data into buffer by buffer overflow"} error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
2016-09-12 11:09:43 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 11:09:43 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="fluent.warn"
warmfusion commented 8 years ago

OK. Reading through the other ticket, I suspect this is basically caused by a performance bottleneck in out_amqp, which makes in_forward stack messages up in memory until the system fails.

Is the consumption rate fairly constant? Could you use file buffering to hold some events and help balance the throughput?

    buffer_type file
    buffer_path /var/tmp/fluent.matcher*
    buffer_chunk_limit 8m
    buffer_queue_limit 128
stachdude commented 8 years ago

That didn't help..

2016-09-12 17:23:07 +0000 [info]: reading config file path="/fluentd/etc/fluent.conf"
2016-09-12 17:23:07 +0000 [info]: starting fluentd-0.14.6
2016-09-12 17:23:07 +0000 [info]: spawn command to main: /usr/bin/ruby2.3 -Eascii-8bit:ascii-8bit /usr/local/bin/fluentd -c /fluentd/etc/fluent.conf -p /fluentd/plugins --under-supervisor
2016-09-12 17:23:07 +0000 [info]: reading config file path="/fluentd/etc/fluent.conf"
2016-09-12 17:23:07 +0000 [info]: starting fluentd-0.14.6 without supervision
2016-09-12 17:23:07 +0000 [info]: gem 'fluent-plugin-amqp' version '0.9.1'
2016-09-12 17:23:07 +0000 [info]: gem 'fluent-plugin-filter-geoip' version '0.3.0'
2016-09-12 17:23:07 +0000 [info]: gem 'fluent-plugin-kafka' version '0.3.1'
2016-09-12 17:23:07 +0000 [info]: gem 'fluent-plugin-multiprocess' version '0.2.1'
2016-09-12 17:23:07 +0000 [info]: gem 'fluentd' version '0.14.6'
2016-09-12 17:23:07 +0000 [info]: gem 'fluentd' version '0.14.3'
2016-09-12 17:23:07 +0000 [info]: gem 'fluentd' version '0.12.29'
2016-09-12 17:23:07 +0000 [info]: adding match pattern="**.**" type="amqp"
2016-09-12 17:23:07 +0000 [info]: adding source type="forward"
2016-09-12 17:23:07 +0000 [info]: using configuration file: <ROOT>
  <source>
    @type forward
    port 24224
  </source>
  <match **.**>
    @type amqp
    host "amqp.host"
    port 5672
    user "fluent"
    pass xxxxxx
    vhost "/"
    exchange "fluent.receiver"
    exchange_type "topic"
    durable true
    tag_key true
    tag_header "Tag"
    time_header "ts"
    tls true
    tls_key ""
    tls_cert ""
    tls_ca_certificates [""]
    tls_verify_peer true
    num_threads 4
    buffer_type "file"
    buffer_path /var/tmp/fluent.matcher*
    buffer_chunk_limit 8m
    buffer_queue_limit 128
    @log_level "warn"
    <buffer>
      flush_mode interval
      retry_type exponential_backoff
      @type file
      path /var/tmp/fluent.matcher*
      flush_thread_count 4
      chunk_limit_size 8m
      queue_length_limit 128
    </buffer>
    <inject>
      tag_key true
    </inject>
  </match>
</ROOT>
2016-09-12 17:23:07 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2016-09-12 17:28:00 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 17:28:00 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 17:28:00 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="<tag_here>"
2016-09-12 17:28:00 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="fluent.warn"
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/buffer.rb:186:in `write'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:369:in `block in handle_stream_simple'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:610:in `write_guard'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:368:in `handle_stream_simple'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:600:in `execute_chunking'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:544:in `emit_buffered'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:90:in `emit_stream'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:81:in `emit'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/buffer.rb:186:in `write'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:369:in `block in handle_stream_simple'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:165:in `block in log_event_loop'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:163:in `each'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:163:in `log_event_loop'
2016-09-12 17:28:00 +0000 [error]: failed to emit fluentd's log event tag="fluent.warn" event={"message"=>"[Fluent::AMQPOutput] failed to write data into buffer by buffer overflow"} error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:610:in `write_guard'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:368:in `handle_stream_simple'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:600:in `execute_chunking'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:544:in `emit_buffered'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:90:in `emit_stream'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:344:in `on_message'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:231:in `block in handle_connection'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:272:in `block (3 levels) in read_messages'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:271:in `feed_each'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:271:in `block (2 levels) in read_messages'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:280:in `block in read_messages'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:559:in `on_read'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/io.rb:123:in `on_readable'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/io.rb:186:in `on_readable'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/loop.rb:88:in `run_once'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/loop.rb:88:in `run'
  2016-09-12 17:28:00 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:190:in `run'
2016-09-12 17:28:00 +0000 [error]: unexpected error on reading data from client address="IP" error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
  2016-09-12 17:28:00 +0000 [error]: suppressed same stacktrace
2016-09-12 17:28:00 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 17:28:00 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="fluent.warn"
  2016-09-12 17:28:00 +0000 [warn]: suppressed same stacktrace
2016-09-12 17:28:00 +0000 [error]: failed to emit fluentd's log event tag="fluent.warn" event={"message"=>"[Fluent::AMQPOutput] failed to write data into buffer by buffer overflow"} error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
2016-09-12 17:28:00 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 17:28:00 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="fluent.warn"
warmfusion commented 8 years ago

Hmm...

I'd need to explore the specifics a bit more and create some sort of stress-test harness to reproduce the error.

Having read the buffer code, it seems that the error is raised when the combined 'staged' and 'queued' byte sizes are greater than the limit given by total_limit_size (in bytes; default 512 * 1024 * 1024, or chunk * queue).

Maybe try setting total_limit_size in your buffer configuration to something larger than the default 512 MB. However, if the issue is simply a throughput concern, there's not a whole lot that may be possible.
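
To make the condition described above concrete, here is a minimal Ruby sketch of it. It is a toy model, not fluentd's actual Buffer class: `ToyBuffer`, `enqueue_all` and `flush` are hypothetical names invented for illustration, and the exact comparison fluentd performs may differ.

```ruby
# Toy model of the overflow check described above.
# Hypothetical code for illustration; this is NOT fluentd's implementation.
class BufferOverflowError < StandardError; end

class ToyBuffer
  # 512 MiB, the default quoted in this thread.
  DEFAULT_TOTAL_LIMIT_SIZE = 512 * 1024 * 1024

  attr_reader :stage_size, :queue_size

  def initialize(total_limit_size: DEFAULT_TOTAL_LIMIT_SIZE)
    @total_limit_size = total_limit_size
    @stage_size = 0 # bytes in chunks still being filled
    @queue_size = 0 # bytes in chunks waiting to be flushed
  end

  # Overflow means staged + queued bytes are greater than the limit.
  def overflowed?
    @stage_size + @queue_size > @total_limit_size
  end

  # Reject new data while the buffer is over its limit.
  def write(data)
    raise BufferOverflowError, "buffer space has too many data" if overflowed?
    @stage_size += data.bytesize
  end

  # Move everything staged into the flush queue.
  def enqueue_all
    @queue_size += @stage_size
    @stage_size = 0
  end

  # The output plugin drains up to `bytes` from the queue.
  def flush(bytes)
    @queue_size -= [bytes, @queue_size].min
  end
end
```

In this model a larger limit only delays the error: if `write` is called faster than `flush` drains the queue, `overflowed?` eventually becomes true whatever the limit is.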

As far as I understand it, it's not possible to provide back-pressure from the output back to the input to reduce the consumption rate.

Could you give me an example of the kind of message you're sending? Are the messages a consistent length? Do they arrive in bursts or at a fairly constant rate? Are they simple text, or are there binary elements?

What does your AMQP broker show? Can you compare the number of messages going into the forwarder with the number going into the broker? What rate gets sent to the broker?
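
Once both rates are known, a rough estimate of how long the buffer can absorb the difference is simple arithmetic. The Ruby sketch below is a back-of-the-envelope helper, not part of fluentd or this plugin. `seconds_until_overflow` is a hypothetical name, and the outgoing rate (2000 eps) and average event size (1024 bytes) are made-up placeholders; only the 3k eps input rate and the 512 * 1024 * 1024 limit come from this thread.

```ruby
# Hypothetical helper: estimates how long a buffer lasts when events arrive
# faster than they are published. All example inputs below are assumptions
# except the 3000 eps input rate and the 512 MiB limit quoted in this thread.
def seconds_until_overflow(limit_bytes:, in_eps:, out_eps:, avg_event_bytes:)
  net_bytes_per_sec = (in_eps - out_eps) * avg_event_bytes
  # If the output keeps up with the input, no backlog builds up.
  return nil if net_bytes_per_sec <= 0

  limit_bytes.to_f / net_bytes_per_sec
end

limit = 512 * 1024 * 1024
secs = seconds_until_overflow(
  limit_bytes: limit,
  in_eps: 3000,         # rate reported by the issue author
  out_eps: 2000,        # assumed broker publish rate
  avg_event_bytes: 1024 # assumed average event size
)
puts format("buffer would fill after roughly %.0f seconds", secs)
```

Plugging in the measured broker rate and a real average event size would show whether the few minutes between startup and the first overflow in the logs is consistent with a plain throughput gap.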

warmfusion commented 8 years ago

@stachdude - Do you have any further information that might help answer my questions above?

stachdude commented 8 years ago

@warmfusion I will, I just need a couple more days to sort out some stuff at work first.

warmfusion commented 8 years ago

Can you try again using the latest versions of both Fluentd and fluent-plugin-amqp? There have been quite a few changes in both over the last few months that might have solved your problem.

warmfusion commented 7 years ago

@stachdude - I'm not sure I can diagnose this issue with the information we've got so far.