spring-cloud / spring-cloud-function

Apache License 2.0

Reactor Kafka and Spring Cloud Function example #779

Closed: roger751 closed this issue 2 years ago

roger751 commented 2 years ago

Is there any support for, or example of, using Reactor Kafka with Spring Cloud Function? I understand that Spring Cloud Function supports Project Reactor, but I couldn't find any example. I think it would be helpful if some reactive examples were added to the samples section of this repository.

olegz commented 2 years ago

@sobychacko can you please follow up on this?

sobychacko commented 2 years ago

@roger751 Here is a sample. However, I see that you have already found this issue and added your update there. We will discuss how we can address this use case, most likely as part of the 4.0.0 version of Spring Cloud Stream.

roger751 commented 2 years ago

@sobychacko @olegz thanks. The example you referenced is based on spring-cloud-stream-binder-kafka. It's unclear to me how the SCS Kafka binder consumes from Kafka with Project Reactor support without consuming through Reactor Kafka. I'm quite confused by the whole situation, so I'll sum it up in some questions (any answer will be appreciated):

  1. Is spring-cloud-stream-binder-kafka based on Spring-Kafka?
  2. If so, when I use Project Reactor with spring-cloud-stream-binder-kafka as in the example you added, am I consuming behind the scenes with Spring-Kafka in an async way (future-based, @Async and all of that)?
  3. Is it even possible to consume from Kafka with reactor-kafka and use Spring Cloud Function at the moment?
  4. What is the currently recommended way to consume a Kafka topic if I want high performance? Is there currently any alternative for using Reactor and consuming from Kafka?
  5. If you were to create a project now, would you consume the topic with Spring Cloud Function using reactor-kafka and reactor-core?
  6. Can Spring Cloud Stream and its Kafka binder be reactive?
  7. When is 4.0.0 planned to be released (approximately, of course, if you can tell)?

Thank you very much

sobychacko commented 2 years ago

@roger751 Before I answer your individual questions, let me reiterate the Reactor situation with the current Kafka binder: without a proper implementation of this issue in the Kafka binder, we will not get full reactive support. The reason is that the Kafka binder is based on MessageChannel, not reactive types. With that said, if you want to use Project Reactor with Spring Cloud Function and Kafka, you need to write the code to do that on your own until we implement it formally in the binder.
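To make the difference between being fed through a MessageChannel and consuming through reactive types a little more concrete, here is a JDK-only sketch. It uses java.util.concurrent.Flow, the JDK's counterpart of the Reactive Streams Publisher/Subscriber/Subscription interfaces that Project Reactor builds on; it does not use Reactor, Spring or Kafka, and ListPublisher and OneAtATimeSubscriber are hypothetical names. The point it shows is that a reactive subscriber decides how many records it wants via request(n), whereas a channel-style handler is simply invoked for every record that arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical synchronous publisher that emits list items only when demand has been signalled. */
class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    ListPublisher(List<T> items) {
        this.items = List.copyOf(items);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request(n) requires n > 0"));
                    return;
                }
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return; // re-entrant call from onNext: the running loop picks up the new demand
                emitting = true;
                while (demand > 0 && next < items.size() && !done) {
                    demand--;
                    subscriber.onNext(items.get(next++)); // emit only what was requested
                }
                emitting = false;
                if (next == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

/** Hypothetical subscriber that pulls one record at a time, so it controls the pace itself. */
class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> processed = new ArrayList<>();
    boolean completed = false;
    Throwable error;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // ask for the first record only
    }

    @Override
    public void onNext(String item) {
        processed.add(item.toUpperCase()); // stand-in for real processing
        subscription.request(1);           // ask for the next record only after this one is done
    }

    @Override
    public void onError(Throwable t) {
        error = t;
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}
```

Because nothing is emitted until request(n) is called, a slow subscriber automatically slows the source down; a plain per-message callback gets no such say in how fast records arrive.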

Here are my takes on the questions you asked.

  1. spring-cloud-stream-binder-kafka is fully based on Spring for Apache Kafka. All the foundational elements in the binder come from Spring Kafka. The binder adds connectivity and opinionated primitives on top of the functional programming model provided by spring-cloud-function.
  2. You still consume behind the scenes through spring-kafka in an async manner, but when you involve Reactor like that, there is currently an impedance mismatch in the binder. We need to build a dedicated binder based on reactor-kafka for that, which is the main focus of the other issue we pointed out.
  3. As mentioned above, you need to hand-craft code to do that at the moment.
  4. I think the answer here is highly dependent on your use case. You can achieve really good performance with the message-channel-based Kafka binder, but you need to run benchmarks and adapt to your particular situation.
  5. You can do that, but you need to do what I said in 3. I wouldn't go that route; I would stick with the message-channel-based binder. But again, depending on your requirements, you may want to write a different stack using reactor-kafka.
  6. Without the other issue in the binder addressed, we cannot claim reactive capabilities for the binder. This is because from the moment records are delivered to the consumer by the binder, you are no longer in the reactive world; they are handled through the message channel instead. To handle this properly, we need to make sure that the consuming layer is also reactive, so a solution based on reactor-kafka is needed, most likely as a new binder variant for Kafka.
  7. We are planning 4.0.0 for 2022. It is going to be based on Spring Framework 6.0 and Spring Boot 3.0. We are hoping to get a first milestone out in early 2022 (but no final dates have been set for this yet).
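The impedance mismatch described above can be sketched without Kafka at all. In the hypothetical, JDK-only example below, a push-style source (standing in for whatever hands records to a message channel) feeds a demand-driven consumer through a bounded buffer. PushToPullBridge is an illustrative name, and this is not how the binder is implemented; it only shows that once the downstream demand and the buffer are both used up, the bridge has no way to tell the push side to slow down.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

/**
 * Hypothetical bridge between a push-style source, which calls push() for every record it receives,
 * and a demand-driven consumer, which calls request(n) when it is ready for more.
 */
class PushToPullBridge<T> {
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private final int capacity;
    private final Consumer<T> downstream;
    private long demand = 0;

    PushToPullBridge(int capacity, Consumer<T> downstream) {
        this.capacity = capacity;
        this.downstream = downstream;
    }

    /** Called by the push side. Returns false when the item can be neither delivered nor buffered. */
    boolean push(T item) {
        if (demand > 0 && buffer.isEmpty()) {
            demand--;
            downstream.accept(item); // downstream asked for it: deliver immediately
            return true;
        }
        if (buffer.size() < capacity) {
            buffer.add(item); // no demand yet: park it
            return true;
        }
        // The push side has no demand signal to honour, so the bridge can only reject, block or drop.
        return false;
    }

    /** Called by the demand-driven side to ask for up to n more items. */
    void request(long n) {
        if (n <= 0) throw new IllegalArgumentException("request(n) requires n > 0");
        demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n; // cap on overflow
        while (demand > 0 && !buffer.isEmpty()) {
            demand--;
            downstream.accept(buffer.poll()); // drain buffered items in arrival order
        }
    }
}
```

In this picture, backpressure stops at the bridge: whatever produces records upstream is never slowed down by request(n), which is the gap the thread proposes to close with a reactor-kafka based binder.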

Hope that answers some of your questions.

I will link this issue from the other issue for more visibility and feedback.

roger751 commented 2 years ago

Thank you very much. This answers everything I wanted to know, but it raises two more questions for me:

  1. Instead of Spring Cloud Function and SCSt, do you think it is possible to implement a reactor-kafka/reactor-core based application that uses Spring Boot, Spring Data and Spring Cloud features (e.g. Spring Cloud Kubernetes)? The application won't use Spring Integration. Do you see any pitfalls?
  2. If I want to be able to upgrade to the future binder once it's released, which would probably be easier to refactor: a reactor-kafka based application as described in question 1, or a non-reactive Spring Cloud Stream application?

artembilan commented 2 years ago

Sorry for jumping in with my 5 cents. Re:

The application won't use Spring Integration

It is still OK to start your application from the reactor-kafka KafkaReceiver (or just ReactiveKafkaConsumerTemplate) and feed the resulting Flux into a FluxMessageChannel from Spring Integration. The rest of your flow would still be the same: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#flux-message-channel. Or you can just start the flow from that Flux. See IntegrationFlows:

    /**
     * Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
     * and subscribe it to the provided {@link Publisher}.
     * @param publisher the {@link Publisher} to subscribe to.
     * @return new {@link IntegrationFlowBuilder}.
     */
    @SuppressWarnings("overloads")
    public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {

I leave the rest of the explanation to @sobychacko.

Thanks

sobychacko commented 2 years ago
  1. You can do that with reactor-kafka and Boot. That is fine, I think, but keep in mind that you are not using any SCFn/SCSt features here at all. Here are some examples from reactor-kafka. This is basically what I meant above when I said that you need to implement the feature on your own until there is a formal one in the binder. See Artem's comment above about FluxMessageChannel; that would also be a valid approach.
  2. If you are running a reactor-kafka based application, that will be easier to refactor to a future reactor-based binder.

By the way, you are more than welcome to contribute PRs for this.

artembilan commented 2 years ago

If we talk about a contribution, it is not so hard these days to implement a reactive channel adapter. The inbound part is simply an extension of MessageProducerSupport with a subscribeToPublisher(Publisher<? extends Message<?>> publisher) call from start(). See MongoDbChangeStreamMessageProducer for an example. The outbound part is just a ReactiveMessageHandler implementation. Of course, that is if we talk about a solution based on Spring Integration and its reactive bits, such as FluxMessageChannel and ReactiveStreamsConsumer...
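The adapter shape described here, an inbound part that subscribes to a Publisher when it is started and an outbound part that returns an asynchronous completion instead of blocking, can be mimicked with plain JDK types. This is only an analogy: InboundAdapter, AsyncOutboundHandler and AdapterDemo are hypothetical names, Flow.Publisher stands in for a Reactor Publisher, and CompletableFuture<Void> stands in for the Mono<Void> returned by ReactiveMessageHandler.handleMessage(Message<?>). In Spring Integration itself, the inbound part would extend MessageProducerSupport and call subscribeToPublisher(...).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical outbound side: completes a future when the message is handled instead of blocking the caller. */
interface AsyncOutboundHandler<T> {
    CompletableFuture<Void> handle(T message);
}

/** Hypothetical inbound side: subscribes to the publisher on start() and feeds every message to the handler. */
class InboundAdapter<T> {
    private final Flow.Publisher<T> source;
    private final AsyncOutboundHandler<T> handler;
    private volatile Flow.Subscription subscription;

    InboundAdapter(Flow.Publisher<T> source, AsyncOutboundHandler<T> handler) {
        this.source = source;
        this.handler = handler;
    }

    void start() {
        source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for the first message
            }

            @Override
            public void onNext(T message) {
                // Ask for the next message only after the handler has finished with this one.
                handler.handle(message).whenComplete((ignored, error) -> subscription.request(1));
            }

            @Override
            public void onError(Throwable t) {
                // A real adapter would route this to an error channel; the sketch just stops receiving.
            }

            @Override
            public void onComplete() {
                // Nothing left to forward.
            }
        });
    }

    void stop() {
        Flow.Subscription s = subscription;
        if (s != null) {
            s.cancel();
        }
    }
}

/** Hypothetical driver: SubmissionPublisher stands in for the record source. */
class AdapterDemo {
    static List<String> run(List<String> messages) {
        List<String> handled = new CopyOnWriteArrayList<>();
        if (messages.isEmpty()) {
            return handled;
        }
        CompletableFuture<Void> allHandled = new CompletableFuture<>();
        AsyncOutboundHandler<String> handler = msg -> CompletableFuture.runAsync(() -> {
            handled.add(msg);
            if (handled.size() == messages.size()) {
                allHandled.complete(null);
            }
        });
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            InboundAdapter<String> adapter = new InboundAdapter<>(publisher, handler);
            adapter.start();
            messages.forEach(publisher::submit);
            allHandled.orTimeout(5, TimeUnit.SECONDS).join(); // wait for the asynchronous handler
            adapter.stop();
        }
        return handled;
    }
}
```

Requesting the next message only when the previous future completes is a design choice of this sketch, so demand is driven by the outbound side; it is not a description of what subscribeToPublisher(...) does internally.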

roger751 commented 2 years ago

@sobychacko thank you! I'm not sure I will have enough time for such a contribution, so for now I'm trying to find a solution for my use case first.

@artembilan If I want a Spring Cloud application that consumes from Kafka with reactor-kafka, do you think the easiest and safest way to go is to implement a reactive channel adapter (extending MessageProducerSupport)? If that isn't easier than passing a Flux from reactor-kafka to some Spring Data component, I'm failing to see its value. Would Spring Integration add any value or features that you see as relevant to this use case?

roger751 commented 2 years ago

@artembilan another question: is the solution you've just mentioned the same for an end-user implementation as for a contribution? If I implement your suggestion, would it be a valid contribution PR that brings the ideal situation to the Spring Cloud Stream Kafka binder?

sobychacko commented 2 years ago

@roger751 We are closing this issue since we already have the binder issue to tackle this feature. This issue is already linked from there, so all the context and information is available there. We will continue any discussion in the binder issue.

artembilan commented 2 years ago

The end-user solution can be whatever is convenient for you. What the fix for the Spring Cloud Stream framework will look like is still not clear: it might not be based on Spring Integration at all. So you need to decide for yourself whether to stick with an integration flow solution, using whatever reactive capabilities are present there, or to come up with something function-based. Either way, I don't see how the current Kafka binder can help you keep the Spring Cloud Stream model if you are seeking a reactive solution, which is not present there yet...

Please, let's continue the discussion in the issue Soby has just mentioned.