hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0
832 stars 153 forks source link

Simplify use of reactive primitives #541

Closed alexanderankin closed 1 year ago

alexanderankin commented 1 year ago

Problem or use case

the api is very difficult to abstract, and requires coupling with the deprecated RxJava2 and its replacement, the reactivesreams spec.

Preferred solution or suggestions

I think it makes sense to just use reactor, but perhaps a thoughtful investigation into the particular use cases can add alternative methods which return datastructures with a bit of overhead but a simplified API that is not tied to a particular implementation (but just the concepts of Single/Flowable, Mono/Flux, etc)

I would just like to discuss the degree of possibility of such an addition in this project, i think the implementation details can be determined once the motivation is established, to simplify the API.

I can give an example. one of the subscribe apis returns a FlowableWithSingle<F, S> where S is the single type and F is the Flowable type. This could have instead been addressed in the business logic rather than the async abstraction layer, like so:

@lombok.Data class TwoThings<S, F> { Single<S> single; Flowable <F> flowable; }

abstract class PerformsIO { abstract Single<TwoThings<String, Integer>> subscribeAndReturnAckAndPublishes(); }

This alone will go a long way to letting users avoid the fluent api, which anyone using this for work will need to do as they will need to configure the client using static properties from a secrets manager.

alexanderankin commented 1 year ago

The attached is the result of my simplification of a part of the hivemq-hq-issued sample code. its not perfect but it does demonstrate what i am talking about:

attached ```java package pkg; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.rx.FlowableWithSingle; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import reactor.adapter.rxjava.RxJava2Adapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.util.concurrent.atomic.AtomicReference; /* // build.gradle: plugins { id 'java' } dependencies { implementation platform('io.projectreactor:reactor-bom:2020.0.18') implementation 'io.projectreactor:reactor-core' implementation 'com.hivemq:hivemq-mqtt-client:1.3.0' implementation 'org.slf4j:slf4j-simple:1.7.30' implementation 'io.projectreactor.addons:reactor-adapter:3.4.8' compileOnly 'org.projectlombok:lombok:1.18.24' annotationProcessor 'org.projectlombok:lombok:1.18.24' } */ @Slf4j public class Example { public static void main(String[] args) { Mqtt5RxClient mqtt5RxClient = Mqtt5Client.builder() .buildRx(); Mono connAckMono = RxJava2Adapter.singleToMono(mqtt5RxClient.connectWith() // setting the settings is localized // this means we can abstract the fluent api and use static properties .cleanStart(false) .applyConnect()) .thenReturn(mqtt5RxClient); /* this is a promise for the result of a subscribe operation. it is a promise, because it will happen after it is connected (see above). it is an atomic reference because com.hivemq.client.rx.FlowableWithSingle#doOnSingle is annotated with com.hivemq.client.annotations.CheckReturnValue underneath of those two layers, it is just a simple pojo but the 'pojo' extends Flowable **and** Publisher */ Mono>> refMono = connAckMono.map(client -> new AtomicReference<>( client.subscribePublishesWith() // we need to set more settings here .topicFilter("abc") .qos(MqttQos.AT_LEAST_ONCE) .applySubscribe())); // now the 'pojo' (FWS) is just a regular pojo - this is what i am advocating for to be integrated into the core public API Mono resultMono = refMono.map(ref -> new AckWithPublishes() .setAck(Mono.create(s -> ref.set(ref.get().doOnSingle(s::success)))) .setPublishes(RxJava2Adapter.flowableToFlux(ref.get()))); resultMono.flatMapMany(result -> { return result.getAck() .doOnNext(ack -> { log.info("life is good: {}", ack); }) .onErrorMap(Mqtt5SubAckException.class, e -> { log.error("life is not good: {}", e.getMqttMessage().getReasonString().orElse(null)); return new IllegalStateException(e); }) .thenMany(result.getPublishes()) .doOnNext(message -> log.info("new message: {}", message)); }) .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } @Accessors(chain = true) @Data public static class AckWithPublishes { Mono ack; Flux publishes; } } ```
SgtSilvio commented 1 year ago

Hi @alexanderankin Thank you for your detailed description.

I think it makes sense to just use reactor

You can already use reactor as the API, see here: https://hivemq.github.io/hivemq-mqtt-client/docs/installation/#optional-features

I can give an example. one of the subscribe apis returns a FlowableWithSingle<F, S> where S is the single type and F is the Flowable type.

There is an explicit reason why the API was designed like that. We can not just split into a separate Flowable and Single. Subscribing to the Single would create the subscription on the MQTT level and messages matching this subscription will immediately be delivered to the client. When then subscribing to the Flowable, some messages might be missed. Basically this creates a lot of cases resulting in this race condition.

SgtSilvio commented 1 year ago

deprecated RxJava2

The next version of the client will upgrade to RxJava 3 (and of course keep the already available reactor API). The next version will come once we get to it which might actually happen soon.

pglombardo commented 1 year ago

Since there haven't been any updates in 6 months I will close out this issue. If anything remains, please feel free to re-open or file another issue. Thanks @alexanderankin for opening this discussion!