monix / monix-kafka

Monix integration with Kafka
Apache License 2.0

Implements a KafkaConsumerResource to solve consumer already closed i… #306

Open paualarco opened 3 years ago

paualarco commented 3 years ago

Resolves https://github.com/monix/monix-kafka/issues/240 by introducing a new class, KafkaConsumerResource, which is essentially an improved way of creating and closing a KafkaConsumer compared to doing so directly from KafkaConsumerObservable. The problem with the latter was that the consumer was closed right after emitting the last element of the observable and signaling onComplete, which led to an exception when trying to commit the last consumer record. This is fixed by the resource usage, since the consumer is not closed as soon as the last element is consumed.
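A minimal sketch of the idea (the names here are illustrative, not necessarily the PR's actual API): wrap consumer creation and closing in a cats-effect `Resource`, so that `close()` becomes a finalizer that runs when the resource scope ends, rather than when the observable signals onComplete.

```scala
import cats.effect.Resource
import monix.eval.Task
import org.apache.kafka.clients.consumer.KafkaConsumer

// Illustrative sketch: the consumer is acquired once and closed by the
// Resource finalizer, which only runs after everything inside `use`
// (including the final commit) has completed.
def consumerResource[K, V](create: Task[KafkaConsumer[K, V]]): Resource[Task, KafkaConsumer[K, V]] =
  Resource.make(create)(consumer => Task.evalAsync(consumer.close()))
```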

jvican commented 2 years ago

I can review this if you're interested :)

paualarco commented 2 years ago

> I can review this if you're interested :)

Thanks, that would be helpful! I've updated the description :) Although in order to proceed I think we would need approval from one maintainer too.

Avasil commented 2 years ago

Hi @paualarco sorry for ignoring this

Looking at the bug example:

    KafkaConsumerObservable
      .manualCommit[String, String](kafkaConfig, List(topic))
      .timeoutOnSlowUpstreamTo(5.seconds, Observable.empty)
      .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) }
      .mapEval(completeBatch => completeBatch.commitAsync())
      .headOrElseL(List.empty)

I feel like the real issue is that completeBatch.commitAsync() returns before the commit is completed? IIRC the underlying consumer is not closed until the downstream completes and it feels like it should complete after the last commit is done. Is the issue reproducible with these changes https://github.com/monix/monix-kafka/pull/250 ?

paualarco commented 2 years ago

@Avasil no problem,

I don't think that's the case. The poll heartbeat consumer would fix the problem of partition reassignment by rebalancing, which was caused by slow downstream consumers. In this case, however, the issue is different: when the observable is cancelled (in the example, because it reaches the timeout), the consumer is closed too, which is not desirable when there are elements that have not yet been committed. When those commits do happen, they fail with an error indicating that the consumer was already closed.

It gets fixed when the resource is responsible for closing the consumer, rather than the Observable's onComplete.
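To illustrate the difference (a generic sketch, not the PR's code): with `Observable.fromResource`, the release action is tied to the resource scope, so it runs only after the downstream has fully finished, including any trailing commit, instead of firing as soon as the stream completes or is cancelled.

```scala
import cats.effect.Resource
import monix.eval.Task
import monix.reactive.Observable

// Generic sketch: acquire a resource (e.g. a Kafka consumer), stream from
// it, and let the Resource finalizer, not onComplete, perform the release.
def viaResource[C, A](acquire: Task[C],
                      release: C => Task[Unit],
                      use: C => Observable[A]): Observable[A] =
  Observable
    .fromResource(Resource.make(acquire)(release))
    .flatMap(use)
```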

For the poll heartbeat to have caused this issue, max.poll.interval.ms (300 sec by default) would have needed to be exceeded, but the tests did not run that long :)