spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Kafka offsets are not committed when Container pollTimeout >> ListenerInvoker recordsPollTimeout #140

Closed adamdec closed 8 years ago

adamdec commented 8 years ago

Hi, in reference to #135:

The live-lock problem still exists in version 1.0.1.RELEASE. I have tested it using your test case "testBatchAck" with these ContainerProperties:

        containerProps.setSyncCommits(true);
        containerProps.setAckMode(AckMode.BATCH);
        containerProps.setPollTimeout(10000);
        containerProps.setAckOnError(false);

As @abbadon123 already suggested, the simple fix would be to add:

    catch (WakeupException e) {
        if (!this.autoCommit) {
            processCommits();
        }
        this.unsent = checkPause(this.unsent);
    }

Regards

artembilan commented 8 years ago

Thank you @adamdec , for having a look into this!

I appreciate all your feedback. Let me describe the problem as I see it, to steal a bit more of your time.

When the this.consumer.poll(this.containerProperties.getPollTimeout()) takes longer than the listener's batch processing and, therefore, there are no more records to process normally within the main loop, we always end up in the catch (WakeupException e) block after

if (this.isManualAck || this.isBatchAck) {
    this.consumer.wakeup();
}

is called in the invokeListener process. Since that catch (WakeupException e) block doesn't perform the processCommits() operation, we commit the BATCH only in the container's stop() procedure. Or... after pollTimeout, of course. And that's why, I guess, @abbadon123's testBatchAck() test passes.

Correct me if I have missed something.
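To make that concrete, here is a toy model of the mechanism described above (plain Java, no Kafka dependency; the names mirror the discussion but are illustrative, not the real ListenerConsumer internals). When every listener batch ends in a consumer.wakeup(), nothing is ever committed unless the WakeupException catch runs processCommits():

```java
// Toy model of the commit-timing problem under discussion. All names
// are illustrative sketches, not the real spring-kafka internals.
public class CommitTimingModel {

    // Returns how many acked records end up committed after `iterations`
    // listener batches of `batchSize` records each, when every batch
    // triggers consumer.wakeup() (so poll() throws WakeupException).
    static int committedAfterIterations(boolean commitInWakeupCatch,
                                        int iterations, int batchSize) {
        int pendingAcks = 0;
        int committed = 0;
        for (int i = 0; i < iterations; i++) {
            pendingAcks += batchSize;      // the listener acked a batch
            // poll() throws WakeupException because invokeListener woke us up
            if (commitInWakeupCatch) {
                // the proposed fix: processCommits() inside the catch
                committed += pendingAcks;
                pendingAcks = 0;
            }
            // without the fix, nothing is committed here; offsets wait
            // for stop() or for the pollTimeout to expire
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + committedAfterIterations(false, 3, 10)); // 0
        System.out.println("with fix:    " + committedAfterIterations(true, 3, 10));  // 30
    }
}
```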

artembilan commented 8 years ago

Well, after some hacking, I'd say that it would be better to leave the main poll loop as is and remove the extra code from ListenerInvoker:

if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
    ListenerConsumer.this.consumer.wakeup();
}

If we don't have records to process, we don't give the consumer any chance to wait for them on the topic; we just wake it up for nothing. As you correctly noticed, this happens each second:

ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                                    TimeUnit.SECONDS);

I don't see a reason to interrupt the poll process so often.

Although I won't mind if we move:

if (!this.autoCommit) {
    processCommits();
}

before the this.consumer.poll() call in the main loop, just in case, really to avoid such a long wait after that poll.

From here I'm not even sure we need that this.consumer.wakeup(); at all, unless it is required to satisfy the pause/resume functionality...

artembilan commented 8 years ago

If we would like to release today, we should pay attention to this one.

CC @garyrussell

garyrussell commented 8 years ago
if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
    ListenerConsumer.this.consumer.wakeup();
}

I think we still need that for batch ack and count ack - otherwise the batch/count ack will be delayed until the next poll expires. I suppose Time ack is ok to be a "bit late".

But, yes, we should either move processCommits to a finally block or do it before the poll.

adamdec commented 8 years ago

If you move processCommits() before poll, then (I think) we force an "at most once" commit policy. If the consumer crashes before its position catches up to the last committed offset, then all messages in that gap will be "lost", but you can be sure no message will be handled more than once.

On the other hand, putting it in a finally block means we will have an "at least once" offset commit policy.

Please correct me if I am wrong.
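The distinction can be sketched with a toy simulation (plain Java, no Kafka; the offsets and the "crash" are simulated, and none of this is spring-kafka code). Committing before processing drops the tail of the batch after a crash; committing after processing replays the in-flight record:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the two commit policies: a "crash" is simulated after
// processing record 2, then the consumer "restarts" and resumes from
// the last committed offset. All names are illustrative.
public class CommitPolicyDemo {

    // Returns the full sequence of records handled, including replays.
    static List<Integer> run(boolean commitBeforeProcessing) {
        int[] records = {0, 1, 2, 3, 4};
        int committedOffset = 0;
        List<Integer> processed = new ArrayList<>();
        boolean crashed = false;

        // "At most once": commit the whole batch up front.
        if (commitBeforeProcessing) committedOffset = records.length;

        for (int r : records) {
            processed.add(r);
            if (r == 2) { crashed = true; break; }   // crash mid-batch
            // "At least once": commit only after successful processing.
            if (!commitBeforeProcessing) committedOffset = r + 1;
        }

        // Restart: replay everything from the committed offset.
        if (crashed) {
            for (int r = committedOffset; r < records.length; r++) processed.add(r);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("at-most-once : " + run(true));   // [0, 1, 2] -> records 3 and 4 are lost
        System.out.println("at-least-once: " + run(false));  // [0, 1, 2, 2, 3, 4] -> record 2 handled twice
    }
}
```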

adamdec commented 8 years ago

I think we still need that for batch ack and count ack - otherwise the batch/count ack will be delayed until the next poll expires. I suppose Time ack is ok to be a "bit late".

I think it will give us nothing until we move processCommits() to a finally {} block or before the poll... the live-lock will still be there ;/

garyrussell commented 8 years ago

... until we move ...

Correct; that's what I meant.

artembilan commented 8 years ago

From my perspective it is equivalent to have it before the poll or in the finally. We could even have it in both places, but that doesn't matter. We can fail for any unexpected reason. And yes, we can handle the same message in the listener twice or more times. See #104 for more information. The problem with acks is that we can perform them only on the same polling thread. Therefore we can only poll or commit offsets. And from here it doesn't matter in which order we call them, especially when we interrupt the poll via consumer.wakeup().
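That thread-confinement constraint is real in the Kafka client: KafkaConsumer is not thread-safe, and wakeup() is its only method that is safe to call from another thread, so other threads can merely signal the poll thread, which then does the commit itself. A toy illustration of that pattern (plain Java, no real Kafka client; names are illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy illustration: only the polling thread owns the "consumer", so a
// listener thread can merely request a wakeup; the commit itself runs
// on the poll thread, mirroring the catch (WakeupException e) ->
// processCommits() flow discussed in this issue.
public class PollThreadConfinement {
    static final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

    // Returns how many commits the poll thread performed.
    static int pollLoop(int iterations) {
        int commits = 0;
        for (int i = 0; i < iterations; i++) {
            // "poll": in the real client this blocks until records arrive
            // or wakeup() makes it throw WakeupException
            if (wakeupRequested.compareAndSet(true, false)) {
                commits++; // the processCommits() equivalent, on the owning thread
            }
        }
        return commits;
    }

    public static void main(String[] args) throws InterruptedException {
        // The listener thread's only safe cross-thread action: request a wakeup.
        Thread listener = new Thread(() -> wakeupRequested.set(true));
        listener.start();
        listener.join();
        System.out.println("commits on poll thread: " + pollLoop(3)); // 1
    }
}
```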

I won't mind any other objections, because I still don't feel entirely at home with Kafka :smile:

garyrussell commented 8 years ago

I am not following the discussion about before poll() vs. finally - I don't see how any messages can be "lost". I am also not sure what you mean by "consumer crashes", since all exceptions are caught. If the JVM crashes, then uncommitted offsets will be replayed on the next start, regardless of before poll() vs. finally.