spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

DefaultAfterRollbackProcessor.isProcessInTransaction() flag should be considered in process() method #2878

Open l0co opened 1 year ago

l0co commented 1 year ago

In what version(s) of Spring for Apache Kafka are you seeing this issue?

2.8.11 (but I see it in the current master as well)

Describe the bug

DefaultAfterRollbackProcessor has an isProcessInTransaction() flag, documented as:

    /**
     * Return true to invoke
     * {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}
     * in a new transaction. Because the container cannot infer the desired behavior, the
     * processor is responsible for sending the offset to the transaction if it decides to
     * skip the failing record.
     * @return true to run in a transaction; default false.
     * @since 2.2.5
     * @see #process(List, Consumer, MessageListenerContainer, Exception, boolean,
     * ContainerProperties.EOSMode)
     */

However, this flag is not considered in the process() method, which is part of the public interface of this class and which one would expect to work on its own. So, if you call this method yourself while using the isCommitRecovered() flag together with a transactional KafkaTemplate, you end up with a "no transaction" exception. The flag is only honored by the default caller, org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#recordAfterRollback.

To Reproduce

With a transactional KafkaTemplate, enable commitRecovered (so that isCommitRecovered() returns true) and call the process() method yourself. This ends with a "no transaction" exception.

Expected behavior

The above invocation should be properly wrapped in a transaction.
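To make the failure mode concrete, here is a minimal plain-Java model of it. `DemoKafkaTemplate`, `DemoRecoveringProcessor` and `ContainerStyleCaller` are hypothetical stand-ins, not Spring Kafka classes: the template only accepts offsets while a transaction is open (mirroring the "no transaction" exception), the processor commits the failed record's offset + 1 as the DefaultAfterRollbackProcessor does when commitRecovered is enabled, and the caller applies the same isProcessInTransaction() check that recordAfterRollback() applies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transactional KafkaTemplate (not the Spring Kafka class).
class DemoKafkaTemplate {
    private boolean inTransaction;
    final List<Long> sentOffsets = new ArrayList<>();

    // Runs the body inside a simulated transaction.
    void runInTransaction(Runnable body) {
        inTransaction = true;
        try {
            body.run();
        }
        finally {
            inTransaction = false;
        }
    }

    // Like sendOffsetsToTransaction(): only legal while a transaction is open.
    void sendOffsetsToTransaction(long nextOffset) {
        if (!inTransaction) {
            throw new IllegalStateException("No transaction is in process");
        }
        sentOffsets.add(nextOffset);
    }
}

// Hypothetical model of an after-rollback processor with commitRecovered enabled.
class DemoRecoveringProcessor {
    private final DemoKafkaTemplate template;

    DemoRecoveringProcessor(DemoKafkaTemplate template) {
        this.template = template;
    }

    boolean isProcessInTransaction() {
        return true;
    }

    // "Recovers" the failed record and commits the offset of the next record.
    void process(long failedOffset) {
        template.sendOffsetsToTransaction(failedOffset + 1);
    }
}

// The caller-side pattern the container uses: honor the flag before calling process().
class ContainerStyleCaller {
    static void invoke(DemoRecoveringProcessor processor, DemoKafkaTemplate template, long failedOffset) {
        if (processor.isProcessInTransaction()) {
            template.runInTransaction(() -> processor.process(failedOffset));
        }
        else {
            processor.process(failedOffset);
        }
    }

    public static void main(String[] args) {
        DemoKafkaTemplate template = new DemoKafkaTemplate();
        DemoRecoveringProcessor processor = new DemoRecoveringProcessor(template);
        try {
            processor.process(41L); // direct call, as in the bug report
        }
        catch (IllegalStateException ex) {
            System.out.println("direct call failed: " + ex.getMessage());
        }
        invoke(processor, template, 41L);
        System.out.println("committed: " + template.sentOffsets); // committed: [42]
    }
}
```

The direct call fails for the same structural reason as in the report: the processor assumes someone else has opened the transaction.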

artembilan commented 1 year ago

Any chance you could contribute that fix? See https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc. Thanks

l0co commented 1 year ago

I might try to look at this, but first I need to know one thing. process() is called non-transactionally from KafkaMessageListenerContainer.ListenerConsumer.batchAfterRollback() and transactionally from KafkaMessageListenerContainer.ListenerConsumer.recordAfterRollback(). Is there any reason for the non-transactional call, and should it be preserved? I mean, if batch processing is used without transactions, the KafkaTemplate would also be non-transactional, so there would be no transaction in the related code of DefaultAfterRollbackProcessor, and that code would simply not be executed (as in the current behavior).

In other words, can we call process() from KafkaMessageListenerContainer.ListenerConsumer.batchAfterRollback() with the same transaction conditions as KafkaMessageListenerContainer.ListenerConsumer.recordAfterRollback()?

That's probably a question for @garyrussell.

garyrussell commented 1 year ago

> Is there any reason for this non-transactional call?

Currently, with a record listener, the failed record can be recovered (e.g. sent to a DLT), in which case its offset is committed by the DefaultAfterRollbackProcessor (DARP) by sending the offset to the (new) transaction (we can't mix transactional and non-transactional offset commits).

Recovery is not (currently) possible with a batch listener so there is no need to start a new transaction in that case.
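The rule described here can be summarized as a small decision function. This is a hypothetical plain-Java restatement of the behavior described in this comment (and of the container condition quoted later in the thread), not framework code; `ListenerKind` and `AfterRollbackTransactionRule` are illustrative names. Only the record path can recover a record, so only the record path needs a transaction in which to commit the recovered offset.

```java
// Hypothetical restatement of the record vs. batch behavior described above.
enum ListenerKind { RECORD, BATCH }

final class AfterRollbackTransactionRule {

    private AfterRollbackTransactionRule() {
    }

    /**
     * Returns true when process() should run in a new transaction.
     * @param kind record or batch listener
     * @param processInTransaction value of isProcessInTransaction()
     * @param transactionalContainer whether the container has a transaction manager
     */
    static boolean startNewTransaction(ListenerKind kind, boolean processInTransaction,
            boolean transactionalContainer) {
        if (kind == ListenerKind.BATCH) {
            // Batch listeners cannot (currently) recover records, so there is
            // no recovered offset that would have to be committed.
            return false;
        }
        // Record listeners: a recovered record's offset must be committed in a
        // transaction, because transactional and non-transactional offset
        // commits cannot be mixed.
        return processInTransaction && transactionalContainer;
    }

    public static void main(String[] args) {
        System.out.println(startNewTransaction(ListenerKind.RECORD, true, true)); // true
        System.out.println(startNewTransaction(ListenerKind.BATCH, true, true));  // false
    }
}
```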

sobychacko commented 11 months ago

@l0co, I can look at this issue (unless you are still considering sending a fix). Could you paste here the minimum required to reproduce this error from an application? Did you create your own AfterRollbackProcessor in the application and then call process on it? Please provide some code snippets; that would speed things up. Thanks!

l0co commented 11 months ago

@sobychacko Hello. I planned to submit something here, but the end of the year is a busy time at my company and I haven't found the time yet, so feel free to provide a fix.

Yes, I have my own AfterRollbackProcessor with the following code:

    Runnable process = () -> strategy.processor().process(consumerRecords, consumer, container, exception, recoverable, eosMode);
    if (strategy.processor().isProcessInTransaction()) {
        // application-specific helper (not part of KafkaTemplate) that runs the Runnable in a Kafka transaction
        kafkaEventTemplate.doTransactionally(process);
    }
    else {
        process.run();
    }

So the processing is delegated to a strategy, but I have to wrap it manually in Kafka transactional code, whereas it should work out of the box. I think the fix could be to add the transactional wrapping inside org.springframework.kafka.listener.DefaultAfterRollbackProcessor.process():

    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
            @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

        if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
                getRecoveryStrategy((List) records, exception), container, this.logger)
                    && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
            // proposed: begin transactional wrapping here --->
            ConsumerRecord<K, V> skipped = records.get(0);
            if (EOSMode.V1.equals(eosMode.getMode())) {
                this.kafkaTemplate.sendOffsetsToTransaction(
                        Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                                createOffsetAndMetadata(container, skipped.offset() + 1)));
            }
            else {
                this.kafkaTemplate.sendOffsetsToTransaction(
                        Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                                createOffsetAndMetadata(container, skipped.offset() + 1)), consumer.groupMetadata());
            }
            // <--- end transactional wrapping here
        }

        if (!recoverable && this.backOff != null) {
            try {
                ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals, container);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

    }

And I think the this.kafkaTemplate.isTransactional() condition protects us from non-transactional (batch) processing.
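One way to get the out-of-the-box behavior proposed here without touching every caller is a decorator that honors isProcessInTransaction() inside process() itself. The sketch below is a hypothetical plain-Java model, not Spring Kafka code: `ScopedTxTemplate`, `RollbackProcessor` and `SelfWrappingProcessor` are illustrative names. One design choice to note: because the container's recordAfterRollback() already opens a transaction before calling process(), the decorator joins an active transaction instead of starting a nested one. Whether a real fix would need this depends on how the framework handles nested transactional calls, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transactional template: rejects nested transactions and offsets sent outside one.
class ScopedTxTemplate {
    private boolean inTransaction;
    int transactionsStarted;
    final List<Long> sentOffsets = new ArrayList<>();

    boolean isInTransaction() {
        return inTransaction;
    }

    void runInTransaction(Runnable body) {
        if (inTransaction) {
            throw new IllegalStateException("Nested transactions are not allowed");
        }
        inTransaction = true;
        transactionsStarted++;
        try {
            body.run();
        }
        finally {
            inTransaction = false;
        }
    }

    void sendOffsetsToTransaction(long nextOffset) {
        if (!inTransaction) {
            throw new IllegalStateException("No transaction is in process");
        }
        sentOffsets.add(nextOffset);
    }
}

// Hypothetical minimal version of the two processor methods discussed in the issue.
interface RollbackProcessor {
    boolean isProcessInTransaction();

    void process(long failedOffset);
}

// Decorator that honors isProcessInTransaction() inside process() itself.
class SelfWrappingProcessor implements RollbackProcessor {
    private final RollbackProcessor delegate;
    private final ScopedTxTemplate template;

    SelfWrappingProcessor(RollbackProcessor delegate, ScopedTxTemplate template) {
        this.delegate = delegate;
        this.template = template;
    }

    @Override
    public boolean isProcessInTransaction() {
        return delegate.isProcessInTransaction();
    }

    @Override
    public void process(long failedOffset) {
        if (delegate.isProcessInTransaction() && !template.isInTransaction()) {
            // Called directly (e.g. by application code): open the transaction here.
            template.runInTransaction(() -> delegate.process(failedOffset));
        }
        else {
            // No transaction needed, or the caller (e.g. the container) already opened one: join it.
            delegate.process(failedOffset);
        }
    }
}

class SelfWrappingDemo {
    public static void main(String[] args) {
        ScopedTxTemplate template = new ScopedTxTemplate();
        RollbackProcessor commitsRecovered = new RollbackProcessor() {
            @Override
            public boolean isProcessInTransaction() {
                return true;
            }

            @Override
            public void process(long failedOffset) {
                template.sendOffsetsToTransaction(failedOffset + 1);
            }
        };
        RollbackProcessor processor = new SelfWrappingProcessor(commitsRecovered, template);
        processor.process(9L); // manual call
        template.runInTransaction(() -> processor.process(19L)); // container-style call
        System.out.println(template.sentOffsets + " in " + template.transactionsStarted + " transactions");
    }
}
```

Running the demo prints `[10, 20] in 2 transactions`: both call paths commit their offset, and neither opens a nested transaction.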

l0co commented 11 months ago

To be more specific: I use DefaultAfterRollbackProcessor, but as a delegate of CompositeKafkaAfterRollbackProcessor, which implements the composite pattern so that different rollback strategies can be defined per topic.
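For readers unfamiliar with this setup, a per-topic composite can be sketched as follows. CompositeKafkaAfterRollbackProcessor is the reporter's own class and its code is not shown in the thread, so everything here (`TopicRecord`, `TopicStrategy`, `TxRunner`, `TopicCompositeProcessor`, and routing on the first record's topic) is a hypothetical plain-Java illustration. It applies the same isProcessInTransaction() check as the application snippet earlier in the thread, with the transaction supplied through a hypothetical `TxRunner` hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical record type carrying just what routing needs.
final class TopicRecord {
    final String topic;
    final long offset;

    TopicRecord(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }
}

// Hypothetical per-topic rollback strategy (same two methods as the processor contract).
interface TopicStrategy {
    boolean isProcessInTransaction();

    void process(List<TopicRecord> records);
}

// Hypothetical hook that runs a body inside a Kafka transaction.
@FunctionalInterface
interface TxRunner {
    void inTransaction(Runnable body);
}

// Composite: routes on the topic of the first (failed) record and wraps when asked to.
final class TopicCompositeProcessor {
    private final Map<String, TopicStrategy> byTopic;
    private final TopicStrategy fallback;
    private final TxRunner txRunner;

    TopicCompositeProcessor(Map<String, TopicStrategy> byTopic, TopicStrategy fallback, TxRunner txRunner) {
        this.byTopic = byTopic;
        this.fallback = fallback;
        this.txRunner = txRunner;
    }

    void process(List<TopicRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        TopicStrategy strategy = byTopic.getOrDefault(records.get(0).topic, fallback);
        Runnable body = () -> strategy.process(records);
        if (strategy.isProcessInTransaction()) {
            txRunner.inTransaction(body);
        }
        else {
            body.run();
        }
    }
}

// Demo strategy that just logs which strategy handled which offset.
final class LoggingStrategy implements TopicStrategy {
    private final String name;
    private final boolean inTransaction;
    private final List<String> log;

    LoggingStrategy(String name, boolean inTransaction, List<String> log) {
        this.name = name;
        this.inTransaction = inTransaction;
        this.log = log;
    }

    @Override
    public boolean isProcessInTransaction() {
        return inTransaction;
    }

    @Override
    public void process(List<TopicRecord> records) {
        log.add(name + ":" + records.get(0).offset);
    }
}

class TopicCompositeDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        TopicCompositeProcessor composite = new TopicCompositeProcessor(
                Map.of("orders", new LoggingStrategy("orders", true, log)),
                new LoggingStrategy("fallback", false, log),
                body -> {
                    log.add("begin");
                    body.run();
                    log.add("commit");
                });
        composite.process(List.of(new TopicRecord("orders", 5L)));
        composite.process(List.of(new TopicRecord("audit", 7L)));
        System.out.println(log); // [begin, orders:5, commit, fallback:7]
    }
}
```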

sobychacko commented 11 months ago

@l0co Thanks for the explanation! I see the issue but cannot reproduce it with my setup. Can you provide a simple standalone app where I can quickly reproduce it? I would like to see it failing on our end before adding the fix. Thanks!

l0co commented 11 months ago

@sobychacko not in this year probably :(

sobychacko commented 10 months ago

@l0co Any updates on the sample application? Thanks.

l0co commented 10 months ago

I will try to prepare something this week

l0co commented 9 months ago

Hello. It was difficult to get back to this subject after a long pause, but I checked what is going on and prepared a small test case: https://github.com/l0co/spring-kafka-2878-testcase. This is not a bug after all, just an architectural issue I ran into with the related code. If you run DemoApplicationTests.shouldProcessEvent(), it works, but if you stop at a breakpoint here you will see that it is only transactional because the calling code takes care of it here. So, if you simply called DefaultAfterRollbackProcessor.process() somewhere in your code with transactional Kafka, you would get an exception; you have to wrap every call to your AfterRollbackProcessor in the same code:

            if (afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null) {
                this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer,
                                KafkaMessageListenerContainer.this.thisOrParentContainer, e, true,
                                ListenerConsumer.this.eosMode);
                    }

                });
            }

We have a scenario where we call it by hand, and we had to wrap that call in the code above. However, I think this could simply be part of the rollback processor logic, something like this:

    @Override
    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
            @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

        if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable,
                getFailureTracker()::recovered, container, this.logger)
                    && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
            // NEW CODE: wrap the offset commit in a transaction.
            // Assumes a transactionTemplate field, which DefaultAfterRollbackProcessor does not currently have.
            this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

                @Override
                protected void doInTransactionWithoutResult(TransactionStatus status) {
                    ConsumerRecord<K, V> skipped = records.get(0);
                    // 'this' is the anonymous callback here, so the outer field must be qualified
                    DefaultAfterRollbackProcessor.this.kafkaTemplate.sendOffsetsToTransaction(
                            Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                                    createOffsetAndMetadata(container, skipped.offset() + 1)),
                            consumer.groupMetadata());
                }

            });
        }

        if (!recoverable && this.backOff != null) {
            try {
                ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals, container);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

    }
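
Inside the anonymous TransactionCallbackWithoutResult, `this` refers to the callback instance, not to the processor, and the callback has no kafkaTemplate field; an access such as `this.kafkaTemplate` therefore has to be written as `DefaultAfterRollbackProcessor.this.kafkaTemplate` (or left unqualified) to compile. A lambda avoids the issue, for example via TransactionTemplate.executeWithoutResult(...) in Spring Framework 5.2 and later. The plain-Java snippet below demonstrates the scoping rule with hypothetical classes `TxCallback` and `RollbackProcessorScope`.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for TransactionCallbackWithoutResult: an abstract callback type.
abstract class TxCallback {
    abstract String doInTransaction();
}

// Hypothetical stand-in for the after-rollback processor that owns the template field.
class RollbackProcessorScope {
    private final String kafkaTemplate = "processor's template";

    String fromAnonymousClass() {
        TxCallback callback = new TxCallback() {
            @Override
            String doInTransaction() {
                // Here 'this' is the anonymous TxCallback, which has no kafkaTemplate field,
                // so 'this.kafkaTemplate' would not compile; qualify with the outer class.
                return RollbackProcessorScope.this.kafkaTemplate;
            }
        };
        return callback.doInTransaction();
    }

    String fromLambda() {
        // Inside a lambda, 'this' still means the enclosing RollbackProcessorScope.
        Supplier<String> callback = () -> this.kafkaTemplate;
        return callback.get();
    }

    public static void main(String[] args) {
        RollbackProcessorScope scope = new RollbackProcessorScope();
        System.out.println(scope.fromAnonymousClass()); // processor's template
        System.out.println(scope.fromLambda());         // processor's template
    }
}
```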

Wzy19930507 commented 8 months ago

If we set a transactionTemplate on DefaultAfterRollbackProcessor, we may first need to ensure that the DefaultAfterRollbackProcessor's KafkaAwareTransactionManager is the same as the one used by the upstream components.

For example, KafkaMessageListenerContainer uses its this.transactionTemplate to call DefaultAfterRollbackProcessor.

Please correct me if anything I've said is wrong.

l0co commented 8 months ago

Do you mean this.transactionTemplate? Yes, this component is not available here, but I think we could use kafkaTemplate.executeInTransaction(OperationsCallback<K, V, T> callback) instead.

Wzy19930507 commented 8 months ago

> Do you mean this.transactionTemplate?

Yes. this.transactionTemplate is created from the KafkaAwareTransactionManager, so maybe we need to ensure that DefaultAfterRollbackProcessor and KafkaMessageListenerContainer use the same KafkaAwareTransactionManager?

l0co commented 8 months ago

I'm not that deep into this configuration at the moment, but I'd just use the existing kafkaTemplate as I described above. You might be right as well, though.
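The two options discussed above differ in where the transaction comes from, and offsets can only join a transaction that was opened for the same producer. The plain-Java model below illustrates this: relying on a transaction opened by the container's transaction manager only works if the processor's template shares that manager's producer factory, whereas a template-driven call (the role kafkaTemplate.executeInTransaction(...) would play) opens the transaction on the template's own factory, so the two always match. `FactoryId`, `TxScopes`, `ScopedTemplate` and `SharedManagerDemo` are hypothetical names, and keying open transactions by producer factory is a rough simplification of Spring's Kafka transaction support, not a description of its internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical identity of a producer factory (identity-based equals/hashCode on purpose).
final class FactoryId {
    final String name;

    FactoryId(String name) {
        this.name = name;
    }
}

// Hypothetical registry of open transactions, keyed by producer factory.
final class TxScopes {
    private final Map<FactoryId, List<Long>> open = new HashMap<>();

    List<Long> current(FactoryId factory) {
        return open.get(factory);
    }

    // Opens a transaction bound to 'factory', runs the body, returns the offsets it collected.
    List<Long> runInTransaction(FactoryId factory, Runnable body) {
        if (open.containsKey(factory)) {
            throw new IllegalStateException("Nested transaction on " + factory.name);
        }
        List<Long> offsets = new ArrayList<>();
        open.put(factory, offsets);
        try {
            body.run();
        }
        finally {
            open.remove(factory);
        }
        return offsets;
    }
}

// Hypothetical template: offsets only join a transaction bound to its own factory.
final class ScopedTemplate {
    private final TxScopes scopes;
    private final FactoryId factory;

    ScopedTemplate(TxScopes scopes, FactoryId factory) {
        this.scopes = scopes;
        this.factory = factory;
    }

    void sendOffsetsToTransaction(long nextOffset) {
        List<Long> tx = scopes.current(factory);
        if (tx == null) {
            throw new IllegalStateException("No transaction is in process for " + factory.name);
        }
        tx.add(nextOffset);
    }

    // Plays the role of executeInTransaction(): the transaction uses this template's factory.
    List<Long> executeInTransaction(Runnable body) {
        return scopes.runInTransaction(factory, body);
    }
}

class SharedManagerDemo {
    public static void main(String[] args) {
        TxScopes scopes = new TxScopes();
        FactoryId containerFactory = new FactoryId("container");
        ScopedTemplate separate = new ScopedTemplate(scopes, new FactoryId("processor"));
        try {
            // Container-managed transaction, but the processor's template uses another factory.
            scopes.runInTransaction(containerFactory, () -> separate.sendOffsetsToTransaction(42L));
        }
        catch (IllegalStateException ex) {
            System.out.println(ex.getMessage()); // No transaction is in process for processor
        }
        ScopedTemplate shared = new ScopedTemplate(scopes, containerFactory);
        System.out.println(scopes.runInTransaction(containerFactory, () -> shared.sendOffsetsToTransaction(42L))); // [42]
        System.out.println(separate.executeInTransaction(() -> separate.sendOffsetsToTransaction(42L))); // [42]
    }
}
```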