spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Provide a Reactive alternative to KafkaOperations and KafkaTemplate #43

sdeleuze closed this issue 5 years ago

sdeleuze commented 8 years ago

KafkaOperations and KafkaTemplate do not currently provide a way to perform truly non-blocking operations, since they use java.util.concurrent.Future, whose only way to retrieve the result is the blocking get() method.
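
The blocking trap described above can be reproduced with plain JDK types. This is a hypothetical sketch, using an ExecutorService task as a stand-in for the producer send, since a plain java.util.concurrent.Future offers no completion callback:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingFutureDemo {

    // Stand-in for KafkaTemplate.send(...): returns a plain j.u.c.Future.
    static Future<String> send(ExecutorService pool) {
        return pool.submit(() -> {
            Thread.sleep(100); // simulate the broker round-trip
            return "record-metadata";
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = send(pool);
        // A plain Future exposes no callback hook; get() parks the calling
        // thread until the result arrives -- "async" send, sync retrieval.
        String metadata = future.get();
        System.out.println(metadata);
        pool.shutdown();
    }
}
```

The submission itself is asynchronous, but any code that needs the RecordMetadata ends up parked in get().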

Providing a Reactive alternative to these classes using Reactive Streams and Reactor Core types, as in our new Reactive HTTP client (a Reactive alternative to RestTemplate), in the Reactive Spring Data work that is about to start (see this ReactiveMongoOperations draft), and in the new Cloud Foundry Java client, would enable truly async, non-blocking, and Reactive usage of spring-kafka. That would be a huge improvement in terms of scalability, especially when used with Reactive Spring Web or Reactive Spring Data support, or even with the upcoming Reactive support in Spring Cloud Stream 1.1.

Based on all the work we have done on this topic in Spring Reactive to prepare the Reactive support of Spring Framework 5, I would advise using:

I am available, with @smaldini and the Spring Reactive team, to help bridge the Kafka client API with Flux and Mono types. I think it should be doable (and efficient) using the Kafka 0.9 new Producer API and new Consumer API.

artembilan commented 8 years ago

@sdeleuze ,

Thank you for your interest in this project and your energy to share the Reactive experience everywhere!

So, I don't want to say that you should provide the solution here or even answer immediately (just at your convenience), but I'd like to discuss this one anyway, because it isn't clear what you are going to do here.

  1. KafkaTemplate is fully based on the Kafka 0.9 Producer API:

    Future<RecordMetadata> future;
    if (this.producerListener == null) {
        future = this.producer.send(producerRecord);
    }
    // (the else branch, which sends with a listener-invoking Callback, is elided)

where the returned Future is Kafka's own FutureRecordMetadata implementation.

  2. KafkaTemplate provides only send operations; that is why it is based only on the Kafka Producer.
  3. Since a Future can be simply adapted to a Mono, I don't see much benefit in just moving the single line Mono.fromFuture(this.producer.send(producerRecord)); into KafkaTemplate instead of doing that from the target user's code.

Just curious what is on your mind on the matter, for a better understanding of the entire Flux and Mono story.

Thank you one more time!

sdeleuze commented 8 years ago

Hey @artembilan,

That's my pleasure to help if I can, and indeed I see a lot of value in using Kafka in Spring applications!

I think the Kafka 0.9 producer API and documentation can be very confusing, and I now understand better why KafkaTemplate returns a Future.

KafkaProducer API has 2 send() variants:

  1. Future<RecordMetadata> send(ProducerRecord<K,V> record)
  2. Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

The first variant's documentation says "Asynchronously send a record to a topic." This is not false, but very misleading: as soon as you call future.get() to get the RecordMetadata, you block the current thread. Even if you convert it later to another truly async type like Flux, Mono, or CompletableFuture, getting the result of a Future will always be blocking.

It took me some time to understand how we could use KafkaProducer in a truly async, non-blocking way. The answer seems to be in the 2nd variant's Javadoc: you should do nothing with the Future return value and instead retrieve the metadata via the callback parameter. This kind of API is extremely confusing; IMO, having the second variant return no value at all (void) would have been far less confusing.

Maybe KafkaTemplate should provide a way to really perform async non-blocking operations. Possible solutions are to expose a callback, ListenableFuture, CompletableFuture, or Mono based API (internally using the 2nd variant + the callback parameter) instead of the current Future based approach, which is blocking (no choice) as soon as you get the metadata.

Exposing a Mono based API would IMO be the best solution: it would be much more powerful, future proof, truly async, composable, and easy to use, especially in a Spring Framework 5 and Reactive Spring Cloud Stream context, and it would provide backpressure support and various other capabilities brought by such a composition API.

I guess the Mono should be created using the callback parameter of the second send() variant, without using the Future return value at all, using something like Mono.fromCallable(), but that is to be validated by @smaldini.
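
The adaptation described here (ignore the returned Future, complete an async type from the callback) can be sketched with plain CompletableFuture. The SendCallback interface and producerSend method below are hypothetical stand-ins for Kafka's Callback and KafkaProducer.send(record, callback), not the real API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackToAsyncDemo {

    // Hypothetical stand-in for org.apache.kafka.clients.producer.Callback.
    interface SendCallback {
        void onCompletion(String metadata, Exception exception);
    }

    // Hypothetical stand-in for KafkaProducer.send(record, callback):
    // completes via the callback on another thread, like the producer I/O thread.
    static void producerSend(ExecutorService ioThread, String record, SendCallback callback) {
        ioThread.execute(() -> callback.onCompletion("metadata-for-" + record, null));
    }

    // The adaptation: never touch any Future return value; bridge the
    // callback into a CompletableFuture that downstream code composes on.
    static CompletableFuture<String> asyncSend(ExecutorService ioThread, String record) {
        CompletableFuture<String> result = new CompletableFuture<>();
        producerSend(ioThread, record, (metadata, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(metadata);
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        asyncSend(ioThread, "rec1")
                .thenAccept(md -> System.out.println("completed: " + md)) // non-blocking
                .join(); // joined here only so the demo can exit
        ioThread.shutdown();
    }
}
```

A Mono based implementation would follow the same shape, with a sink completed from the callback instead of a CompletableFuture.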

For a better understanding of the full Mono and Flux story, maybe this comment I wrote could give you more context about the various stages we went through while working on Spring Reactive.

Any thoughts?

garyrussell commented 8 years ago

Hi @sdeleuze - note that the template has a ProducerListener abstraction that allows the user to ignore the Future and get a callback instead; via the ProducerListenerInvokingCallback, we provide more contextual data than Kafka alone (sent payload etc.).

By default, a simple logging callback is used, which just logs errors.

Code here.

sdeleuze commented 8 years ago

Thanks for pointing me to that @garyrussell.

So indeed that allows registering a common global listener, but it still forces users to perform a blocking operation, which could have a huge performance impact, if they want to do something like just being notified when the ACK has been received, or do something specific with the metadata on each send() invocation, right?

artembilan commented 8 years ago

@sdeleuze ,

thank you for your investigation and clues! Really appreciate :smile:

I wonder if you mean something like this: https://github.com/artembilan/spring-kafka/commit/11385d638a797c14a2d57f393fd52f4fc24d5622

Note that I use MonoProcessor because I wasn't able to find anything else for deferred Mono completion. On the other hand, please pay attention to the additional testMono(). I tried to consume() the Mono from the reactiveSend() the same way, but it looks like, if we complete the MonoProcessor from a different Thread (as in the case of the KafkaProducer), the consumption is skipped when onNext() happens before consume().

Any ideas?

CC @smaldini

smaldini commented 8 years ago

@artembilan when you're around we can get on Screenhero :+1: Sounds like a bug. MonoProcessor is basically what used to be called Promise and is effectively the only way to emit at most one event arbitrarily (like the FluxProcessor components you played with). MonoProcessor caches the terminal result as well, so further subscribers consume the available result immediately.
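
The caching contract described here (late subscribers still receive the terminal result) is the same one the JDK's CompletableFuture provides. This sketch is a stdlib analog, not Reactor itself, showing a consumer attached after completion still being notified:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LateSubscriberDemo {

    // Complete on another thread first, attach the consumer afterwards.
    static boolean lateConsumerNotified() throws Exception {
        CompletableFuture<String> promise = new CompletableFuture<>();

        // Another thread completes the promise, the way the Kafka producer
        // I/O thread would invoke the send Callback.
        Thread producerThread = new Thread(() -> promise.complete("test"));
        producerThread.start();
        producerThread.join(); // the value is set before we "subscribe"

        // A consumer attached after completion still fires, because the
        // terminal result is cached -- the behavior the testMono() case
        // above expects of MonoProcessor.consume(...).
        CountDownLatch received = new CountDownLatch(1);
        promise.thenAccept(v -> received.countDown());
        return received.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("late consumer notified: " + lateConsumerNotified());
    }
}
```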

garyrussell commented 8 years ago

Just FYI; we are planning the 1.0 release for late April early May.

I don't believe we can require Java 8 for that version.

Perhaps we should develop the 2.0 version in parallel?

sdeleuze commented 8 years ago

Indeed, I thought that might be a blocking point for you, so why not:

That would be cleaner than providing separate ReactiveFoo classes IMO, since Mono is basically a much more powerful ListenableFuture.

artembilan commented 8 years ago

Hello, guys!

Thank you for your activity, and sorry for the delay. It is just a late morning for me :smile:

So, regarding Java 8: we can just introduce a new ReactiveKafkaTemplate class and mark it with @UsesJava8.

But I agree about the 2.0 story and replacing ListenableFuture. We just need to figure out what's wrong with my solution :smile:

BTW, I wasn't able to reproduce it with this test-case:

    @Test
    public void testMono() throws Exception {
        MonoProcessor<String> promise = MonoProcessor.create();
        final CountDownLatch onNextCountDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(() -> {
            promise.onNext("test");
            onNextCountDownLatch.countDown();
        });
        assertThat(onNextCountDownLatch.await(10, TimeUnit.SECONDS)).isTrue();

        final CountDownLatch successCountDownLatch = new CountDownLatch(1);
        promise.consume(v -> successCountDownLatch.countDown());
        assertThat(successCountDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
    }

so, my assumption regarding multi-threading may be wrong...

Adding the Thread.sleep(1000); to the Callback before recordMetadataMonoProcessor.onNext(metadata); makes my test happy.

What is bad here is that Reactor (again) is difficult to debug :cry:

So, @smaldini, I'd be glad to Screenhero with you on the matter! Feel free to ping me when you have spare time.

artembilan commented 8 years ago

After a Screenhero session with @smaldini, we localized the problem in the MonoProcessor, but it is not clear yet what the race condition is when we subscribe just a bit after onNext:

1459439786969 ON_NEXT: Thread[kafka-producer-network-thread | producer-1,5,main]
ConsumerRecord(topic = templateTopic, partition = 0, offset = 3, key = 22, value = Mono)
1459439786977 ADDED CONSUMER

garyrussell commented 8 years ago

@sdeleuze - here's my first cut at an AsyncKafkaTemplate (1.0) returning ListenableFuture.

Let me know if you have any comments/suggestions before I complete the tests and docs.

https://github.com/spring-projects/spring-kafka/pull/54

sdeleuze commented 8 years ago

@garyrussell I think we should avoid exposing an API that returns a raw Future, because this is a trap. I like the original intent to provide an async API in KafkaTemplate (that makes perfect sense for Kafka), but with Future you end up with sync behavior as soon as you want to know when the future is complete or get the value it emits.

Since ListenableFuture implements Future, my proposal would be to change the methods returning Future in KafkaTemplate to methods returning ListenableFuture, using the implementation you have in your PR. And the good thing with ListenableFuture is that it still implements Future.get(), which allows providing sync behavior; that's why I think you could also remove all the Kafka.sync*() methods and just explain in the Javadoc that users can call KafkaTemplate.send("foo", "bar").get() if they want such behavior (IMO this should only be used for tests or demos).

That would also enable us to easily add a ReactiveKafkaTemplate without requiring a 2.0 (we just have to add an optional dependency on Reactor Core, based on @artembilan's work), where Mono is used instead of ListenableFuture, and maybe with some Flux based methods to handle that in a streaming way.

That would make a pretty nice and clear spring-kafka public API, where KafkaTemplate provides a real async non-blocking API that you can easily turn into a sync one by using get(), and a Reactive API that could be introduced whenever we want (1.1 maybe?) as an additional ReactiveKafkaTemplate that won't break the KafkaTemplate API provided in 1.0.
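
The dual async/sync usage proposed here can be illustrated with CompletableFuture as a stand-in for ListenableFuture; the send method below is hypothetical, not the actual KafkaTemplate signature:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DualUseFutureDemo {

    // Hypothetical stand-in for a KafkaTemplate whose send(...) returns a
    // listenable/composable future instead of a raw j.u.c.Future.
    static CompletableFuture<String> send(ExecutorService pool, String topic, String payload) {
        return CompletableFuture.supplyAsync(() -> "sent " + payload + " to " + topic, pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Async style: register a callback, never block the caller.
        send(pool, "foo", "bar").thenAccept(md -> System.out.println("callback: " + md));

        // Sync style (tests/demos): the same return type still implements
        // Future, so callers who want blocking semantics just call get().
        String metadata = send(pool, "foo", "baz").get();
        System.out.println("blocking: " + metadata);

        pool.shutdown();
    }
}
```

One return type thus serves both audiences, which is why the separate sync* methods become redundant.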

Any thoughts?

garyrussell commented 8 years ago

@sdeleuze Sorry - yes, I should have made it clear that my intention was to replace the existing template with this one; I just hacked it together as a PoC to see if it was in line with your thoughts.

sdeleuze commented 8 years ago

Oh, in that case, perfect :-) And yes, the implementation is exactly what I had in mind, with the use of SettableListenableFuture + the Kafka Producer callback 👍

garyrussell commented 8 years ago

Updated PR here: https://github.com/spring-projects/spring-kafka/pull/57

sdeleuze commented 8 years ago

As discussed previously, Kafka Streams is going to ship with Kafka 0.10.0, as detailed in this great blog post.

While I think both KStream (Kafka Streams) and Flux (Reactor) have their own advantages and are interesting to support, one open question we will have to solve is: should we build Reactive Streams support directly on the low level Kafka Producer / Consumer API, or on top of Kafka Streams, by converting KStream to a Reactor Flux for example?

mbogoevici commented 8 years ago

As far as I can tell, the KStream interface is mainly designed with a functional programming model in mind, but not necessarily reactive pull. That it follows a push model driven by the consumer thread becomes clearer when looking at https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L311. So, at first glance, I do not believe this fits well with building a Reactive Streams wrapper around it. I do see an interesting opportunity in hooking Flux/Mono in by adapting stream component pipelines, though, so let's keep the discussion open.

artembilan commented 7 years ago

Just for the full picture: https://github.com/reactor/reactor-kafka

adamdec commented 7 years ago

Hi, do you plan to integrate reactor-kafka into spring-kafka?

garyrussell commented 7 years ago

Yes, that is the plan, after it stabilizes.

PedroAlvarado commented 7 years ago

@garyrussell Help me understand this better. Fast forward a number of releases: there will be built-in support for both KStreams and reactor-kafka, and the KStreams integration will not be wrapped in the Reactive Streams story. Correct?

garyrussell commented 7 years ago

Correct.

lpandzic commented 6 years ago

Is this targeted for 2.0?

artembilan commented 6 years ago

I don't think so. Spring Kafka 2.0 is targeted for September 21, just after the Spring Framework 5.0 GA, and I don't see any plans for a Reactor Kafka GA release.

@ilayaperumalg , any thoughts, please?

Moreover, there is still some work to do on our side on the matter. So, my best guess is that this task is going to be deferred to the 2.1 version.

rotilho commented 6 years ago

Hello @artembilan! Are there still any plans for reactive Kafka operations?

artembilan commented 6 years ago

There are, indeed, but we haven't started the 2.1 branch yet.

Any contribution is welcome, of course!

hartmut-co-uk commented 6 years ago

@artembilan meaning 2.2? https://github.com/spring-projects/spring-kafka/releases

Is there any timeline for this? Thanks.

artembilan commented 6 years ago

Sorry, I don't have the resources to jump on this subject right now.

If you or anyone else can contribute the feature, that would be great, and I will be glad to review and merge it.

My feeling right now is that 2.2 will probably be based on Spring Framework 5.1 and therefore will be released somewhere after that one. Although I might be wrong, and @garyrussell may have some other vision for the subject.

garyrussell commented 6 years ago

We currently have higher priorities; community contributions would be most welcome.

hartmut-co-uk commented 6 years ago

Hi, many thanks for the update! I didn't mean to add pressure; I'll try to find time to have a closer look.