graylog-labs / graylog-plugin-mqtt

MQTT Input Plugin for Graylog
https://www.graylog.org/

Plugin stops working after broker restart #7

Open claudyus opened 6 years ago

claudyus commented 6 years ago

I have been testing the MQTT TCP (Raw/Plaintext) input plugin provided after the discussion in #6, with reasonable success over the last 30 days, until it suddenly stopped working on 1 August between 6:00 and 7:00.

Apparently the broker was restarted and the plugin was unable to reconnect for over a day. You can see the gap in MQTT acquisition here: [image]

I'm running a dockerized version of Graylog (ver: v3.0.0-alpha.1+83bd3b5) and the MQTT plugin (ver: 1.2.0-rc.1).

The Graylog logs show:

2018-08-01 06:43:01,629 WARN : xenqtt - Connection to broker lost; scheduling a reconnect attempt for channel: MqttClientChannel[localAddress:N/A,remoteAddress:N/A]
2018-08-01 06:43:01,850 ERROR: xenqtt - Failed to connect MqttClientChannel[localAddress:N/A,remoteAddress:N/A]
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_171]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_171]
    at net.xenqtt.message.AbstractMqttChannel.finishConnect(AbstractMqttChannel.java:193) [graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.doConnect(ChannelManagerImpl.java:362) [graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.doIO(ChannelManagerImpl.java:312) [graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.access$000(ChannelManagerImpl.java:47) [graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl$1.run(ChannelManagerImpl.java:92) [graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-08-01 06:43:01,859 WARN : xenqtt - Connection to broker lost; scheduling a reconnect attempt for channel: MqttClientChannel[localAddress:N/A,remoteAddress:N/A]
2018-08-01 06:43:01,864 ERROR: xenqtt - Unable to create a new connection.
net.xenqtt.MqttInvocationException: Command failed: NewClientChannelCommand; ROOT CAUSE: Connection refused
    at net.xenqtt.message.AbstractBlockingCommand.await(AbstractBlockingCommand.java:65) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.newClientChannel(ChannelManagerImpl.java:175) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.newClientChannel(ChannelManagerImpl.java:166) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.newClientChannel(ChannelManagerImpl.java:150) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.client.AbstractMqttClient$ClientReconnector.run(AbstractMqttClient.java:791) [graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_171]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_171]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_171]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-08-01 06:43:02,122 ERROR: xenqtt - Failed to connect MqttClientChannel[localAddress:N/A,remoteAddress:N/A]

This repeated many times until I restarted the Graylog input itself:

2018-08-01 07:21:41,218 WARN : xenqtt - Connection to broker lost; scheduling a reconnect attempt for channel: MqttClientChannel[localAddress:N/A,remoteAddress:N/A]
2018-08-01 07:21:41,219 ERROR: xenqtt - Unable to create a new connection.
net.xenqtt.MqttInvocationException: Command failed: NewClientChannelCommand; ROOT CAUSE: Connection refused
    at net.xenqtt.message.AbstractBlockingCommand.await(AbstractBlockingCommand.java:65) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.newClientChannel(ChannelManagerImpl.java:175) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.newClientChannel(ChannelManagerImpl.java:166) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.message.ChannelManagerImpl.newClientChannel(ChannelManagerImpl.java:150) ~[graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at net.xenqtt.client.AbstractMqttClient$ClientReconnector.run(AbstractMqttClient.java:791) [graylog-plugin-mqtt-1.2.0-rc.1.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_171]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_171]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_171]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-08-01 12:53:30,147 ERROR: org.graylog2.shared.buffers.processors.DecodingProcessor - Unable to decode raw message RawMessage{id=e3e78821-9589-11e8-b769-0242ac120004, journalOffset=1265664, codec=gelf, payloadSize=316, timestamp=2018-08-01T12:53:30.146Z} on input <5b4dc8952ab79c0001239c56>.
2018-08-01 12:53:30,154 ERROR: org.graylog2.shared.buffers.processors.DecodingProcessor - Error processing message RawMessage{id=e3e78821-9589-11e8-b769-0242ac120004, journalOffset=1265664, codec=gelf, payloadSize=316, timestamp=2018-08-01T12:53:30.146Z}
java.lang.IllegalArgumentException: GELF message <e3e78821-9589-11e8-b769-0242ac120004> has empty mandatory "short_message" field.
    at org.graylog2.inputs.codecs.GelfCodec.validateGELFMessage(GelfCodec.java:252) ~[graylog.jar:?]
    at org.graylog2.inputs.codecs.GelfCodec.decode(GelfCodec.java:134) ~[graylog.jar:?]
    at org.graylog2.shared.buffers.processors.DecodingProcessor.processMessage(DecodingProcessor.java:150) ~[graylog.jar:?]
    at org.graylog2.shared.buffers.processors.DecodingProcessor.onEvent(DecodingProcessor.java:91) [graylog.jar:?]
    at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:74) [graylog.jar:?]
    at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:42) [graylog.jar:?]
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) [graylog.jar:?]
    at com.codahale.metrics.InstrumentedThreadFactory$InstrumentedRunnable.run(InstrumentedThreadFactory.java:66) [graylog.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-08-02 14:39:42,190 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now STOPPING
2018-08-02 14:39:42,228 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now STOPPED
2018-08-02 14:39:42,255 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now TERMINATED
2018-08-02 14:39:44,534 INFO : org.mongodb.driver.connection - Opened connection [connectionId{localValue:14, serverValue:54406}] to ds139841.mlab.com:39841
2018-08-02 14:39:44,537 INFO : org.mongodb.driver.connection - Opened connection [connectionId{localValue:13, serverValue:54405}] to ds139841.mlab.com:39841
2018-08-02 14:39:44,604 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now STARTING
2018-08-02 14:39:44,715 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now RUNNING
2018-08-02 14:40:01,268 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now STOPPING
2018-08-02 14:40:01,296 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now STOPPED
2018-08-02 14:40:01,329 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now STARTING
2018-08-02 14:40:01,358 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now TERMINATED
2018-08-02 14:40:01,411 INFO : org.graylog2.inputs.InputStateListener - Input [MQTT TCP (Raw/Plaintext)/5b4dc8662ab79c0001239c1b] is now RUNNING
2018-08-02 17:47:23,886 ERROR: org.graylog2.shared.buffers.processors.DecodingProcessor - Unable to decode raw message RawMessage{id=1cd82ac2-967c-11e8-b769-0242ac120004, journalOffset=1496466, codec=gelf, payloadSize=331, timestamp=2018-08-02T17:47:23.884Z} on input <5b4dc8952ab79c0001239c56>.
2018-08-02 17:47:23,897 ERROR: org.graylog2.shared.buffers.processors.DecodingProcessor - Unable to decode raw message RawMessage{id=1cd851d1-967c-11e8-b769-0242ac120004, journalOffset=1496467, codec=gelf, payloadSize=331, timestamp=2018-08-02T17:47:23.885Z} on input <5b4dc8952ab79c0001239c56>.
2018-08-02 17:47:23,899 ERROR: org.graylog2.shared.buffers.processors.DecodingProcessor - Error processing message RawMessage{id=1cd851d1-967c-11e8-b769-0242ac120004, journalOffset=1496467, codec=gelf, payloadSize=331, timestamp=2018-08-02T17:47:23.885Z}
java.lang.IllegalArgumentException: GELF message <1cd851d1-967c-11e8-b769-0242ac120004> has empty mandatory "short_message" field.
    at org.graylog2.inputs.codecs.GelfCodec.validateGELFMessage(GelfCodec.java:252) ~[graylog.jar:?]
    at org.graylog2.inputs.codecs.GelfCodec.decode(GelfCodec.java:134) ~[graylog.jar:?]
    at org.graylog2.shared.buffers.processors.DecodingProcessor.processMessage(DecodingProcessor.java:150) ~[graylog.jar:?]
    at org.graylog2.shared.buffers.processors.DecodingProcessor.onEvent(DecodingProcessor.java:91) [graylog.jar:?]
    at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:74) [graylog.jar:?]
    at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:42) [graylog.jar:?]
    at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) [graylog.jar:?]
    at com.codahale.metrics.InstrumentedThreadFactory$InstrumentedRunnable.run(InstrumentedThreadFactory.java:66) [graylog.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]

#5 could be facing a similar issue.

tduemesnil commented 5 years ago

We also have the issue that the MQTT input sometimes stops and we have to restart it manually. I don't think we restarted the MQTT server during that time, but I will verify that. We are on an omnibus install of Graylog 2.5.1+34194da on Linux 4.4.0-36-generic, running on physical hardware.
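
For anyone trying to reason about the behaviour in the logs above: the timestamps show reconnect attempts only a few hundred milliseconds apart, each failing with "Connection refused" while the broker was down. Below is a minimal sketch of a bounded retry loop with exponential backoff, the kind of behaviour one might expect from a client after a broker restart. It is not the xenqtt or plugin implementation; `ReconnectSketch`, `Connector`, `backoffDelayMillis` and `connectWithRetry` are hypothetical names made up for illustration, and the delays are arbitrary.

```java
import java.net.ConnectException;

// Hypothetical illustration only; not taken from xenqtt or graylog-plugin-mqtt.
final class ReconnectSketch {

    /** Stand-in for "open a connection to the broker" (assumed interface). */
    interface Connector {
        void connect() throws ConnectException;
    }

    /** Delay before retry number attempt (0-based): base * 2^attempt, capped at maxMillis. */
    static long backoffDelayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt >= 30) {
            return maxMillis; // avoid overflowing the shift for very large attempt counts
        }
        return Math.min(baseMillis << attempt, maxMillis);
    }

    /**
     * Calls connect() until it succeeds or maxAttempts is used up.
     * Returns the number of attempts used, or -1 if all failed or the thread was interrupted.
     */
    static int connectWithRetry(Connector connector, int maxAttempts, long baseMillis, long maxMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                connector.connect();
                return attempt + 1;
            } catch (ConnectException e) {
                // Broker still unreachable: wait longer each time before trying again.
                try {
                    Thread.sleep(backoffDelayMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker that refuses the first two connection attempts.
        Connector flaky = () -> {
            if (++calls[0] < 3) {
                throw new ConnectException("Connection refused");
            }
        };
        int used = connectWithRetry(flaky, 10, 1, 8);
        System.out.println("connected after " + used + " attempts");
    }
}
```

A loop like this only helps if the retry task keeps being rescheduled after each failure. Whether that is what stops happening in the plugin is not established by the logs in this issue.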