aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

[QUESTION] Rebalance during transaction breaks exactly-once semantics? #844

Open sauerburger opened 2 years ago

sauerburger commented 2 years ago

I am trying to understand how to implement exactly-once guarantees with the transactional consume-process-produce example.

I use the example almost verbatim. I've only added a few print statements and a sleep to simulate a long processing duration.

                ...
                print("Start sleep")
                sleep(4)
                print("Done sleep. Start commit")
                await producer.send_offsets_to_transaction(
                    commit_offsets, GROUP_ID)
                print("Commit done")
                ...

Besides this, I added a subscribe() call with my custom ConsumerRebalanceListener to see what's happening.

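from aiokafka import ConsumerRebalanceListener


class MyRebalanceListener(ConsumerRebalanceListener):
    # Called after a rebalance, with the partitions this worker now owns.
    def on_partitions_assigned(self, partitions):
        print("Assigned:", sorted([p.partition for p in partitions]))

    # Called before a rebalance, with the partitions this worker is giving up.
    def on_partitions_revoked(self, partitions):
        print("Revoked:", sorted([p.partition for p in partitions]))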

My understanding is that send_offsets_to_transaction() should fail if it tries to commit offsets for partitions whose assignment has been revoked. To stress-test this, I add a second worker while the current worker is sleeping, i.e. after the output was sent but before the offsets were committed. The log output then shows:

(Processing of event)
Start sleep
Done sleep. Start commit
Heartbeat failed for group processing-group because it is rebalancing
Revoked: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Assigned: [0, 2, 4, 6, 8]
Commit done

So it seems that the commit succeeded after new partitions were assigned. In the target topic I see duplicated messages, one produced by each worker.

Where does this go wrong? Why doesn't it give me exactly-once guarantees? Shouldn't the transaction fail or be aborted if a rebalance happens in the background? I can provide the runnable example if that makes it clearer.

sauerburger commented 2 years ago

Was someone able to reproduce this? Am I wrong to assume exactly-once semantics?

sauerburger commented 2 years ago

After reading "Improved Robustness and Usability of Exactly-Once Semantics in Apache Kafka", I'm now convinced that this is not implemented in aiokafka. Initially, exactly-once semantics required static partition assignments; KIP-447 made dynamic assignments possible. To fence against zombie producers (basically what I simulate with the example above), the sendOffsetsToTransaction call needs to receive the consumer's GroupMetadata, which includes the generationId. In aiokafka, however, only the group id is passed, which is not enough to fence off commits made after a rebalance.
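To illustrate why the group id alone cannot fence a late commit, here is a minimal toy model in plain Python. It is only a sketch of the idea, not how Kafka or aiokafka implement it: ToyGroupCoordinator, StaleGenerationError, and both commit methods are hypothetical names made up for this example, and the offsets are arbitrary.

```python
from dataclasses import dataclass, field
from typing import Dict


class StaleGenerationError(Exception):
    """Raised by the toy coordinator when a commit carries an outdated generation."""


@dataclass
class ToyGroupCoordinator:
    """Hypothetical in-memory model of a group coordinator (not a Kafka or aiokafka class)."""

    generation_id: int = 0
    committed: Dict[int, int] = field(default_factory=dict)  # partition -> offset

    def rebalance(self) -> int:
        """Start a new generation, as happens when a worker joins or leaves."""
        self.generation_id += 1
        return self.generation_id

    def commit_with_group_id_only(self, offsets: Dict[int, int]) -> None:
        # Nothing in the request identifies the generation, so a commit from a
        # worker that has lost its partitions looks the same as a valid one.
        self.committed.update(offsets)

    def commit_with_generation(self, generation_id: int, offsets: Dict[int, int]) -> None:
        # KIP-447 style: the request carries the generation the worker believes
        # it is in; a mismatch means the assignment may have changed.
        if generation_id != self.generation_id:
            raise StaleGenerationError(
                f"commit from generation {generation_id}, current is {self.generation_id}"
            )
        self.committed.update(offsets)


coordinator = ToyGroupCoordinator()
zombie_generation = coordinator.rebalance()  # worker A owns all partitions
coordinator.rebalance()                      # worker B joins while A is still busy

# Group id only: the late commit from worker A is accepted.
coordinator.commit_with_group_id_only({1: 42})
late_commit_accepted = coordinator.committed == {1: 42}

# With the generation attached, the same kind of late commit is rejected.
try:
    coordinator.commit_with_generation(zombie_generation, {1: 43})
    late_commit_fenced = False
except StaleGenerationError:
    late_commit_fenced = True
```

In this model the first commit goes through even though the group has already moved on, which matches the "Commit done" line in my log above.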

CMIIW, if exactly-once semantics are strictly required with dynamic partition assignments, the only option at the moment is the Java implementation.

vmaurin commented 1 year ago

@sauerburger As I understand it, zombie fencing is handled by the "transactional id" setting, which is not really explained in the aiokafka example. In my implementation of exactly-once semantics with aiokafka, I created a producer pool that implements ConsumerRebalanceListener, so that each producer is created with a transactional id specific to an input topic-partition. This way, I expect the "zombie" producer to be rejected when it submits the "commit" message to the transaction topic.

With my loop and a similar example, I get aiokafka.errors.ProducerFenced: ProducerFenced: There is a newer producer using the same transactional_id ortransaction timeout occurred (check that processing time is below transaction_timeout_ms)
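The idea behind the per-partition transactional id can be sketched with a small stand-alone model. This is not my actual producer pool and it does not use aiokafka: ToyTransactionCoordinator, ToyProducerFenced, transactional_id_for, the id format, and the topic name "in-topic" are all assumptions made up for illustration. It only mimics the broker behaviour of bumping an epoch whenever a producer registers with an already known transactional id.

```python
from typing import Dict


class ToyProducerFenced(Exception):
    """Stand-in for the fencing error; not aiokafka.errors.ProducerFenced."""


def transactional_id_for(group_id: str, topic: str, partition: int) -> str:
    # Hypothetical naming scheme: one transactional id per input topic-partition.
    return f"{group_id}-{topic}-{partition}"


class ToyTransactionCoordinator:
    """Hypothetical in-memory model of epoch-based producer fencing."""

    def __init__(self) -> None:
        self._epochs: Dict[str, int] = {}

    def init_producer(self, transactional_id: str) -> int:
        # Registering again with the same id bumps the epoch, which
        # invalidates every producer still holding the previous epoch.
        epoch = self._epochs.get(transactional_id, -1) + 1
        self._epochs[transactional_id] = epoch
        return epoch

    def commit(self, transactional_id: str, epoch: int) -> None:
        if self._epochs.get(transactional_id) != epoch:
            raise ToyProducerFenced(f"newer producer exists for {transactional_id}")


coordinator = ToyTransactionCoordinator()
txn_id = transactional_id_for("processing-group", "in-topic", 1)

# Worker A is assigned partition 1 and starts a producer for it.
old_epoch = coordinator.init_producer(txn_id)

# After the rebalance, worker B is assigned partition 1 and starts its own
# producer with the same transactional id.
new_epoch = coordinator.init_producer(txn_id)

# Worker A wakes up and tries to commit with its stale epoch.
try:
    coordinator.commit(txn_id, old_epoch)
    zombie_rejected = False
except ToyProducerFenced:
    zombie_rejected = True

# Worker B's commit is still valid.
coordinator.commit(txn_id, new_epoch)
```

Because the id is derived from the input topic-partition, whichever worker takes over a partition reuses the id of the previous owner, and that is what rejects the old producer.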