apache / pulsar-client-reactive

Reactive client for Apache Pulsar
https://pulsar.apache.org/
Apache License 2.0

Consider providing Reactive Streams library implementation specific interfaces #10

Closed lhotari closed 2 years ago

lhotari commented 2 years ago

In Micronaut, there's a good example of this.

HttpClient: https://docs.micronaut.io/latest/api/io/micronaut/http/client/HttpClient.html

ReactorHttpClient extends HttpClient: https://micronaut-projects.github.io/micronaut-reactor/latest/api/io/micronaut/reactor/http/client/ReactorHttpClient.html

With pulsar-client-reactive, the challenge is the nested interface layers and how to make them work seamlessly.

One possibility would be an interface method that adapts the current instance to a type specific to some other Reactive Streams implementation library.

One option would be to keep the core interface completely decoupled from any specific Reactive Streams implementation library type:

public interface ReactiveMessageSender<T> {

    Publisher<MessageId> sendMessages(Publisher<MessageSpec<T>> messageSpecs);

    <R> R adapt(Class<R> adaptedSenderType);

}

public interface ReactorMessageSender<T> {

    Mono<MessageId> sendMessage(Mono<MessageSpec<T>> messageSpec);

    Flux<MessageId> sendMessages(Flux<MessageSpec<T>> messageSpecs);

}

Internally, there would be an SPI (Service Provider Interface) that Reactive Streams library support modules could implement. This would make it easy to add support for various Reactive Streams implementation libraries, with each library's own types supported directly.
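To make the adapt-plus-SPI idea more concrete, here is a minimal sketch of how the lookup could work. Everything in it is hypothetical and not part of pulsar-client-reactive: the names CoreSender, SenderAdapterProvider, FancySender and DefaultCoreSender are made up for illustration, java.util.concurrent.Flow.Publisher stands in for org.reactivestreams.Publisher so the sketch has no dependencies, and the providers are passed in as a list (a real module would more likely discover them with java.util.ServiceLoader).

```java
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical core sender. Flow.Publisher stands in for org.reactivestreams.Publisher.
interface CoreSender<T> {
    Flow.Publisher<String> sendMessages(Flow.Publisher<T> messages);

    // Adapts this sender to a library-specific type, if some provider supports it.
    <R> R adapt(Class<R> adaptedSenderType);
}

// Hypothetical SPI that a Reactive Streams library support module would implement.
interface SenderAdapterProvider {
    boolean supports(Class<?> adaptedSenderType);

    <T> Object adapt(CoreSender<T> sender);
}

// Hypothetical library-specific view of the sender (think ReactorMessageSender).
final class FancySender<T> {
    final CoreSender<T> delegate;

    FancySender(CoreSender<T> delegate) {
        this.delegate = delegate;
    }
}

// Provider contributed by the hypothetical "fancy" support module.
final class FancySenderProvider implements SenderAdapterProvider {
    @Override
    public boolean supports(Class<?> adaptedSenderType) {
        return adaptedSenderType == FancySender.class;
    }

    @Override
    public <T> Object adapt(CoreSender<T> sender) {
        return new FancySender<>(sender);
    }
}

// Core implementation: it knows only the SPI, not any concrete library type.
final class DefaultCoreSender<T> implements CoreSender<T> {
    private final List<SenderAdapterProvider> providers;

    DefaultCoreSender(List<SenderAdapterProvider> providers) {
        this.providers = List.copyOf(providers);
    }

    @Override
    public Flow.Publisher<String> sendMessages(Flow.Publisher<T> messages) {
        throw new UnsupportedOperationException("sending is out of scope for this sketch");
    }

    @Override
    public <R> R adapt(Class<R> adaptedSenderType) {
        // Ask each provider in turn; the first one that supports the type wins.
        for (SenderAdapterProvider provider : providers) {
            if (provider.supports(adaptedSenderType)) {
                return adaptedSenderType.cast(provider.adapt(this));
            }
        }
        throw new IllegalArgumentException("No adapter for " + adaptedSenderType.getName());
    }
}
```

With this shape, a caller would write something like core.adapt(FancySender.class), and the core module would never need a compile-time dependency on the library that FancySender belongs to.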

lhotari commented 2 years ago

The challenge is to build it in a way where the core doesn't depend on other Reactive Streams implementations. (The core could depend on Reactor, so this applies to other implementations.)

lhotari commented 2 years ago

I'm closing this as "wontfix". The solution for #1 is sufficient.

I ran a broad experiment with a full-blown approach in which the whole API is mirrored for a specific Reactive Streams implementation, hiding the Project Reactor types completely. The experiment is available at https://github.com/lhotari/pulsar-client-reactive/tree/lh-rs-impl-experiment (branch lh-rs-impl-experiment).

The value of the wrapping is minimal. All Reactive Streams libraries interoperate through the Reactive Streams Publisher interface. Hiding this translation behind a mirrored API would be messier than doing the conversion explicitly in user code.

One possible solution to the problem is to use Project Reactor's .as method (<P> P as(Function<? super Flux<T>, P> transformer)) to convert a Flux or Mono to another type.

For example, converting to RxJava 3 would be as simple as .as(Flowable::fromPublisher) or .as(Single::fromPublisher). Similar chaining with the .as method could be done for Akka Streams and any other Reactive Streams implementation.
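The mechanism behind .as is small enough to sketch without any dependencies. The following is only an illustration under stated assumptions: MiniFlux and OtherStream are hypothetical stand-ins (for Flux and for a type from another library such as RxJava's Flowable), and only the as hook is modelled, not any reactive behaviour.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Flux; only the `as` hook is modelled here.
final class MiniFlux<T> {
    private final List<T> items;

    MiniFlux(List<T> items) {
        this.items = List.copyOf(items);
    }

    List<T> items() {
        return items;
    }

    // Same shape as Reactor's Flux.as: pass `this` to the function and return its result.
    <P> P as(Function<? super MiniFlux<T>, P> transformer) {
        return transformer.apply(this);
    }
}

// Hypothetical stand-in for another library's stream type.
final class OtherStream<T> {
    final List<T> items;

    private OtherStream(List<T> items) {
        this.items = items;
    }

    // Plays the role of a static factory such as Flowable.fromPublisher.
    static <T> OtherStream<T> fromMini(MiniFlux<T> source) {
        return new OtherStream<>(source.items());
    }
}
```

A call such as new MiniFlux<String>(List.of("a", "b")).as(OtherStream::fromMini) then stays a single fluent chain, which is the same shape as calling .as(Flowable::fromPublisher) on a real Flux with Reactor and RxJava 3 on the classpath.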