reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Rebalancing during offset commit results in event processing termination #266

Closed wjur closed 2 years ago

wjur commented 2 years ago

Expected Behavior

When the rebalancing happens during the commit, I expect a temporary, retryable error and a commit retry. If the retry is not possible, then I expect the uncommitted messages to be read again at some point of time (maybe by a different consumer).

Actual Behavior

In our production environment, some service instances stop reading events from Kafka after a rebalance. In the logs we observe a couple of warning and error messages. Most notably:

Member [censored] sending LeaveGroup request to coordinator 
[censored] (id: [censored] rack: null) due to the consumer is being closed
Commit failed
reactor.core.Exceptions$ErrorCallbackNotImplemented: 
    org.apache.kafka.common.errors.RebalanceInProgressException: 
    Offset commit cannot be completed since the consumer is undergoing 
    a rebalance for auto partition assignment. You can try completing 
    the rebalance by calling poll() and then retry the operation.
  Caused by: org.apache.kafka.common.errors.RebalanceInProgressException: 
    Offset commit cannot be completed since the consumer is undergoing
     a rebalance for auto partition assignment. You can try completing
      the rebalance by calling poll() and then retry the operation.
org.apache.kafka.common.metrics.Metrics - 
  Closing reporter org.apache.kafka.common.metrics.JmxReporter
org.apache.kafka.common.metrics.Metrics - 
  Metrics reporters closed

Possible root cause

It seems that when ConsumerCoordinator::sendOffsetCommitRequest throws RebalanceInProgressException (source) it is not handled in any way (no retry, no swallowing), but propagated and as a result closes the sink that is used to push polled events.

The code path that misbehaves

I went through the code and think I know what is happening here. The first log message, especially the part "the consumer is being closed", led me to ConsumerCoordinator::close. I checked where it is called, and it turns out the only interesting place is the DefaultKafkaReceiver::withHandler method (source).

The close() method is used in the last parameter of the Flux.usingWhen method, which is the asyncCleanup parameter (source). According to Reactor's documentation, it is called only when the Flux finishes (emits the last event or an error). Let's check when the Flux finishes, then. The Flux is created by the scheduler -> function.apply(scheduler, handler) lambda. In our case, function calls receive(), which looks as follows (source):

public Flux<ConsumerRecords<K, V>> receive() {
  return sink.asFlux().doOnRequest(consumerEventLoop::onRequest);
}

This means that the returned Flux finishes when the sink emits the last event or an error.

The interactions with the sink happen in ConsumerEventLoop. There is one emitNext call and four emitError calls. No emitEmpty means that the Flux is infinite, which is expected because we don't expect the event stream to be finite. Let's take a look at these four emitError calls.

Three of them, in CloseEvent::run, PollEvent::run and SubscribeEvent::run, call log.error before calling emitError, but the logs created there are not visible in our system, so I assume the call happened somewhere else.

The last candidate call is in CommitEvent (source). mayRetry becomes false because the exception is org.apache.kafka.common.errors.RebalanceInProgressException and not RetriableCommitFailedException (since Predicate<Throwable> isRetriableException = RetriableCommitFailedException.class::isInstance; (source)). The callbackEmitters must be empty since they are added to commitArgs only in DefaultKafkaReceiver::receiveAtmostOnce, which we don't call anywhere (we call receive() only). Besides, the method logs the expected "Commit failed". As a result, the Flux created by receive() terminates with an error, the consumer closes and no more events are poll()-ed from Kafka.
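
To make the retry check described above concrete, here is a minimal, self-contained sketch. The two exception classes are local stand-ins that only borrow the names of the Kafka classes (the real ones live in kafka-clients and have a richer hierarchy), and RetryCheckSketch and mayRetry are hypothetical names used for illustration. Only the predicate has the same shape as the one quoted above.

```java
import java.util.function.Predicate;

// Local stand-ins for the Kafka exception types; only the names are borrowed.
class RetriableCommitFailedException extends RuntimeException {
    RetriableCommitFailedException(String message) { super(message); }
}

class RebalanceInProgressException extends RuntimeException {
    RebalanceInProgressException(String message) { super(message); }
}

final class RetryCheckSketch {
    // Same shape as the quoted predicate: a plain instance-of test.
    static final Predicate<Throwable> IS_RETRIABLE =
            RetriableCommitFailedException.class::isInstance;

    // Simplified: the decision depends only on the exception type here.
    static boolean mayRetry(Throwable error) {
        return IS_RETRIABLE.test(error);
    }

    public static void main(String[] args) {
        System.out.println(mayRetry(new RetriableCommitFailedException("transient")));
        System.out.println(mayRetry(new RebalanceInProgressException("rebalancing")));
    }
}
```

With these stand-ins the first call yields true and the second false, which matches the observation that a RebalanceInProgressException is not treated as retriable by an instance-of check against RetriableCommitFailedException.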

Possible Solution

Just before ConsumerCoordinator::sendOffsetCommitRequest (source) returns a failure, it has a comment stating that the error is not fatal, so I assume the exception needs some special handling at some point so that it does not terminate the sink. But I'm just a reactor-kafka internals newbie, only pointing out that it might be possible ;)

if (rebalanceInProgress()) {
    // if the client knows it is already rebalancing, we can use RebalanceInProgressException instead of
    // CommitFailedException to indicate this is not a fatal error
    return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the " +
        "consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance " +
        "by calling poll() and then retry the operation."));
}

Environment

garyrussell commented 2 years ago

You should avoid forced rebalances; they mean some code is holding up a consumer thread in excess of max.poll.interval.ms, which is undesirable.

See this issue for how to configure the pipeline to recover.

garyrussell commented 2 years ago

To avoid the rebalance if you have a slow consumer, you should use .publishOn() to hand the work off to a different thread; this frees up the event loop scheduler to continue polling. Eventually, back pressure will kick in and the consumer will continue to be polled, returning no records, until the back pressure is relieved.

wjur commented 2 years ago

Thank you for the reply, @garyrussell. Unfortunately, this is not a case of a slow consumer. We are not even close to the max.poll.interval.ms threshold, and we are using different thread pools for polling and for processing the events (using .publishOn()).

I didn't describe our environment in any way, my fault, and this is why we might not understand each other. We run the software in a distributed environment. The application that consumes the events (with the same consumer group id) runs in multiple instances. The cluster scales up and down. It can happen that another instance is added to or removed from the cluster before the previous rebalance has finished. That is just a usual thing in a distributed environment, and that's why I think it could be covered by the library.

garyrussell commented 2 years ago

But...

Rebalancing during offset commit

A normal rebalance does not cause an error when committing offsets; the rebalance is suspended until the next poll; you should only get an error on a commit after a forced rebalance.

wjur commented 2 years ago

Thank you for the reply, @garyrussell. I'm not sure if I'm reading you correctly. Let me explain further what we're doing. We create a KafkaReceiver, use the receive() method, group by topic partition, switch the thread, process records in the partitions and then acknowledge the processed records' offsets.

kafkaReceiver.receive()
    .groupBy(message -> message.receiverOffset().topicPartition())
    .publishOn(processingContext.scheduler)
    .flatMap(partition -> partition
        .concatMap(record ->
            processRecord(record) // the business logic is processed here
                .doOnNext(processedRecord ->
                    processedRecord.receiverOffset().acknowledge()) // ack
        ))
    .subscribe();

Are we using reactor-kafka incorrectly in any way by having such a processing pipeline? Could you please take a look and check whether we are doing something we should not do?

normal rebalance

What does it mean? What would a "not normal" rebalance be? In a distributed environment, the fact that service instances are added to a cluster or removed (or added and removed at the same time) is normal. We don't do anything that others would not do.

the rebalance is suspended until the next poll

What do you mean? What makes it suspended? What happens if another consumer joins the group while one of the instances is committing offsets? It seems that there must be some kind of race between the two that is not handled correctly.

I'm not sure, but it seems you are telling me that this should not happen, yet it happens. The internals tried to commit an offset, got an error because a rebalance was in progress, and did not handle it in any way. Why would I handle it on my own? I spent much time reading the code to give you as much detail as I can, and you seem not to care and give me the same repeat/retry workaround.

Finally, I don't fully agree that this is a duplicate of the issue you linked, because we don't know the other issue's root cause. Here we know which error is thrown and where. There might be plenty of scenarios that lead to the same result: the processing flux stops.

garyrussell commented 2 years ago

A normal rebalance is when another consumer joins or leaves the group.

The actual rebalance processing is coordinated across the instances and is performed during the next poll (after any current offsets are committed); you should not get any exceptions when committing offsets while a normal rebalance is pending.

If you get an error on a commit, it generally means that this consumer has not polled recently and the broker has kicked it out of the group.

If you can provide detailed logs of one of these events, we can see what's wrong.

I am happy to reopen it if you can provide concrete information that there is a bug in the framework.

oemergenc commented 2 years ago

Hi, I am running into the same problem in a very similar environment. Whenever our cluster scales up our consumer instances, we get commit errors which trigger our alerting system. This is very annoying. Here are some logs, which can be seen whenever a new consumer joins. Basically, these error messages trigger our alerting system:


2022-05-18 20:51:11.676 ERROR 91603 --- [oundedElastic-3] reactor.core.publisher.Operators         : Operator called default onErrorDropped

org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.

Here is the full log: consumer_rebalance_error.log

I can mitigate this kind of error by adding an onErrorContinue operator like this to my receive flux:

kafkaReceiver.receive()
... 
.onErrorContinue(RebalanceInProgressException::class.java) { throwable, _ ->
   log.warn(throwable) { "Rebalance in progress" }
}

However, I think this should be handled by the Kafka receiver itself. Or is there something I am doing wrong?

garyrussell commented 2 years ago

A related issue https://github.com/reactor/reactor-kafka/issues/240

Unfortunately, this is a side-effect of using reactive workloads; aside from having to resume after the commit exception, the root cause is trying to commit offsets that were in the pipeline during/after the rebalance.

In any case, those records will be redelivered (to the current or a new instance). To avoid the commit errors, we need to assign some kind of epoch to commits and discard those from an earlier epoch, with the epoch changing each time a rebalance occurs.

polrsg commented 2 years ago

A related issue #240

Unfortunately, this is a side-effect of using reactive workloads; aside from having to resume after the commit exception, the root cause is trying to commit offsets that were in the pipeline during/after the rebalance.

In any case, those records will be redelivered (to the current or a new instance). To avoid the commit errors, we need to assign some kind of epoch to commits and discard those from an earlier epoch, with the epoch changing each time a rebalance occurs.

In my use case, the problem isn't the commit exception itself. Let's assume that consumer A is processing events 1, 2, 3 from partition x (in this exact order), and then a rebalance happens, so consumer B also starts consuming partition x and finishes processing events 1, 2, 3 before consumer A does, e.g. B finishes processing 1, 2, 3 before A has finished processing event 2.

In that case, even if the commit fails, I'll overwrite event 3 with event 2, and consumer B will consume event 4 without reprocessing event 3. That can be a bad situation if I have to process the messages in order. Is there something that can fix that?

The only solution that comes to my mind is tracking each message when it is received from Kafka (in some kind of queue, like the one for acknowledged messages within a partition), releasing it when a commit is performed, and, when a partition is revoked, waiting until all the tracked messages are released. That way the rebalance finishes only when all the messages currently in the consumer have been committed. Do you think it makes sense?

garyrussell commented 2 years ago

There is no way to avoid that (across consumers) in the framework, A doesn't know yet that B has already processed it; you need to make your downstream code idempotent - i.e. have A check whether this record has already been processed by some other instance.

A common technique is to store the topic/partition/offset along with the data; if the current offset in the DB is >= the offset of the current record then simply discard it.
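
As a rough illustration of the technique described above, the sketch below keeps the last processed offset per topic/partition and rejects any record at or below it. OffsetDeduplicator and markIfNew are hypothetical names, and the in-memory map is only a stand-in for the database; in a real system the offset would presumably be stored in the same transaction as the business data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a table holding the last processed offset
// per topic/partition.
final class OffsetDeduplicator {
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();

    /** Returns true if the record is new, false if it was already processed. */
    boolean markIfNew(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        boolean[] accepted = {false};
        // compute() runs atomically per key, so check-and-update cannot interleave.
        lastProcessed.compute(key, (k, stored) -> {
            if (stored != null && stored >= offset) {
                return stored; // stored offset >= record offset: discard
            }
            accepted[0] = true;
            return offset;
        });
        return accepted[0];
    }
}
```

A consumer would call markIfNew(topic, partition, offset) before running its business logic and skip the record when it returns false.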

polrsg commented 2 years ago

@garyrussell What about tracking the messages when they are received?

That makes sense because the partition assignment happens only after the onPartitionsRevoked callbacks of all the consumers have finished running (so I can wait asynchronously for all of the messages to be committed before the rebalance finishes).

Do you see a problem with this approach? I'm asking because I've managed to implement this message-tracking mechanism in a closed-source project written in C#, but not in the reactive paradigm.

garyrussell commented 2 years ago

You can't hold up the rebalance; it is running on the consumer's scheduler thread.

To solve the issue I pointed you to, I am planning on adding an epoch to the Receiver and the ReceiverRecords where the epoch will change when a rebalance occurs. Then, you will be able to check if a record is from the current epoch and discard it if not.
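
The epoch idea could look roughly like the following. This is only a sketch of the concept, not the planned reactor-kafka API: EpochTracker, Stamped, stamp, onRebalance and isCurrent are all hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: stamps values with the current epoch and bumps the
// epoch on every rebalance, so stale in-flight records can be recognised.
final class EpochTracker {
    private final AtomicLong epoch = new AtomicLong();

    /** A value together with the epoch in which it was received. */
    static final class Stamped<T> {
        final T value;
        final long epoch;

        Stamped(T value, long epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    <T> Stamped<T> stamp(T value) {
        return new Stamped<>(value, epoch.get());
    }

    /** Called when a rebalance occurs; everything stamped earlier becomes stale. */
    void onRebalance() {
        epoch.incrementAndGet();
    }

    boolean isCurrent(Stamped<?> stamped) {
        return stamped.epoch == epoch.get();
    }
}
```

Downstream code would check isCurrent before processing or committing a record and drop it when the check fails, since the record will be redelivered anyway.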

polrsg commented 2 years ago

From my experience, you actually can hold up the rebalance, because in the low-level client (the Kafka consumer) onPartitionsRevoked is called as a side effect on the consumption thread, and the group coordinator waits until the callbacks of all the consumers have finished running; only after that does it let the consumer group leader assign the partitions within the consumer group. I'll try to find a reference for that in the documentation later. But it makes sense to wait there until all the messages have been processed.

garyrussell commented 2 years ago

Right, but if you hold it off too long, the broker will forcibly kick the consumer out of the group and assign the partitions to another instance anyway.

polrsg commented 2 years ago

That's true, but for that I can set max.poll.interval.ms to be bigger than the rebalance duration (several minutes).

In my opinion, making sure that all the messages are processed in order, without edge cases, is more valuable than having the group coordinator detect a dead consumer within seconds instead of minutes (at least in some cases).
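
For reference, a small sketch of raising that timeout, assuming the setting meant here is the consumer property max.poll.interval.ms. ConsumerTimeoutConfig and withMaxPollInterval are hypothetical names; the resulting properties would still need the usual bootstrap servers, group id and deserializer settings before being passed to a consumer.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper that builds consumer properties with a longer poll interval.
final class ConsumerTimeoutConfig {
    static Properties withMaxPollInterval(Duration interval) {
        Properties props = new Properties();
        // The value is expressed in milliseconds.
        props.setProperty("max.poll.interval.ms", Long.toString(interval.toMillis()));
        return props;
    }
}
```

For example, withMaxPollInterval(Duration.ofMinutes(10)) sets the property to 600000.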

garyrussell commented 2 years ago

Good point.

polrsg commented 2 years ago

So do you think that tracking the messages when they're received, and then waiting asynchronously in onPartitionsRevoked until they are committed, could be a solution? Or am I missing something?

garyrussell commented 2 years ago

I think we can add logic to the CommitBatch to simply keep a count of uncommitted records and wait for a (configurable) amount of time in onPartitionsRevoked for that count to go to zero before calling the commitEvent. The actual logic will, of course, depend on the ack mode.
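
The counting idea can be sketched in isolation as follows. This is not the CommitBatch implementation; InFlightTracker and its methods are hypothetical names, and the loop only waits, whereas the real logic would also commit newly acknowledged offsets while waiting.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter of records that were polled but not yet acknowledged.
final class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();

    void onPolled(int recordCount) {
        inFlight.addAndGet(recordCount);
    }

    void onAcknowledged() {
        inFlight.decrementAndGet();
    }

    /**
     * Waits until the count reaches zero or maxWait elapses.
     * Returns true if the count reached zero in time.
     */
    boolean awaitDrained(Duration maxWait) {
        long start = System.nanoTime();
        long waitNanos = maxWait.toNanos();
        while (inFlight.get() > 0) {
            if (System.nanoTime() - start >= waitNanos) {
                return false; // timed out with records still outstanding
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }
}
```

A rebalance listener would call awaitDrained with the configured wait time from its onPartitionsRevoked callback and proceed with the final commit either way.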

garyrussell commented 2 years ago

@polrsg I have a prototype that works as expected; I will create a PR tomorrow and you can pull it locally if you want to give it a try before the PR gets reviewed and merged.

polrsg commented 2 years ago

@garyrussell Wow, impressive! Two more points come to my mind about the implementation:

  1. You say that you will wait until the count of unacknowledged messages reaches zero. To my understanding, the unacknowledged messages are just the messages in the gaps between acknowledged events (e.g. if I acknowledge events 1 and 5, the unacknowledged count will be 3: events 2, 3 and 4). But what happens if I'm processing 10 messages and acknowledge messages 1, 2 and 3? Will events 4-10 count as unacknowledged? To my understanding they will not (correct me if I am wrong). It's important because in that situation I'd expect to wait in onPartitionsRevoked until all 10 messages are committed.

  2. I think that there shouldn't be a configured time to wait in onPartitionsRevoked; instead, it should just wait until the count of unacknowledged messages reaches zero. That will be fine because it ensures that, if a rebalance finishes successfully, these messages are kept in their original order. max.poll.interval.ms will make sure that onPartitionsRevoked does not get stuck forever. If a consumer cannot finish processing within max.poll.interval.ms, in my opinion it is OK to kick it out and alert on that as a problem.

What do you think? And thanks very much for your time!!

garyrussell commented 2 years ago

  1. If you are acknowledging them out of order, you should be using the new "out of order commits" feature (https://projectreactor.io/docs/kafka/release/reference/#_out_of_order_commits), where the framework defers the later commits until the gaps are filled; otherwise your commits will likely get out of whack. Regardless, all we have to do is this: each time we put the results of the poll in the pipeline, we increment the counter by the number of records and, when each record is ack'd, decrement the counter. The code in onPartitionsRevoked() then waits in a loop, committing any new offsets received, until the counter goes to zero or the timer expires.

  2. We can't wait forever by default; the application might have bugs that prevent it from acking all records. There would be nothing to stop you from setting the time to Long.MAX_VALUE if you really want to wait forever, but I wouldn't recommend it.

polrsg commented 2 years ago

  1. OK, I got that. You are telling me that the unacknowledged count is increased by one when a new message is polled (which covers the case I mentioned above). I thought it was only calculated from the gap size when using out of order commits (I'll dig more into the implementation in the future).

  2. I got that. You don't want an application that acknowledges in order to get stuck in onPartitionsRevoked and be kicked out just because one ack failed. I'm using out of order commits, so in my case a bug in acknowledging is critical and considered fatal, because I'll never be able to commit (the gap will stay forever). So it makes sense to me to kick the consumer out when max.poll.interval.ms expires (but setting the time to Long.MAX_VALUE will be fine).

In conclusion, it all makes sense, and I think that this feature will solve the edge case of two consumers working on the same partition after a rebalance. That way we'll make sure that a partition is assigned to a new consumer only after the previous consumer has finished and committed all its messages.

thanks!! @garyrussell