hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

MqttAsyncClient sometimes blocking indefinitely on publish #620

Open tmbull opened 4 months ago

tmbull commented 4 months ago

❓ Question

Hi all, I have a Vert.x application that consumes messages from Kafka and publishes them to MQTT. Frequently, I see threads get stuck in the WAITING state while publishing. I looked through prior issues, and it seems there are several cases where this can occur, for example if the client is not connected before publishing, or if the client gets disconnected before the publish is acknowledged. I do not believe that is the case here, as I do not see any log messages indicating that the client has disconnected.

I do not believe this is a bug in the library, as we have several other applications that use the HiveMQ MqttClient to publish messages and never encounter this issue. I have inspected the code, but to be honest, I am not very familiar with RxJava. I plan to dig into that next, but I was wondering whether there are any other obvious cases or race conditions I should look out for. Thank you for your time.

📎 Additional context

I would like to provide a sample project, but this is a proprietary code base and, as of yet, I have been unable to reproduce the issue in a "toy" project. However, here is a sample stack trace:

"vert.x-eventloop-thread-0" #21 prio=5 os_prio=0 cpu=84.25ms elapsed=135.84s tid=0x00007fdac4601000 nid=0x1b in Object.wait()  [0x00007fdaad038000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(java.base@11.0.16/Native Method)
    - waiting on <0x00000006951d1a00> (a com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables)
    at java.lang.Object.wait(java.base@11.0.16/Unknown Source)
    at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables.add(MqttPublishFlowables.java:53)
    - waiting to re-lock in wait() <0x00000006951d1a00> (a com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables)
    at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle.subscribeActual(MqttAckSingle.java:56)
    at io.reactivex.Single.subscribe(Single.java:3666)
    at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
    at io.reactivex.Single.subscribe(Single.java:3666)
    at com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture.<init>(RxFutureConverter.java:113)
    at com.hivemq.client.internal.rx.RxFutureConverter.toFuture(RxFutureConverter.java:43)
    at com.hivemq.client.internal.mqtt.MqttAsyncClient.publish(MqttAsyncClient.java:243)
    at com.my.code.MqttClient.publishMessageWithRetryInternal(MqttClient.java:258)
    at com.avalara.edge.common.mqtt.MqttClient$$Lambda$1608/0x0000000840939440.handle(Unknown Source)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:948)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:919)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
    at io.vertx.core.impl.ContextBase.emit(ContextBase.java:297)
    at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:207)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:937)
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(java.base@11.0.16/Unknown Source)
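
The trace shows the Vert.x event-loop thread itself parked in Object.wait() inside MqttPublishFlowables.add, reached from MqttAsyncClient.publish. While the cause is investigated, one possible mitigation is to make the call from a worker thread and bound the wait, so that a stall surfaces as a failed future rather than a frozen event loop. The following is a minimal sketch using only the JDK (Java 9+ for orTimeout). The class OffloadPublish, the helper offload, the pool size, and stalledPublish (a stand-in for the real publish call) are all hypothetical names for illustration; none of them come from the HiveMQ client or from Vert.x.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class OffloadPublish {

    // Dedicated daemon worker pool (size is an arbitrary assumption) so a stalled call parks a worker,
    // not the event loop.
    private static final ExecutorService PUBLISH_POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "mqtt-publish-worker");
        t.setDaemon(true);
        return t;
    });

    // Runs a possibly blocking call on the worker pool and fails the returned future
    // with a TimeoutException if no result arrives within the given time.
    static <T> CompletableFuture<T> offload(Supplier<CompletableFuture<T>> call, long timeout, TimeUnit unit) {
        return CompletableFuture
                .supplyAsync(call, PUBLISH_POOL) // the call itself may block; it does so on a worker
                .thenCompose(f -> f)             // flatten the future returned by the call
                .orTimeout(timeout, unit);       // bound the caller's wait
    }

    // Hypothetical stand-in for client.publish(...): parks the calling thread before returning,
    // loosely imitating the wait() seen in the trace.
    static CompletableFuture<String> stalledPublish() {
        try {
            Thread.sleep(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return CompletableFuture.completedFuture("late ack");
    }

    public static void main(String[] args) {
        // A call that returns promptly completes normally.
        CompletableFuture<String> fast =
                offload(() -> CompletableFuture.completedFuture("ack"), 1, TimeUnit.SECONDS);
        System.out.println("fast publish: " + fast.join());

        // A stalled call fails the caller's future instead of hanging the calling thread.
        try {
            offload(OffloadPublish::stalledPublish, 200, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            System.out.println("stalled publish timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

Note that the timeout only releases the caller: the worker thread stays parked until the underlying call returns, so repeated stalls would eventually exhaust the pool. This makes the problem visible and keeps the event loop responsive, but it does not address the root cause. In a Vert.x application the same idea would more naturally go through Vert.x's own worker facilities rather than a hand-rolled pool.
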

Mystery406 commented 1 month ago

Same as #554