confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python
Other
118 stars 895 forks source link

Producer goes to retry-backoff by send_offsets_to_transaction() thus degrading performance #967

Open Traktormaster opened 4 years ago

Traktormaster commented 4 years ago

Description

For some reason the send_offsets_to_transaction() call will be 50+ times slower when there is more than one message in the input topic of the transactor.

How to reproduce

I've made a reproduction demo that has two modes of operation. In one the issue is present and in the other it is absent.

The readme in that repository contains detailed information about the environment, the issue and the demo. There are log excerpts from the clients and the broker.

Checklist

Please provide the following information:

Traktormaster commented 4 years ago

I've tried setting retry.backoff.ms to 1 and it considerably improves the performance. So the time is mostly spent by the transacting producer waiting for a retry-delay.

I've added the DEBUG level broker logs to the repository. In the parallel one we can plainly see that something is not right according to the broker:

...
[2020-10-15 10:55:46,619] DEBUG [TransactionCoordinator id=1001] Returning CONCURRENT_TRANSACTIONS error code to client for eos-transactions.py's AddPartitions request (kafka.coordinator.transaction.TransactionCoordinator)
...

The real question is why does it happen? Why does retrying always succeed without any change? Or how to avoid it?

Traktormaster commented 4 years ago

I've found two more conditions that make the problem seemingly subside.

1. Transact more than one message

If I change the line of the demo-transactor

if True:  # DEMO-CHANGE: transact every processed message one-by-one

to

if msg_cnt > 1:  # DEMO-CHANGE: transact every other message

the process will finish in parallel mode without the concurrent errors.

2. Transact messages one-by-one but not too fequently

If I add just a little delay for the processing phase time.sleep(0.01) the transactions will succeed. Even with the added delay all transactions finish faster than in serial mode.

New conclusions

My guess is that the broker will somehow not finish the previous transaction by the time the new one is submitted. This will cause the "concurrent operation" error. I'm really not knowledgeable on Kafka internals though.

Is there a minimum time delay requirement between transactions even when they are coming from the same producer?

edenhill commented 3 years ago

Your conclusion is right, transaction commits are asynchronous and may not have finished by the time commit_transaction() returns (successfully).

It sounds like you want to be able to modify the retry interval, can you file a librdkafka issue for that?

edenhill commented 3 years ago

Please try out v1.6.0 first (will be released soon), since it has a bunch of transaction changes.

Traktormaster commented 3 years ago

I've re-examined the issue with v1.6.0, and it seems to be identical to v1.5.0.

Async behavior is unclear for commit_transaction()

Your conclusion is right, transaction commits are asynchronous and may not have finished by the time commit_transaction() returns (successfully).

This behavior is not clear from the documentation of commit_transaction(). In my opinion the following part sounds like if the opposite was true actually:

...
Note: This function will block until all outstanding messages
are delivered and the transaction commit request has been
successfully handled by the transaction coordinator, or until
the timeout expires, which ever comes first. On timeout the
application may call the function again.
...

There's a slight difference in meaning. The docstring says that the transaction is acknowledged by the relevant broker and we may continue to work, but not too fast! Because if we commit another transaction (even by the same producer) too soon it will fence itself off somehow.

Transaction commit specific retry interval?

It sounds like you want to be able to modify the retry interval, can you file a librdkafka issue for that?

Since the configuration option of retry.backoff.ms seems to be global for all protocol communication maybe having one specific for the commit-retry could be better, but I'm not even sure where the issue should be handled necessarily.

The bottom line is that if a producer has submitted a transaction successfully, the follow-up transaction by the same producer will fail because of timing issues exclusively from the viewpoint of the application.

I guess this whole ordeal is because of the complex distributed state of the transaction, where until convergence nobody knows (not even individual brokers) if the follow-up transaction will be eligible for processing soon.

If the brokers could know that a transaction request might become valid in a short time then they should queue the incoming transaction and not tell the producer to fence itself off. This would be handling the problem on the broker.

Handling or mitigating in the client can be done as I've outlined before: make transaction batches larger so the commit of the previous one will (truly) finish before the next one is ready to be submitted. In the case where this still happens setting the retry.backoff.ms to 1 can help a tremendous amount.


I'm interested what the proper way would be to avoid this issue.

I'm very satisfied with the current solution in terms of performance. Batching the processing of many small messages avoids the problem and improves performance on its own as well.

However if I wanted to exclusively not batch the messages (commit them one-by-one) this issue would constantly be present and hinder the throughput.