spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

What's the polling logic behind the scenes when using a reactive consumer and consuming in parallel? #1122

Closed zhizlu-sh closed 3 years ago

zhizlu-sh commented 3 years ago

Configuration I use:

kafka:
    default:
      consumer:
        ackMode: MANUAL
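
(For context, that snippet is trimmed; the full property path for the binder-level default should look roughly like this — path assumed from the Kafka binder docs, please double-check against your version:)

```yaml
spring:
  cloud:
    stream:
      kafka:
        default:
          consumer:
            ackMode: MANUAL
```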

Sample code of the consumer with concurrency 2 (I changed it a little for the tests below):

import java.util.function.Function
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import org.springframework.context.annotation.Bean
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Bean
fun consumeMessage(): Function<Flux<Message<String>>, Mono<Void>> {
    return Function { flux: Flux<Message<String>> ->
        flux.asFlow()
            .flatMapMerge(2) { message ->
                flow {
                    try {
                        // do something
                    } finally {
                        // manual commit
                        val acknowledgment: Acknowledgment? = message.headers.get(
                            KafkaHeaders.ACKNOWLEDGMENT,
                            Acknowledgment::class.java
                        )
                        acknowledgment?.acknowledge()
                        emit(message)
                    }
                }
            }
            .asFlux()
            .then()
    }
}

Here is the first test I did, for offset overriding:

  1. Created one topic with one partition and put 4 messages [1,2,3,4] there, so the lag was 4.
  2. Changed the code so that if the message is 3, it is not committed.
  3. Ran the code.
  4. The lag became 0, because 4 was committed, so the broker assumed 3 had been successfully handled.
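
This result follows from Kafka keeping a single committed offset per partition per consumer group: commits are cumulative, so acknowledging 4 implicitly marks everything before it as consumed, even though 3 was never acked. A minimal stdlib-only Kotlin sketch of that bookkeeping (a hypothetical model, not the Kafka API):

```kotlin
// Hypothetical model of the broker-side committed offset for one partition.
// Kafka stores only the highest committed offset per group/partition, so a
// later commit silently covers any earlier, uncommitted record.
class PartitionOffsetTracker {
    var committed: Long = -1L
        private set

    // Committing offset n means "everything up to and including n is done".
    fun commit(offset: Long) {
        if (offset > committed) committed = offset
    }

    // Lag relative to the last produced offset.
    fun lag(lastProduced: Long): Long = lastProduced - committed
}

fun main() {
    val tracker = PartitionOffsetTracker()
    // Records 1..4 sit at offsets 0..3; record "3" (offset 2) is never acked.
    listOf(0L, 1L, 3L).forEach(tracker::commit)
    println(tracker.committed) // 3
    println(tracker.lag(3L))   // 0 -> the broker thinks "3" was handled
}
```

(Real Kafka commits are "next offset to read", i.e. offset + 1, but the cumulative behavior is the same.)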

Here is the second test I did, for concurrency:

  1. Created one topic with one partition and put 4 messages [1,2,3,4] there, so the lag was 4.
  2. Changed the code so that if the message is 2, it calls delay(5000).
  3. Ran the code.
  4. I saw the output [1,2,3,4]. Since I created two parallel pipes, one pipe was stuck on 2, but the other pipe kept flowing, so 1, 3, and 4 went through it.

According to the documentation of manual ack below, it looks like the container polls a batch of messages and feeds them to the reactive consumer. But from the tests above, it seems to pull many more messages than the concurrency I set. I also tried setting configuration.max.poll.records: 2, but the result was the same as in the second test.
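
(For reference, I set max.poll.records through the binding's consumer configuration map, roughly like this — exact property path assumed from the binder docs, and consumeMessage-in-0 is the default binding name derived from my function name:)

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          consumeMessage-in-0:
            consumer:
              configuration:
                max.poll.records: 2
```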

/**
 * Listener is responsible for acking - use a
 * {@link org.springframework.kafka.listener.AcknowledgingMessageListener}; acks
 * will be queued and offsets will be committed when all the records returned by
 * the previous poll have been processed by the listener.
 */
MANUAL,

My question is: if I have only one partition and want better throughput, but without offsets being overridden, what can I do? Two solutions, as far as I can think of:

  1. Don't consume one partition in parallel; configure multiple partitions instead.
  2. If I set concurrency to 2, only commit after both messages have been successfully handled.

For option one, I would have to set up more configuration to implement it in a reactive way. For option two, I need to understand the logic behind the scenes, so maybe I could reuse the logic of "Manual Ack".
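
For option one, a sketch of what I imagine the configuration would look like (property names assumed from the Kafka binder docs, not tested; the binder can also grow the topic to match):

```yaml
spring:
  cloud:
    stream:
      bindings:
        consumeMessage-in-0:
          consumer:
            concurrency: 2        # assumption: one consumer per partition
      kafka:
        binder:
          autoAddPartitions: true # let the binder add partitions if needed
          minPartitionCount: 2
```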

Sorry for such a long question, but I want to understand more so I can figure out the best way to satisfy my requirement.

garyrussell commented 3 years ago

Since Kafka only maintains a single committed offset per partition, performing async operations (such as using Reactor) causes a great deal of difficulty.

It is not recommended at all because offsets can be committed out of order.

The best way to achieve higher throughput is to use multiple partitions.

That said, the upcoming 2.8.0 release of spring-kafka supports out-of-order commits by deferring commits until all the gaps are filled.

sobychacko commented 3 years ago

@zhizlu-sh See the advice from @garyrussell above. Try to remove the usage of the reactor library on top of the message-channel-based Kafka binder that is non-reactive. That seems to be the root of the issues that you are running into. Closing this issue now. Feel free to re-open if you find more issues.