reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Provide option for KafkaReceiver's graceful shutdown #378

Open cjlee38 opened 6 months ago

cjlee38 commented 6 months ago

Motivation

Over the past few days, I've been looking for a way to shut down a KafkaReceiver gracefully, but couldn't find any proper way to do it. I read the related issues (https://github.com/reactor/reactor-kafka/issues/247, https://github.com/reactor/reactor-kafka/issues/51, https://github.com/reactor/reactor-kafka/issues/196) and some SO questions, but they don't work as I expected. (This might be caused by my poor understanding of Reactor or Kafka, so please excuse me.)

Example code snippet:

val disposable = kafkaReceiver.receive()
    .flatMapSequential { record -> process(record).thenReturn(record) }
    .concatMap { record ->
        record.receiverOffset().acknowledge() // mark the record as processed
        record.receiverOffset().commit()      // commit acknowledged offsets (returns Mono<Void>)
    }
    .subscribe()

This is a typical case:

  1. receive record
  2. process record
  3. ack and commit (can be omitted when using auto-commit)

Desired solution

So, I think this is a very common case: when I restart my application (which is based on the Spring framework), consumers should stop fetching records, the ongoing flux (i.e. the already-fetched records) should keep processing and committing, and then the flux should complete.

However, simply disposing the disposable does not work as expected, because there is a possibility that a processed record is never committed.
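One workaround pattern is to complete the flux with an external stop signal instead of cancelling it, so the downstream operators can drain in-flight records before terminating. Below is a minimal sketch of that idea, reusing the kafkaReceiver and process placeholders from the snippet above; the stop signal, latch, and timeout are my own additions, not library API:

import reactor.core.publisher.Sinks
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

val stopSignal = Sinks.one<Unit>() // completed to request shutdown
val drained = CountDownLatch(1)    // released once the flux terminates

val disposable = kafkaReceiver.receive()
    .takeUntilOther(stopSignal.asMono()) // complete (not cancel) the flux on shutdown
    .flatMapSequential { record -> process(record).thenReturn(record) }
    .concatMap { record ->
        record.receiverOffset().acknowledge()
        record.receiverOffset().commit()
    }
    .doFinally { drained.countDown() }
    .subscribe()

// On shutdown (e.g. a Spring @PreDestroy hook):
fun shutdown() {
    stopSignal.tryEmitValue(Unit)            // stop fetching new records
    if (!drained.await(30, TimeUnit.SECONDS)) {
        disposable.dispose()                 // fall back to a hard cancel
    }
}

The point is that completion, unlike cancellation, lets flatMapSequential and concatMap finish their in-flight work, so every processed record still gets committed. Having KafkaReceiver trigger this kind of completion itself is essentially what I'm asking for.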

Considered alternatives

I don't have a concrete idea for implementing this yet, but here are some things to consider (see the sketches after this list):

  1. Reactor's Scheduler interface provides a disposeGracefully method.

    These methods (1 2) could be replaced with it (or selected via an option).

  2. Add sink.emitComplete() in ConsumerEventLoop#stop.
  3. It looks like ConsumerEventLoop keeps polling the broker without hesitation and emits records into the sink. If that's right, producing a large number of records in an instant could cause problems. For example, say 10,000 records are produced and the consumer fetches them all within a few seconds: besides the OOM risk, the flux has to wait until all of those records are drained before a graceful shutdown can complete. I think record emission should be throttled.
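For item 1, this is roughly how disposeGracefully is used on the reactor-core side (it exists since reactor-core 3.5); the scheduler name, timeout, and fallback here are my own illustration, not reactor-kafka internals:

import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.time.Duration

val eventScheduler = Schedulers.newSingle("consumer-events") // hypothetical stand-in for the event loop's scheduler

// disposeGracefully() returns a Mono<Void> that completes once the
// scheduler's queued tasks have finished, instead of cancelling them.
eventScheduler.disposeGracefully()
    .timeout(Duration.ofSeconds(10))
    .onErrorResume { Mono.fromRunnable<Void> { eventScheduler.dispose() } } // force-close if the drain times out
    .block()

As for item 3, I'm aware that downstream demand can be bounded with a standard operator like limitRate (e.g. kafkaReceiver.receive().limitRate(256)), but that only caps what the flux requests at a time, not how eagerly the event loop polls the broker.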

Additional context

If any of this is down to my own ignorance, please let me know. Any other opinions would be appreciated. Thanks

KafkaProServerless commented 4 months ago

upvote

ajax-semenov-y commented 4 months ago

upvote

minzhang-haus commented 4 weeks ago

upvote