confluentinc / librdkafka

The Apache Kafka C/C++ library

Transactional producer silently fails to commit a transaction. #4697

Open vncorpacc opened 2 months ago

vncorpacc commented 2 months ago

Description

Versions are listed in the checklist. We are using the confluent-kafka Python bindings. This is largely a copy-paste of https://github.com/redpanda-data/redpanda/issues/17921. It is not clear to me whether the issue lies with a misbehaving broker or with the client library. I would appreciate any insights.

The transactional producer silently fails to commit a batch: commit_transaction() returns without raising, but inspecting the topic afterwards shows that the messages were not committed.

The issue was encountered at least 3 times. The first 2 times our producer detected an error in an unexpected place: a call to produce() (rd_kafka_produce), made some time after begin_transaction(), returned -172 (wrong state). Inspecting the state of the topic made it obvious that the commit_transaction() call preceding the last begin_transaction() had returned successfully, since we crash on any error there. But in redpanda the messages were not actually committed: they did not show up in the redpanda console, and when we tried to read those supposedly committed messages programmatically, the request timed out. The producer also received offsets for those uncommitted messages in the on_delivery callback, meaning that they reached redpanda but were not committed. The 3rd time the producer did throw on commit_transaction(), but I still include it here as it seems relevant.

confluent_kafka did not output any logs while this was happening. For the 2 occurrences where the producer noticed the error only on a produce() call, all we have is the error code: cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Unable to produce message: Local: Erroneous state"}.
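Since no client logs were captured, one way to get more detail on a recurrence is to turn on librdkafka debug contexts and route them to Python's logging. Below is a minimal sketch under assumptions: the helper name with_debug_logging and the logger name are hypothetical, the choice of debug contexts (eos, broker, protocol) is a guess at what is relevant here, and passing the logger through the "logger" config key reflects my understanding of the confluent_kafka client rather than anything confirmed in this thread.

```python
import logging


def with_debug_logging(cfg: dict, contexts: str = "eos,broker,protocol") -> dict:
    """Return a copy of cfg with librdkafka debug logging routed to `logging`."""
    logger = logging.getLogger("kafka-producer")  # hypothetical logger name
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())

    debug_cfg = dict(cfg)           # leave the caller's config untouched
    debug_cfg["debug"] = contexts   # comma-separated librdkafka debug contexts
    debug_cfg["logger"] = logger    # assumption: client forwards its logs here
    return debug_cfg


# Usage (needs confluent_kafka and a reachable broker):
#   producer = confluent_kafka.Producer(with_debug_logging(cfg))
```

As far as I know, the Python client only forwards log lines while poll() or flush() is being called, so a producer that rarely polls may appear silent even with debug enabled.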

For the 3rd occurrence, when the producer did fail at commit: Error: _PURGE_QUEUE. Retriable: False. Fatal: False"}
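The Retriable and Fatal flags above are what the transactional API expects callers to branch on, together with the txn_requires_abort flag, which is not visible in the truncated output. A minimal sketch of that branching follows, assuming an error object with the KafkaError methods fatal(), txn_requires_abort() and retriable(). The function names, the action strings, and the retry limit are hypothetical; the exception type is passed in so the sketch runs without the client installed.

```python
def classify_txn_error(err) -> str:
    """Map a KafkaError-like object to the next action for a transactional producer."""
    if err.fatal():
        return "recreate_producer"   # producer instance is no longer usable
    if err.txn_requires_abort():
        return "abort_transaction"   # abort, then begin a new transaction
    if err.retriable():
        return "retry"               # the same call may be retried
    return "raise"                   # no flag set: surface it to the caller


def commit_with_handling(producer, kafka_exception_type, max_retries: int = 3) -> None:
    """Commit the current transaction, acting on the error flags if it fails."""
    for _ in range(max_retries):
        try:
            producer.commit_transaction()
            return
        except kafka_exception_type as exc:
            err = exc.args[0]  # KafkaException carries the KafkaError as args[0]
            action = classify_txn_error(err)
            if action == "retry":
                continue
            if action == "abort_transaction":
                producer.abort_transaction()
            raise
    raise RuntimeError("commit_transaction() still failing after retries")


# Usage (needs confluent_kafka):
#   commit_with_handling(producer, confluent_kafka.KafkaException)
```

With Retriable: False and Fatal: False, this sketch would either abort or re-raise, depending on the flag that is missing from the output above.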

How to reproduce

We couldn't reproduce the issue in any other environment: it does not fail in our testing environment with the exact same producer code. It does, however, manifest itself reliably in the production environment.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

emasab commented 1 week ago

Can you share some code to reproduce this?

vncorpacc commented 1 week ago

Unfortunately this is not something even we were able to reproduce reliably. It stopped happening after yet another redpanda restart. I am not sure what code you expect to see. The outline would be:

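import confluent_kafka

# cfg is expected to set "transactional.id"; init_transactions() requires it.
producer = confluent_kafka.Producer(cfg)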
producer.init_transactions()
while not stop_requested():
    msgs = get_msgs_to_send()
    batch_requests_results = []
    producer.begin_transaction()
    for topic, value, partition in msgs:
        batch_requests_results.append(None)
        msg_num = len(batch_requests_results) - 1
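        # Bind msg_num as a default argument: callbacks only run later (e.g. in
        # flush()), by which time the loop variable holds the last index.
        def on_delivery(
            err: confluent_kafka.KafkaError,
            msg: confluent_kafka.Message,
            msg_num: int = msg_num,
        ):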
            if err is None or err.code() == confluent_kafka.KafkaError.NO_ERROR:
                batch_requests_results[msg_num] = msg
            else:
                batch_requests_results[msg_num] = err
        # partition must be passed by keyword: the third positional argument is key.
        producer.produce(topic, value, partition=partition, on_delivery=on_delivery)
    producer.flush()
    offsets_and_metadata = get_offsets_or_raise(batch_requests_results)
    producer.commit_transaction()
    # We got here! And crashed only on a subsequent produce() in the next batch.
    # Those offsets were not reachable afterwards - they were not really committed.
    store_in_db(offsets_and_metadata)
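
The outline calls get_offsets_or_raise() without showing it. A minimal sketch of what such a helper might look like is below; it is hypothetical and not the code from our producer. It assumes each entry is either None (the callback never ran), a Message-like object with topic(), partition() and offset(), or an error object without an offset() method.

```python
def get_offsets_or_raise(results):
    """Return (topic, partition, offset) per message, or raise on any gap or failure."""
    offsets = []
    for i, result in enumerate(results):
        if result is None:
            # The delivery callback never stored anything for this message.
            raise RuntimeError(f"message {i}: delivery callback never fired")
        if not hasattr(result, "offset"):
            # Anything without offset() is treated as a delivery error.
            raise RuntimeError(f"message {i}: delivery failed: {result}")
        offsets.append((result.topic(), result.partition(), result.offset()))
    return offsets
```

Treating None as an error matters here: it is the only way to notice a message whose delivery report was never recorded before commit_transaction() is called.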