giraffi / fluent-plugin-amqp

Use AMQP broker to send or receive messages via FluentD
MIT License

Malformed messages cause message buffer queues to fill up #25

Closed warmfusion closed 8 years ago

warmfusion commented 8 years ago

I'm not 100% sure of the sequence of events that results in this scenario, but here's what I think happens...

  1. Fluent steadily sends messages to AMQP
  2. Something delays that output process long enough that the messages are written to disk for a little while
  3. The buffered messages contain an event that cannot be properly parsed, so it gets rejected and pushed back into the buffer to be retried
  4. The loop of retrying un-parseable messages causes any further events to get stuck
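
The sequence above can be modelled with a few lines of Ruby. This is only a simplified sketch and not Fluentd's actual buffer code: `flush_queue`, the chunk layout and the attempt limit are all made up for illustration, and the model assumes nothing is published unless the whole chunk serializes. It shows how a single record with invalid UTF-8 keeps its chunk at the head of the queue, so later chunks never go out.

```ruby
require 'json'

# Simplified model of a buffered output; NOT Fluentd's real implementation.
# A chunk is an array of records. It leaves the queue only when every record
# in it serializes without raising.
def flush_queue(queue, max_attempts)
  published = []
  attempts = 0
  until queue.empty? || attempts >= max_attempts
    attempts += 1
    chunk = queue.first
    begin
      payloads = chunk.map { |record| JSON.generate(record) }
      published.concat(payloads)
      queue.shift
    rescue JSON::GeneratorError
      # The chunk stays at the head of the queue and is retried next time.
    end
  end
  published
end

bad = "\xC3\x28" # invalid UTF-8: 0x28 is not a continuation byte
queue = [
  [{ 'msg' => 'ok-1' }],
  [{ 'msg' => bad }],   # poisoned chunk
  [{ 'msg' => 'ok-2' }] # stuck behind the poisoned chunk
]

published = flush_queue(queue, 5)
puts "published: #{published.length}, still queued: #{queue.length}"
```

With five attempts, only the first chunk gets through; the other four attempts are spent retrying the same poisoned chunk.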

Configuration

2016-08-24 08:47:37 +0000 [info]: starting fluentd-0.12.7
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-amqp' version '0.8.1'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-amqp' version '0.8.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-bigquery' version '0.2.12'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-buffer-lightening' version '0.0.2'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-record-reformer' version '0.7.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluentd' version '0.12.7'
2016-08-24 08:47:38 +0000 [info]: using configuration file: <ROOT>
  <filter **>
    type record_transformer
    enable_ruby false
    <record>
      input_tag ${tag}
      last_tag ${tag_parts[-1]}
      hostname sourceserver.priv.example.org
    </record>
  </filter>
  <match **>
    type amqp
    tag_key true
    exchange fluent.fanout
    exchange_type fanout
    hosts ["rmq-tc-vif-01.priv.example.org","rmq-tc-vif-02.priv.example.org"]
    port 5672
    vhost fluent
    user fluent.writer
    pass this_is_a_password_yup
    buffer_type file
    buffer_path /var/log/fluent/matcher-forwarder.*.buffer
    buffer_chunk_limit 8m
    buffer_queue_limit 128
    flush_interval 5s
    retry_wait 10s
  </match>
  <source>
    type tail
    format json
    time_tag timestamp
    tag app_api.access
    path /var/log/nginx/access.log
    pos_file /var/log/fluent/app_api-access.pos
  </source>
  <source>
    type forward
    port 24224
    bind 127.0.0.1
  </source>
  <source>
    type monitor_agent
    bind 127.0.0.1
    port 24220
  </source>
</ROOT>

Message showing error

2016-08-24 08:47:40 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2016-08-24 08:47:48 +0000 error_class="JSON::GeneratorError" error="source sequence is illegal/malformed utf-8" plugin_id="object:14c3514"
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:223:in `generate'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:223:in `generate'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:394:in `dump'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluent-plugin-amqp-0.8.1/lib/fluent/plugin/out_amqp.rb:83:in `block in write'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:117:in `each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:117:in `block in msgpack_each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/plugin/buf_file.rb:64:in `open'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:114:in `msgpack_each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluent-plugin-amqp-0.8.1/lib/fluent/plugin/out_amqp.rb:82:in `write'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:325:in `write_chunk'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:304:in `pop'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/output.rb:321:in `try_flush'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/output.rb:140:in `run'

warmfusion commented 8 years ago

Looks like I might be able to emulate this failure mode using a simple test JSON object based on this Stack Overflow question
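
A minimal reproduction along those lines might look like the sketch below. The record contents and the `dump_error` helper are assumptions chosen for illustration; the only thing taken from the log above is that `JSON.dump` raises `JSON::GeneratorError` on malformed UTF-8.

```ruby
require 'json'

# "\xC3\x28" is not valid UTF-8: 0xC3 opens a two-byte character,
# but 0x28 is not a continuation byte.
record = { 'message' => "bad byte: \xC3\x28" }

# Hypothetical helper: returns the exception instead of raising it,
# or nil when the record serializes cleanly.
def dump_error(record)
  JSON.dump(record)
  nil
rescue JSON::GeneratorError => e
  e
end

error = dump_error(record)
puts error.class
puts error.message
```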

sawanoboly commented 8 years ago

I see what the issue is. I think there are two options: drop the message, or force the encoding with a warning.

warmfusion commented 8 years ago

Dropping with warnings might be the safest approach as forcing encoding might change the meaning of the messages being sent.

If the messages are dropped at least they can be reviewed and fixed by an end-user - just need to make sure they're dropped with a clear indication of why they were dropped.
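
To make the trade-off concrete, here is a rough sketch of both options. `serialize_or_drop` and `serialize_with_scrub` are hypothetical helpers, not part of the plugin, and `String#scrub` assumes Ruby 2.1 or newer (the stack trace above comes from a 1.9.1 gem path, where it would not be available). The scrub version only looks at top-level string values.

```ruby
require 'json'

# Option 1 (hypothetical helper): drop the record and report why.
# Returns [payload, nil] on success or [nil, reason] when dropped.
def serialize_or_drop(record)
  [JSON.dump(record), nil]
rescue JSON::GeneratorError => e
  [nil, e.message]
end

# Option 2 (hypothetical helper): force valid UTF-8 by replacing
# invalid bytes in top-level string values, then dump.
def serialize_with_scrub(record)
  cleaned = record.each_with_object({}) do |(key, value), out|
    out[key] = value.is_a?(String) ? value.scrub('?') : value
  end
  JSON.dump(cleaned)
end

# 0xE9 is "e acute" in Latin-1 but an incomplete sequence in UTF-8.
record = { 'user' => "caf\xE9" }

payload, reason = serialize_or_drop(record)
forced = serialize_with_scrub(record)

puts "dropped: #{reason}" if payload.nil?
puts "forced:  #{forced}"
```

The forced version goes through, but the original byte is replaced, which is the change in meaning mentioned above.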

warmfusion commented 8 years ago

Whereabouts should I be looking to apply changes?

I'm thinking it might be around here: https://github.com/giraffi/fluent-plugin-amqp/blob/master/lib/fluent/plugin/out_amqp.rb#L84-L90

Is it as simple as wrapping the JSON.dump in a begin/rescue and throwing a log warning for encoding errors?

sawanoboly commented 8 years ago

"wrapping the JSON.dump in a begin/rescue and throwing a log"

That seems like it might be better.

warmfusion commented 8 years ago

I'm not sure I understand what you mean...?

sawanoboly commented 8 years ago

Ah, sorry. Could you try wrapping JSON.dump?

warmfusion commented 8 years ago

Going to manually hack this into some of our servers:

        begin
          data = JSON.dump(data) unless data.is_a?(String)
          log.debug "Sending message #{data}, :key => #{routing_key(tag)} :headers => #{headers(tag, time)}"
          @exch.publish(data, :key => routing_key(tag), :persistent => @persistent, :headers => headers(tag, time))
        rescue JSON::GeneratorError => e
          # Skip this event instead of failing the whole chunk, and log why
          log.error "Failure converting data object to json string: #{e.message}"
          # Debug only - otherwise we may pollute the fluent logs with unparseable events and loop
          log.debug "JSON.dump failure converting [#{data}]"
        end
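
For trying the rescue path without a broker, something like the following standalone sketch might do. `FakeExchange`, `FakeLog`, `write_records` and the `'example.tag'` routing key are stand-ins invented for this example; they are not the plugin's classes and only mimic the calls used in the snippet above.

```ruby
require 'json'

# Hypothetical stand-in for the AMQP exchange; it only records what is published.
class FakeExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(data, _options = {})
    @published << data
  end
end

# Hypothetical stand-in for the plugin logger; it only collects error messages.
class FakeLog
  attr_reader :errors

  def initialize
    @errors = []
  end

  def error(message)
    @errors << message
  end
end

# Applies the same begin/rescue idea to each record in a list.
def write_records(records, exchange, log)
  records.each do |data|
    begin
      data = JSON.dump(data) unless data.is_a?(String)
      exchange.publish(data, :key => 'example.tag')
    rescue JSON::GeneratorError => e
      log.error "Failure converting data object to json string: #{e.message}"
    end
  end
end

exchange = FakeExchange.new
log = FakeLog.new
records = [{ 'msg' => 'first' }, { 'msg' => "\xC3\x28" }, { 'msg' => 'last' }]
write_records(records, exchange, log)

puts "published #{exchange.published.length} of #{records.length} records"
puts log.errors
```

The malformed record is logged and skipped, and the records on either side of it are still published.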

sawanoboly commented 8 years ago

It looks good enough to avoid the crash. Could you create a new PR?