spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Optionally flush the producer before committing the transaction #2964

Closed: bgK closed this 10 months ago

bgK commented 10 months ago

In mixed workloads with both database writes and Kafka record production, flushing the Kafka producer before committing the JDBC + synchronized Kafka transaction would maximize the chances of both transactions ending in the same state: both committed or both rolled back.

Use case:

  1. JDBC / DataSourceTransactionManager transaction start
  2. Database writes
  3. Transactional KafkaTemplate send. The KafkaProducer transaction is automatically started and synchronized with the DataSource transaction.
  4. More database writes
  5. Proposal: add an optional automatic Kafka producer flush here (see the sketch after this list)
  6. JDBC commit
  7. Kafka producer commit
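
For illustration, here is a minimal sketch of the flow above, assuming a DataSourceTransactionManager behind @Transactional and a KafkaTemplate whose producer factory has a transactionIdPrefix set; the service, table, and topic names are hypothetical:

```java
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final JdbcTemplate jdbcTemplate;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderService(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional // 1. the DataSourceTransactionManager transaction starts here
    public void placeOrder(String orderId, String payload) {
        // 2. database writes
        this.jdbcTemplate.update("INSERT INTO orders (id) VALUES (?)", orderId);
        // 3. transactional send; the KafkaProducer transaction is started
        //    and synchronized with the DataSource transaction
        this.kafkaTemplate.send("orders", orderId, payload);
        // 4. more database writes
        this.jdbcTemplate.update("UPDATE orders SET published = TRUE WHERE id = ?", orderId);
        // 5. the proposal: an optional automatic kafkaTemplate.flush() here,
        //    so a broker-side failure is seen before the JDBC commit
        // 6. + 7. on return: JDBC commit first, then the Kafka producer commit
    }
}
```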

For this use case, if the Kafka cluster is unavailable or one of the topics cannot accept writes, and the futures returned by KafkaTemplate are not awaited, then the JDBC transaction is committed while the Kafka transaction fails to commit. With a producer flush before the JDBC commit, the Kafka failure would be detected before the JDBC commit and both transactions would be rolled back.

Maybe this could be an interesting safeguard for situations where the KafkaTemplate futures are unintentionally not awaited, or a way to simplify situations where further processing is intended to run in parallel with the async part of the Kafka send and passing the futures around is cumbersome.

sobychacko commented 10 months ago

@bgK Interesting proposal. If I understand correctly, the flush in your step 5 flushes the records sent in step 3, right?

From the javadoc of flush:

> The post-condition of flush() is that any previously sent record will have completed

Therefore, by adding a flush in step 5, we capture any issues with Kafka publishing and if it fails, no commits will happen (steps 6 and 7 above). I just wanted to make sure that I got that right.

I think this is a reasonable request. Are you willing to submit a PR? If you can, that would be great. Otherwise, we can look at this as part of the 3.2.0 (or beyond) backlog.

bgK commented 10 months ago

Hi,

> Therefore, by adding a flush in step 5, we capture any issues with Kafka publishing and if it fails, no commits will happen (steps 6 and 7 above). I just wanted to make sure that I got that right.

Yes, exactly.

So, I tried implementing this and it didn't work: I had incorrectly assumed that flush would throw if one of the pending records failed to produce. It doesn't.
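
To make the failure mode concrete, a minimal sketch, assuming Spring Kafka 3.x where send() returns a CompletableFuture (the topic and variables are made up):

```java
import java.util.concurrent.CompletableFuture;
import org.springframework.kafka.support.SendResult;

// Inside the synchronized transaction, with a transactional KafkaTemplate:
CompletableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("orders", orderId, payload);

// flush() blocks until the in-flight send completes, but it does not
// throw when the record fails to produce.
kafkaTemplate.flush();

// The failure only surfaces on the per-record future, wrapped in an
// ExecutionException.
future.get();
```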

In our system we accumulate the futures returned by KafkaTemplate sends in a TransactionSynchronization and await them in the beforeCommit hook. This has worked well for us for some years now, and I wanted to see if it could be implemented in Spring Kafka directly. Our solution might not scale well for people who want to send very large numbers of records in a single transaction; I incorrectly assumed flush could be used instead.
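
For anyone who wants to replicate that workaround, a rough sketch under the same assumptions (Spring Kafka 3.x futures); the class and helper names are invented:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.kafka.support.SendResult;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public class AwaitKafkaSendsSynchronization implements TransactionSynchronization {

    private final List<CompletableFuture<SendResult<String, String>>> futures = new ArrayList<>();

    // Hypothetical helper: create and register one synchronization for the
    // current transaction, e.g. lazily on the first send.
    public static AwaitKafkaSendsSynchronization registerForCurrentTransaction() {
        AwaitKafkaSendsSynchronization sync = new AwaitKafkaSendsSynchronization();
        TransactionSynchronizationManager.registerSynchronization(sync);
        return sync;
    }

    // Accumulate the future returned by each KafkaTemplate.send().
    public void track(CompletableFuture<SendResult<String, String>> future) {
        this.futures.add(future);
    }

    @Override
    public void beforeCommit(boolean readOnly) {
        // Await all pending sends; a failed send throws a CompletionException
        // here, which aborts the commit and rolls both transactions back.
        CompletableFuture.allOf(this.futures.toArray(new CompletableFuture<?>[0])).join();
    }
}
```

Awaiting in beforeCommit means a failed send still rolls back the JDBC transaction, at the cost of holding the transaction open until all records are acknowledged.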