RubyDevInc / paho.mqtt.ruby

Eclipse Public License 1.0

PacketException when too many messages arrive #31

Open ghost opened 6 years ago

ghost commented 6 years ago

Hello,

The issue was firstly reported here: https://github.com/jurek7/logstash-input-mqtt/issues/5

I ran into this problem when using 'logstash-input-mqtt', which uses this library. According to the plugin's authors, it is related to the paho.mqtt.ruby library and not to their plugin.

So I hope you guys can help me to find out what's going on.

Let me know if I can provide any more details.

Thanks in advance!

p-goudet commented 6 years ago

Hello,

I merged the fix for the logger reference from @tomc78, and a new minor version (1.0.8) including it has been released today. Unfortunately, I don't think it will solve your issue: the raised exception is due to a mismatch on the id of a SUBACK packet.
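For context, MQTT requires the packet id carried in a SUBACK to match the id of the SUBSCRIBE it acknowledges. The check that raises here looks roughly like the following (a simplified illustrative sketch only, not the gem's actual source; the method and variable names, including PacketException, are assumptions):

def handle_suback(packet)
  @suback_mutex.synchronize do
    pending = @waiting_suback.find { |sub| sub[:id] == packet.id }
    # No pending SUBSCRIBE with this id -> the exception reported in this issue
    raise PahoMqtt::PacketException, "Unknown SUBACK id: #{packet.id}" if pending.nil?
    @waiting_suback.delete(pending)
  end
end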

Could you give me more details about the test sequence, especially the subscribe process?

ghost commented 6 years ago

Hi @p-goudet,

Ok, I will try to provide more information.

The test case is simple and easy to reproduce.

I'm using the paho-mqtt Java client to produce the messages with qos=1; a total of 10k messages are published. The consumer is turned off while the producer is running.

After producing the 10k messages, I start the consumer. I'm using Logstash for that, with the mqtt-input plugin, which is very simple (the source code can be viewed here: https://github.com/jurek7/logstash-input-mqtt/blob/master/lib/logstash/inputs/mqtt.rb) and relies on this library to work.
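For reference, a rough Ruby equivalent of that producer, using this gem instead of the Java client (a sketch only; the connect/publish calls follow the gem's README, and the host and topic are the ones from my setup):

require 'paho-mqtt'

# Publish 10k small JSON payloads with qos=1 while the consumer is offline.
producer = PahoMqtt::Client.new
producer.connect('localhost', 1883)

10_000.times do |i|
  producer.publish('/teste', %Q({"data": #{i}}), false, 1) # topic, payload, retain, qos
end

producer.disconnect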

The following configuration is given to the plugin, which forwards it to this library:

                topic           => "/teste"
                host            => "localhost"
                username        => "aaaa"
                password        => "bbbb"
                client_id       => "teste"
                clean_session   => false
                qos             => 1

Here's how the configuration is forwarded:

@client = RecoverableMqttClient.new({
            :host => @host,
            :port => @port,
            :persistent => @persistent, # keep connection persistent
            :mqtt_version => @mqtt_version,
            :clean_session => @clean_session,
            :client_id => @client_id,
            :username => @username,
            :password => @password,
            :ssl => @ssl,
            :will_topic => @will_topic,
            :will_payload => @will_payload,
            :will_qos => @will_qos,
            :will_retain => @will_retain,
            :retry_reconnection_max_count => @reconnect_retries,
            :retry_reconnection_sleep_time => @reconnect_sleep_time,
        })

When a message is received, I just print its content (JSON). This is accomplished by an output plugin in Logstash, as simple as this:

output {
    stdout { codec => rubydebug }
}

Mosquitto is being used as the MQTT server, running in Docker. That's it, there's nothing special about the setup. I can reproduce the error with 100% reliability.

The consumer consumes all the messages (90% sure about it), and when it finishes, the given exception is thrown. Maybe the MQTT client is trying to "commit" (ack) all the messages together and the packet ends up being bigger than expected? If I send fewer messages, let's say 1k, I don't see that error. The content of every message looks like this:

{"data": 0}

Each payload is between 11 and 15 bytes, so something around 150,000 bytes are sent/received in total.

Let me know if you need any more detailed information.

Thanks very much for looking into this.

Best Regards.

ghost commented 6 years ago

Hi guys,

Any thoughts on that?

p-goudet commented 6 years ago

Hello @casmeiron ,

Yes, I found the reason why some packets can fail to be sent. When the client requests a publish, the packets are not sent directly: they are first buffered, and then sent by a background process which handles the reading and writing on the socket. The current implementation might drop packets if the sending buffer is full. I am changing this approach: when the sending buffer is full, the client will wait for a short delay (currently 2 ms) before retrying to send. This solution is being implemented, and the results I observed from samples seem to fix the issue (correctly sending 10,000 publish packets with qos=1). As soon as it passes the tests I will submit a new version, which should be 1.0.10.
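The change can be summarized roughly as follows (an illustrative sketch of the approach only, not the gem's actual internals; the queue name, size limit, and method name are assumptions):

MAX_WRITING_QUEUE   = 1000   # assumed limit on the sending buffer
SENDING_RETRY_SLEEP = 0.002  # the ~2 ms delay mentioned above

# Instead of dropping a packet when the sending buffer is full,
# wait for the writer thread to drain it and then retry.
def append_to_writing(packet)
  loop do
    pushed = @writing_mutex.synchronize do
      if @writing_queue.length < MAX_WRITING_QUEUE
        @writing_queue.push(packet)
        true
      else
        false
      end
    end
    return if pushed
    sleep SENDING_RETRY_SLEEP
  end
end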

ghost commented 6 years ago

Hi @p-goudet ,

That's great, thanks very much for the help. I understand now why the error is happening, and the workaround isn't a problem either. Yes, I'll wait until the tests are OK.

Best Regards.

p-goudet commented 6 years ago

Hello @casmeiron,

I have just submitted version 1.0.10. The client should now be able to send any packet; some errors might be displayed in the log if the sending frequency is too high. As said previously, the current version waits for a short delay if the sending buffer is full, and then retries to push into the sending buffer. Could you tell me whether version 1.0.10 fixes this issue, or whether other points remain unsolved?
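If it helps, the fix can also be checked outside of Logstash with a minimal standalone consumer along these lines (a sketch following the gem's README; the client options and topic are taken from the setup described above):

require 'paho-mqtt'

# Drain the 10k backlog directly with the gem and watch for PacketException.
client = PahoMqtt::Client.new(client_id: 'teste', clean_session: false, persistent: true)
client.on_message { |packet| puts packet.payload }

client.connect('localhost', 1883)
client.subscribe(['/teste', 1]) # qos=1, matching the plugin configuration

sleep 120 # give the client time to consume the queued messages
client.disconnect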

ghost commented 6 years ago

Hi @p-goudet ,

Sorry, I missed your comment. No, it did not fix it, and even with version 1.0.12 the problem appears, in a different form: https://github.com/jurek7/logstash-input-mqtt/issues/5#issuecomment-394964980

ghost commented 6 years ago

Hi @p-goudet ,

Any news on this? Currently I'm unable to use this Paho implementation because of this problem. I would love to see any updates.

Thanks.

p-goudet commented 6 years ago

I'm sorry, I haven't been very active in development recently. I will try to tackle this issue again soon.

ghost commented 6 years ago

Hi @p-goudet , any news about this one?

Thanks!

daviduebelacker commented 3 years ago

Ping! ;-) Any news?

We ran into the same problem: sometimes we get an invalid SUBACK for a QoS 1 subscription.