jurek7 / logstash-input-mqtt

Logstash MQTT input plugin.

Error when too many messages arrives #5

Open ghost opened 6 years ago

ghost commented 6 years ago

Hi,

I'm running a lot of tests using MQTT (Mosquitto) and this plugin with Logstash. Almost everything works fine, but I've found one scenario that behaves strangely.

If I submit 10,000 messages while Logstash is down, then when it comes back up all the messages seem to be processed, but after the last one I get this error, which kills my Logstash process:

[2018-05-02T17:43:31,164][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `error' for nil:NilClass>, :backtrace=>["/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/subscriber.rb:69:in `add_subscription'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/handler.rb:126:in `handle_suback'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/handler.rb:57:in `handle_packet'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/handler.rb:47:in `receive_packet'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:163:in `block in loop_read'", "org/jruby/RubyFixnum.java:299:in `times'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:161:in `loop_read'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:175:in `mqtt_loop'", "/Users/casmeiron/Developer/tools/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/paho-mqtt-1.0.7/lib/paho_mqtt/client.rb:128:in `block in daemon_mode'"]}
[2018-05-02T17:43:31,415][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: org.jruby.exceptions.RaiseException: (SystemExit) exit

I'm running Logstash with the -f parameter.

My configuration sample follows:

input {
    mqtt {
        topic           => "/teste"
        host            => "localhost"
        username        => "aaa"
        password        => "bbb"
        client_id       => "teste"
        clean_session   => false
        qos             => 1
    }
}

output {
    stdout { codec => rubydebug }
}

Any thoughts?

Thanks in advance!

jurek7 commented 6 years ago

Hi @casmeiron,

I have a couple of questions:

  1. How did you submit (and store) messages on Mosquitto? I suppose you used QoS; what level?
  2. Could you please share your Mosquitto config and the scripts for sending messages?
  3. What is the reproducibility rate of this issue?
  4. Are you able to perform the same test with the underlying MQTT client, https://github.com/RubyDevInc/paho.mqtt.ruby/ ?

ghost commented 6 years ago

Hi @jurek7,

Sure, I can answer these questions. Thanks for writing back.

How did you submit (and store) messages on Mosquitto? I suppose you used QoS; what level?

I'm using the eclipse-mosquitto Docker image (latest). The messages are stored using a simple Java program (Paho client).

    @Test
    void publish( ) throws Exception {
        MqttClient mqttClient = new MqttClient( "tcp://localhost:1883", "java-teste" );
        mqttClient.setCallback( this );

        MqttConnectOptions options = new MqttConnectOptions( );
        options.setAutomaticReconnect( true );
        options.setCleanSession( false );
        options.setUserName( "aaa" );
        options.setPassword( "bbb".toCharArray( ) );

        mqttClient.connect( options );

        MqttMessage msg;
        for ( int i = 0; i < 10000; i++ ) {
            msg = new MqttMessage( ( "{\"data\": " + i + "}" ).getBytes( ) );
            msg.setQos( 1 );
            mqttClient.publish( "/teste", msg );
        }

        mqttClient.disconnect( );
        mqttClient.close( false );
    }

As you can see in the code, I'm using QoS 1.

Could you please share your Mosquitto config and the scripts for sending messages?

Sure, my Mosquitto configuration file is attached (I had to rename it to .txt so GitHub would allow the attachment): mosquitto.txt

What is the reproducibility rate of this issue?

100%. I just need to send some 10,000 messages (with my consumer [Logstash] turned off) and then start it.

Are you able to perform the same test with the underlying MQTT client, https://github.com/RubyDevInc/paho.mqtt.ruby/ ?

I'm not a Ruby programmer, but I could try that.

ghost commented 6 years ago

It looks like there is an error in how the global logger is referenced inside the PahoMqtt library.

paho-mqtt-1.0.7/lib/paho_mqtt/subscriber.rb, line 69:

    @logger.error("The packet id is invalid, already used.") if PahoMqtt.logger?

but it should be PahoMqtt.logger.error(...

That explains the message "undefined method `error'". It started to occur because the last commit enabled the logger. So for now we have to disable the logger: in lib/logstash/inputs/mqtt.rb, line 47, comment out PahoMqtt.logger = @logfile.
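
To make the diagnosis concrete, here is a minimal, self-contained sketch of the same pattern. It does not use the real gem: DemoMqtt, BrokenSubscriber and FixedSubscriber are hypothetical stand-ins, and only the shape of the guard and the call is taken from the line quoted in this comment. The guard checks a module-level logger, while the call goes to an instance variable that was never assigned, so it is nil.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for the library module (assumption: only the
# module-level logger accessor and the logger? predicate matter here).
module DemoMqtt
  class << self
    attr_accessor :logger

    def logger?
      !logger.nil?
    end
  end

  class BrokenSubscriber
    # The guard checks the module-level logger, but the call goes to @logger,
    # an instance variable that was never assigned and is therefore nil.
    def report
      @logger.error('The packet id is invalid, already used.') if DemoMqtt.logger?
    end
  end

  class FixedSubscriber
    # Guard and call both use the module-level logger.
    def report
      DemoMqtt.logger.error('The packet id is invalid, already used.') if DemoMqtt.logger?
    end
  end
end

LOG_OUTPUT = StringIO.new
DemoMqtt.logger = Logger.new(LOG_OUTPUT)

begin
  DemoMqtt::BrokenSubscriber.new.report
rescue NoMethodError => e
  puts "broken subscriber raised #{e.class}"
end

DemoMqtt::FixedSubscriber.new.report
puts "fixed subscriber logged #{LOG_OUTPUT.string.lines.size} line(s)"
```

In this sketch, setting DemoMqtt.logger to nil makes the guard false in both classes, which is why disabling the logger hides the crash without fixing the line itself.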

ghost commented 6 years ago

I've updated the plugin to disable the logger by default. Please verify how your case behaves now. There may still be an issue, but logged differently; that would be an issue inside the https://github.com/RubyDevInc/paho.mqtt.ruby/ library. So if it's still not working, you could report the problem there.

ghost commented 6 years ago

Perfect, I will give it a shot today and let you guys know what happened!

Best Regards.

ghost commented 6 years ago

Hi,

Just did the same test. It failed (as expected) with the following error:

[2018-05-08T10:46:51,267][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: org.jruby.exceptions.RaiseException: (PacketException) PahoMqtt::PacketException

Should I open an issue on paho.mqtt.ruby library?

Thanks!

jurek7 commented 6 years ago

@casmeiron yes, please report it to paho.mqtt.ruby as well. Please also add the details from @tomc78's comment: https://github.com/jurek7/logstash-input-mqtt/issues/5#issuecomment-387022738

It seems that he found the root cause of this issue. Thanks.

ghost commented 6 years ago

Did that, thanks!

ghost commented 6 years ago

Hi @jurek7, @tomc78. They have updated the Paho MQTT Ruby library to version 1.0.10, which should fix the problem. Could you release a new version of your Logstash plugin that uses it?

Thanks in advance!

jurek7 commented 6 years ago

I'd prefer to wait a bit. See: https://github.com/RubyDevInc/paho.mqtt.ruby/issues/38

ghost commented 6 years ago

Hi @jurek7 ,

You're right, the workaround they implemented for our issue has created another problem.

Let's wait.

Thanks.

jurek7 commented 6 years ago

Hi @casmeiron,

Could you please redo your tests on the latest version (1.0.5)? The paho.mqtt library was updated to 1.0.12.

ghost commented 6 years ago

Hey @jurek7 ,

I repeated the test but it wasn't successful; instead I got a bunch of errors:

E, [2018-06-05T16:03:31.977759 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.981117 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.984160 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.987298 #2077] ERROR -- : Writing queue is full, slowing down
[2018-06-05T16:03:31,986][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x54af63f5@/Users/casmeiron/Developer/tools/logstash-6.2.4/logstash-core/lib/logstash/pipeline.rb:247 run>"}
E, [2018-06-05T16:03:31.990343 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.993383 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.996457 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.999872 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.003279 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.005930 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.010806 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.013682 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:32.016814 #2077] ERROR -- : Writing queue is full, slowing down

This is the first time I've seen this error. A total of 10,000 messages were sent, and the error started appearing after message number 991.

ghost commented 6 years ago

Btw, I waited about 10 minutes and the error didn't go away.

jurek7 commented 6 years ago

Could you please attach your Logstash config and logstash.yml?

ghost commented 6 years ago

Hi @jurek7 ,

Sure, my configuration follows:

input {
    mqtt {
        topic           => "/test"
        host            => "localhost"
        username        => "aaa"
        password        => "bbb"
        client_id       => "test"
        clean_session   => false
        qos             => 1
    }
}

output {
    stdout { codec => rubydebug }
}

The logstash.yml is untouched:


# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
# path.data:
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
config.reload.automatic: true
#
# How often to check if the pipeline configuration has changed (in seconds)
#
config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and 
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of 
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
# queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
# path.logs:
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins: []

jurek7 commented 6 years ago

Hi @casmeiron,

I reproduced your issue.

p-goudet commented 6 years ago

@casmeiron From the log, I would guess that the Logstash pipeline is ending while MQTT handling operations are still running. I am currently checking how I could improve that in paho.

jurek7 commented 6 years ago

I think the terminating pipeline is some kind of side effect. The question is about these log lines:

E, [2018-06-05T16:03:31.977759 #2077] ERROR -- : Writing queue is full, slowing down
E, [2018-06-05T16:03:31.981117 #2077] ERROR -- : Writing queue is full, slowing down

It prints this message at very short intervals.
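
To put a number on "very short", the gaps can be computed from the timestamps in the log excerpt posted earlier in this thread. This is only a small sketch: the helper names are made up for illustration, and it assumes all lines fall on the same day, so only the time-of-day part is compared.

```ruby
# The first four "Writing queue is full" lines from the log excerpt in this thread.
LOG_LINES = [
  'E, [2018-06-05T16:03:31.977759 #2077] ERROR -- : Writing queue is full, slowing down',
  'E, [2018-06-05T16:03:31.981117 #2077] ERROR -- : Writing queue is full, slowing down',
  'E, [2018-06-05T16:03:31.984160 #2077] ERROR -- : Writing queue is full, slowing down',
  'E, [2018-06-05T16:03:31.987298 #2077] ERROR -- : Writing queue is full, slowing down'
].freeze

# Converts the time-of-day part of a logger timestamp to integer
# microseconds since midnight (assumption: same-day lines only).
def microseconds_since_midnight(line)
  match = line.match(/T(\d{2}):(\d{2}):(\d{2})\.(\d{6})/)
  raise ArgumentError, "no timestamp in: #{line}" unless match

  hours, minutes, seconds, micros = match.captures.map(&:to_i)
  ((hours * 60 + minutes) * 60 + seconds) * 1_000_000 + micros
end

# Returns the gap between each pair of consecutive lines, in microseconds.
def gaps_in_microseconds(lines)
  lines.map { |line| microseconds_since_midnight(line) }
       .each_cons(2)
       .map { |earlier, later| later - earlier }
end

gaps = gaps_in_microseconds(LOG_LINES)
puts "gaps (microseconds): #{gaps.inspect}"
puts "mean gap (ms): #{(gaps.sum / gaps.size.to_f / 1000).round(2)}"
```

For these four lines the gaps come out at roughly 3 ms each, so the message is being emitted several hundred times per second.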

p-goudet commented 6 years ago

This message is printed because the internal writing buffer is full. I think it is due to an acknowledgment packet. I think it should be a warning rather than an error at the logger level. I am thinking about this issue.
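
As a generic illustration of a full write buffer (not paho.mqtt.ruby's actual implementation, which is not shown in this thread), here is a sketch built on Ruby's SizedQueue. BoundedWriter, offer and drain_one are hypothetical names, and the packet strings are placeholders. While nothing drains the queue, every further attempt to enqueue fails, which is the situation in which a "queue is full" message would repeat.

```ruby
require 'thread'

# Generic bounded write buffer (assumption: illustrative only, not the
# library's own queue, capacity, or retry logic).
class BoundedWriter
  attr_reader :warnings

  def initialize(capacity)
    @queue = SizedQueue.new(capacity)
    @warnings = 0
  end

  # Tries to enqueue a packet without blocking. Returns false and counts
  # a warning when the queue is already full.
  def offer(packet)
    @queue.push(packet, true)
    true
  rescue ThreadError
    @warnings += 1
    false
  end

  # Simulates the writer side sending one packet, which frees a slot.
  # Returns nil if the queue is empty.
  def drain_one
    @queue.pop(true)
  rescue ThreadError
    nil
  end

  def size
    @queue.size
  end
end

writer = BoundedWriter.new(3)
accepted = (1..5).count { |id| writer.offer("ack #{id}") }
puts "accepted=#{accepted} warnings=#{writer.warnings}"

writer.drain_one
puts "offer after one drain succeeds: #{writer.offer('ack 6')}"
```

The point of the sketch is that the warnings stop only once something drains the queue. If the draining side has stalled, the producer keeps failing indefinitely.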

jurek7 commented 6 years ago

I see, but the problem is that after this happens, MQTT messages are not consumed anymore.

ghost commented 6 years ago

Hi @jurek7 ,

They have updated the paho.mqtt Ruby implementation to version 1.0.12. Could you help me test with this release? Let me know the steps I must perform to try it.

Thanks.

jurek7 commented 6 years ago

The same happens on 1.0.12.