reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Consumption is paused indefinitely #297

Open Arun-test opened 2 years ago

Arun-test commented 2 years ago

Hi, we have seen an issue where, when the property maxDeferredCommits is set to a value > 0 and used with auto acknowledgement, the consumer pauses indefinitely. We haven't been able to reproduce this locally, but when the property is set to 0 the issue does not appear.

Based on the discussion in this Stack Overflow thread, we feel that if this property is intended for manual commits only, it shouldn't affect consumption when auto acknowledgement is used.

Expected Behavior

The property maxDeferredCommits should be ignored when auto acknowledgement is used.

Actual Behavior

The property maxDeferredCommits, when set to > 0, causes the consumer to be paused when auto acknowledgement is used.

Steps to Reproduce

@Test
void reproCase() {

}
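Since the repro test above is empty, here is a hypothetical sketch of the configuration the report describes: `receiveAutoAck()` combined with a non-zero `maxDeferredCommits`. The topic name, group id, and bootstrap servers are placeholders, and this is not a verified reproducer of the pause.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class DeferredCommitAutoAck {

    public static void main(String[] args) {
        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(Map.of(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ConsumerConfig.GROUP_ID_CONFIG, "demo-group",
                        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class))
                // The property under discussion: intended to bound out-of-order
                // manual commits, but set here alongside auto acknowledgement.
                .maxDeferredCommits(100)
                .subscription(Collections.singleton("demo-topic"));

        KafkaReceiver.create(options)
                .receiveAutoAck()            // each batch is acknowledged automatically
                .concatMap(batch -> batch)   // flatten the per-poll batches of records
                .doOnNext(record -> System.out.println(record.value()))
                .subscribe();
    }
}
```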

Possible Solution

Your Environment

crigas1 commented 1 year ago

We face a similar issue on reactor-kafka 1.3.13. In our production environment we observe that consumption for certain partitions can pause indefinitely, without any exceptions or errors in the logs, after a few days of working normally. The only ReceiverOptions property we set explicitly is commitInterval: PT1S, and we use both auto and manual commits. After enabling DEBUG logging for reactor.kafka we saw the following entry: "message":"Paused - back pressure","logger_name":"reactor.kafka.receiver.internals.ConsumerEventLoop". After that log, consumption from that topic (just one partition) paused for several hours and we had to restart the app to start consuming again. maxDeferredCommits is set to 0.

garyrussell commented 1 year ago

That log implies that the application stopped processing records that were already in the pipeline: pausing due to back pressure (that message) is based purely on the size of the pipeline; a different message is logged if the pause is due to missing deferred commits. The consumer will be resumed the next time the onRequest() method is called, i.e. when space is freed up in the pipeline.
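The pause/resume mechanism described above is ordinary Reactive Streams demand. As an illustrative reactor-core sketch (not reactor-kafka itself): a source emits only while the subscriber has outstanding demand, stalls when demand hits zero, and resumes as soon as more is requested, analogous to the consumer being paused until onRequest() frees pipeline space. The class and method names here are invented for the demo.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackPressureDemo {

    /** Delivers items only while downstream demand exists; returns how many got through. */
    static int runWithBoundedDemand() {
        AtomicInteger received = new AtomicInteger();
        Flux.range(1, 100).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription s) {
                request(5); // initial demand: the "pipeline" has room for 5 items
            }

            @Override
            protected void hookOnNext(Integer value) {
                if (received.incrementAndGet() == 5) {
                    // Freeing space in the pipeline resumes emission, much like
                    // onRequest() resuming the paused Kafka consumer.
                    request(5);
                }
                // After 10 items no further demand is signalled: the source is
                // now effectively "paused - back pressure" and emits nothing more.
            }
        });
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered: " + runWithBoundedDemand()); // delivered: 10
    }
}
```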

ajax-lizogubenko-s commented 1 year ago

Seems we are facing the same issue on 1.3.19. Why am I sure it's not the pipeline that is stuck? We have a perf-test setup: 16 consumers with the same business logic, each pattern-matching 40 single-partitioned topics. We fire the same 800 messages into each topic, and we can see that one of these 16 consumers gets paused by back pressure and stays stuck forever with a lag of about 300 messages, never resuming. The other 15 consumers proceed fine with no lag left. Same logic, same messages, but one consumer gets stuck. The last debug logs say: Paused partitions -> Async committing offsets -> then nothing. This behaviour does not reproduce consistently; sometimes a run passes fine.

garyrussell commented 1 year ago

Please provide an MCRE (minimal, complete, reproducible example) that exhibits this behavior.

crigas1 commented 1 year ago

In our case the problem was a misconfigured groupBy that was followed by a flatMap with lower concurrency. The pipeline was hanging after 2-3 days of working normally. After we enabled TRACE logging we saw that the problem was not in reactor-kafka but downstream.

ajax-lizogubenko-s commented 1 year ago

@crigas1 could you please share some details of your configs for groupBy and flatMap? Perhaps that's our case as well; we want to double-check, since it's difficult to reproduce.

crigas1 commented 1 year ago

@ajax-lizogubenko-s in our case the problem was that the groupBy operator was (mistakenly) creating 47 groups, while the following flatMap only had a concurrency of 24 (I don't remember the exact numbers). This potential hang is also mentioned in the groupBy javadoc.

Our issue stopped happening once we fixed the groupBy key mapper to generate 24 groups (the same as the concurrency of the subsequent flatMap).

I hope this helps.
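For anyone landing here, the groupBy hazard described above can be sketched with plain reactor-core (no Kafka involved). The group counts below are illustrative, not the exact numbers from the production pipeline; the hazardous variant is left commented out because it never completes.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class GroupByConcurrencyDemo {

    /** Safe: 24 groups with flatMap concurrency 24, so every group is subscribed and drained. */
    static List<Long> safeCounts() {
        return Flux.range(0, 480)
                .groupBy(i -> i % 24)
                .flatMap(group -> group.count(), 24)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<Long> counts = safeCounts();
        System.out.println(counts.size() + " groups, " + counts.get(0) + " items each");

        // Hazardous (the case warned about in the groupBy javadoc): with 47 groups
        // but flatMap concurrency 24, the 23 extra groups are never subscribed.
        // Groups only complete when the source completes, so no flatMap slot is
        // ever freed; once groupBy's internal buffer fills with values destined
        // for the unsubscribed groups, the whole pipeline hangs, matching the
        // stuck-consumer behavior described in this thread.
        //
        // Flux.range(0, 100_000)
        //         .groupBy(i -> i % 47)
        //         .flatMap(group -> group.count(), 24)
        //         .blockLast();   // never completes
    }
}
```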