hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

subscribePublishes.subscribeSingleFuture interferes with further processing of publishes #544

Open DerSchwilk opened 2 years ago

DerSchwilk commented 2 years ago

When calling subscribeSingleFuture() on a FlowableWithSingle<MqttXPublish, MqttXSubAck>, I would expect the original flowable to behave the same as it does without the subscribeSingleFuture() call. After subscribeSingleFuture() has run, the flowable should still emit every MqttPublish received for the subscription.

We publish more than 20 messages to the client. When we run subscribeSingleFuture() on the FlowableWithSingle and manually acknowledge the publishes on the original flowable, the original flowable emits only 20 elements, capped by Mosquitto's max in-flight messages setting.
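The cap can be illustrated with a toy model of a broker-side in-flight window. This is only a sketch of the general QoS > 0 flow-control idea, not Mosquitto or HiveMQ client code; the class `InFlightWindowSketch` and the method `delivered` are hypothetical names, and the model assumes the broker simply stops sending once the window is full of unacknowledged messages.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a broker-side in-flight window (hypothetical, not library code).
final class InFlightWindowSketch {

    /**
     * Counts how many of the published messages are delivered when the broker
     * allows at most maxInFlight unacknowledged messages and the consumer's
     * acknowledgements either reach the broker (consumerAcks = true) or never do.
     */
    static int delivered(int published, int maxInFlight, boolean consumerAcks) {
        Deque<Integer> inFlight = new ArrayDeque<>();
        int delivered = 0;
        for (int id = 0; id < published; id++) {
            if (inFlight.size() >= maxInFlight) {
                break; // window is full: the broker holds back the remaining messages
            }
            inFlight.addLast(id); // message is sent and now awaits acknowledgement
            delivered++;
            if (consumerAcks) {
                inFlight.removeFirst(); // acknowledgement frees one slot in the window
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(delivered(25, 20, true));  // acknowledgements arrive
        System.out.println(delivered(25, 20, false)); // acknowledgements never arrive
    }
}
```

In this model, 25 published messages with a window of 20 are all delivered when acknowledgements arrive, while only 20 are delivered when they do not, which matches the stall at 20 elements described above.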

To Reproduce

In our scenario we want to evaluate the SubAck prior to processing and consuming the publishes.

  1. Publish n messages with QoS > 0 to a Mosquitto instance with max in-flight configured to < n
  2. Consume the messages with an Rx client using subscribePublishes(Mqtt5Subscribe, true)
  3. Store the returned FlowableWithSingle in a variable
  4. Run subscribeSingleFuture() on it
  5. Acknowledge the messages in a mapping stage on the stored FlowableWithSingle
  6. Consume the stored FlowableWithSingle

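```java
// Subscribe with manual acknowledgement enabled (second argument is true).
final var flowableWithSingle = client.subscribePublishes(
        Mqtt5Subscribe.builder().topicFilter(TOPIC).qos(MqttQos.EXACTLY_ONCE).build(), true);

// Obtain the SubAck as a future; this call triggers the behaviour described above.
flowableWithSingle.subscribeSingleFuture();

// Acknowledge every publish on the stored flowable and block until it completes.
flowableWithSingle.doOnNext(n -> {
    LOGGER.info("Got next");
    n.acknowledge();
}).ignoreElements().blockingAwait();
```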

Without the subscribeSingleFuture() call, the messages are acknowledged correctly and every message is received.


thjaeckle commented 2 years ago

@DerSchwilk and I encountered this behaviour while integrating the reactive API of the HiveMQ client into Eclipse Ditto. We run our system tests against Eclipse Mosquitto, which is where we noticed the unexpected change in behaviour when using FlowableWithSingle.