confluentinc / librdkafka

The Apache Kafka C/C++ library

Transactional client showing producer fenced error even when only a single producer is used #3928

Open abolipai99 opened 2 years ago

abolipai99 commented 2 years ago


Description

We wrote an application that deploys a transactional client (transactional consumer + producer). A single transactional producer is created with a unique transactional ID. After a few hours, the producer reports the following error, which causes the application to shut down:

%4|1657854001.949|GETPID|rdkafka#producer-3| [thrd:main]: Failed to acquire transactional PID from broker TxnCoordinator/3: Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing: retrying
%1|1657854002.147|TXNERR|rdkafka#producer-3| [thrd:main]: Fatal transaction error: Producer fenced by newer instance: Failed to acquire transactional PID from broker TxnCoordinator/3: Broker: There is a newer producer with the same transactionalId which fences the current one (_FENCED)
%0|1657854002.147|FATAL|rdkafka#producer-3| [thrd:main]: Fatal error: Local: This instance has been fenced by a newer instance: Producer fenced by newer instance: Failed to acquire transactional PID from broker TxnCoordinator/3: Broker: There is a newer producer with the same transactionalId which fences the current one

During the run, no other producer instances were created.
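
The log above shows a retriable condition followed by a fatal one. As a minimal sketch of how an application might tell these cases apart, the helper below checks an error object in order of severity. It assumes the error exposes `fatal()`, `retriable()` and `txn_requires_abort()`, as `KafkaError` does in confluent-kafka-python; `FakeKafkaError` and `classify` are hypothetical names used only so the sketch runs without a broker.

```python
class FakeKafkaError:
    """Hypothetical stand-in for confluent_kafka.KafkaError (illustration only)."""

    def __init__(self, fatal=False, retriable=False, txn_requires_abort=False):
        self._fatal = fatal
        self._retriable = retriable
        self._txn_requires_abort = txn_requires_abort

    def fatal(self):
        return self._fatal

    def retriable(self):
        return self._retriable

    def txn_requires_abort(self):
        return self._txn_requires_abort


def classify(err):
    """Map an error to the action the application should take."""
    if err.fatal():
        # e.g. a fenced producer: the instance cannot be used any further.
        return "fatal"
    if err.txn_requires_abort():
        # The current transaction must be aborted before continuing.
        return "abort"
    if err.retriable():
        # The same call may simply be retried.
        return "retry"
    return "unknown"


print(classify(FakeKafkaError(fatal=True)))
print(classify(FakeKafkaError(retriable=True)))
```

With a real client the object passed in would be `e.args[0]` of a caught `KafkaException`, which is how the application code below already accesses it.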

How to reproduce

This is the application code, which uses the confluent-kafka-python library (the input topics and the producer and consumer configurations are taken as user input).


import logging
import time

from confluent_kafka import DeserializingConsumer, SerializingProducer, TopicPartition

def init_transactions(producer, retry_num):
    logging.info(f"Initiating transaction for producer: {producer}")
    retriable_error = None
    for i in range(retry_num):
        try:
            # Initialize the transactional producer (acquires the transactional PID).
            producer.init_transactions()
            return
        except Exception as e:
            if e.args[0].retriable():
                retriable_error = e
                logging.warning(f'Retriable error in init_transactions - {e}')
                # Back off before retrying.
                time.sleep(3)
                continue
            else:
                logging.error(f'Failed in init_transactions with error - {e}')
                raise

    logging.error(f"Retriable error seen - {retriable_error}: Number of retries : {retry_num} is over")
    raise retriable_error

def begin_transaction(producer, retry_num):
    logging.info(f"Beginning transaction for producer: {producer}")
    retriable_error = None
    for i in range(retry_num):
        try:
            # Start producer transaction.
            producer.begin_transaction()
            return
        except Exception as e:
            if e.args[0].retriable():
                retriable_error = e
                logging.warning(f'Retriable error in begin_transaction - {e}')
                continue
            else:
                logging.error(f'Failed in begin_transaction with error - {e}')
                raise

    logging.error(f"Retriable error seen - {retriable_error}: Number of retries : {retry_num} is over")
    raise retriable_error

def abort_transaction(producer, consumer, retry_num=5):
    logging.info(f"Aborting transaction for producer: {producer}")
    retriable_error = None
    for i in range(retry_num):
        try:
            producer.abort_transaction()
            # Seek back to the last committed offsets, since the transaction was aborted.
            partitions = consumer.assignment()
            offsets = consumer.committed(partitions)
            for tp in offsets:
                consumer.seek(TopicPartition(tp.topic, tp.partition, tp.offset))
            return
        except Exception as e:
            if e.args[0].retriable():
                retriable_error = e
                logging.warning(f'Retriable error in abort_transaction - {e}')
                continue
            logging.error(f"Failed abort_transaction with error - {e}")
            raise

    logging.error(f"Retriable error seen - {retriable_error}: Number of retries : {retry_num} is over")
    raise retriable_error

def send_offsets(producer, consumer, retry_num):
    logging.info(f"Sending offsets for producer: {producer}")
    retriable_error = None
    for i in range(retry_num):
        try:
            # Attach the consumer's current positions to the ongoing transaction.
            producer.send_offsets_to_transaction(
                consumer.position(consumer.assignment()),
                consumer.consumer_group_metadata())
            return True
        except Exception as e:
            if e.args[0].txn_requires_abort():
                logging.error(f"Abortable error in send_offsets: {e}")
                abort_transaction(producer, consumer)
                return False
            elif e.args[0].retriable():
                retriable_error = e
                logging.warning("Retriable error seen in send_offsets - {}".format(e))
                continue
            else:
                logging.error("Fatal error seen in send_offsets - {}".format(e))
                raise

    logging.error(f"Retriable error seen - {retriable_error}: Number of retries : {retry_num} is over")
    abort_transaction(producer, consumer)
    raise retriable_error

def commit_transaction(producer, consumer, retry_num):
    logging.info(f"Committing transaction for producer: {producer}")
    retriable_error = None
    for i in range(retry_num):
        try:
            producer.commit_transaction()
            return True
        except Exception as e:
            if e.args[0].txn_requires_abort():
                logging.error("Abortable error seen - {}".format(e))
                abort_transaction(producer, consumer)
                return False
            elif e.args[0].retriable():
                retriable_error = e
                logging.warning("Retriable error seen - {}".format(e))
                continue
            else:
                logging.error("Fatal error seen - {}".format(e))
                raise

    logging.error(f"Retriable error seen - {retriable_error}: Number of retries : {retry_num} is over")
    abort_transaction(producer, consumer)
    raise retriable_error

def main():
    producer = SerializingProducer(producer_mod_conf)
    consumer = DeserializingConsumer(consumer_mod_conf)

    consumer.subscribe([input_topics])

    init_transactions(producer, retry_count)
    begin_transaction(producer, retry_count)

    msg_cnt = 0
    logging.info("=== Starting Consume-Transform-Process loop ===")
    while True:
        # serve delivery reports from previous produce()s
        producer.poll(0)

        # read message from input_topic
        msg = consumer.poll(timeout=50.0)
        if msg is None:
            continue
        if msg.error():
            continue
        msg_cnt += 1

        logging.debug(f"Consumed message: {msg.key()}, {msg.value()}")
        # process message
        processed_key, processed_value = process_input(msg, message_type=message_type)
        logging.debug(f"Processed key: {processed_key}, Processed value: {processed_value}")
        try:
            # produce transformed message to output topic
            producer.produce(output_topic, value=processed_value, headers=msg.headers(), key=processed_key, on_delivery=delivery_report)
        except Exception as e:
            logging.error(f"Error seen while producing - {e}")
            abort_transaction(producer, consumer, retry_count)
            begin_transaction(producer, retry_count)
            msg_cnt = 0
            continue
        if msg_cnt % transaction_size == 0:
            # Send the consumer's position to transaction to commit
            # them along with the transaction, committing both
            # input and outputs in the same transaction is what provides EOS.

            should_commit = send_offsets(producer, consumer, retry_count)
            # Commit the transaction
            if should_commit:
                commit_transaction(producer, consumer, retry_count)
            else:
                logging.error("Transaction commit has failed; same transaction will be retried")
            begin_transaction(producer, retry_count)
            msg_cnt = 0
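
The five wrapper functions above repeat the same retry loop. As a hedged sketch only, the loop could be factored into one helper that retries while the error reports `retriable()` and re-raises anything else. `call_with_retries`, `backoff_s` and `name` are hypothetical names, not part of confluent-kafka-python, and the helper assumes the error object is carried in `e.args[0]` as in the code above.

```python
import logging
import time


def call_with_retries(operation, retry_num, backoff_s=0.0, name="operation"):
    """Call operation() until it succeeds, retrying only retriable errors."""
    if retry_num < 1:
        raise ValueError("retry_num must be at least 1")
    last_error = None
    for attempt in range(1, retry_num + 1):
        try:
            return operation()
        except Exception as e:
            err = e.args[0] if e.args else None
            # Only errors that expose retriable() and return True are retried.
            if err is not None and hasattr(err, "retriable") and err.retriable():
                last_error = e
                logging.warning(f"Retriable error in {name} (attempt {attempt}/{retry_num}) - {e}")
                time.sleep(backoff_s)
                continue
            logging.error(f"Non-retriable error in {name} - {e}")
            raise
    logging.error(f"{name} still failing after {retry_num} retries - {last_error}")
    raise last_error
```

For example, `call_with_retries(producer.init_transactions, retry_count, backoff_s=3, name="init_transactions")` would stand in for `init_transactions(producer, retry_count)`. The abort-on-error handling in `send_offsets` and `commit_transaction` would still have to be layered on top.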


mhowlett commented 2 years ago

Thanks, I have reproduced this and am looking into it.

It appears to be a bug in the KIP-360 implementation whereby librdkafka may erroneously make a second InitProducerId call.

mhowlett commented 2 years ago

Additional InitProducerId calls seem to occur only after a concurrent_transactions error result, though, and this is also what the Java implementation seems to do. Still looking into it.
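
For readers unfamiliar with why an extra InitProducerId call matters: Kafka fences transactional producers by epoch. Each successful InitProducerId for a given transactional ID bumps the epoch, and requests carrying an older epoch are rejected. The toy model below illustrates only that rule. It is not librdkafka or broker code, `ToyCoordinator` and its methods are hypothetical, and whether this is the exact sequence behind this issue is not confirmed in the thread.

```python
class ProducerFenced(Exception):
    """Raised by the toy coordinator when a stale epoch is used."""


class ToyCoordinator:
    """Toy model of epoch-based fencing (illustration only)."""

    def __init__(self):
        self._epochs = {}

    def init_producer_id(self, transactional_id):
        # Every successful call bumps the epoch for this transactional ID.
        epoch = self._epochs.get(transactional_id, -1) + 1
        self._epochs[transactional_id] = epoch
        return epoch

    def check_epoch(self, transactional_id, epoch):
        # A request with an epoch older than the current one is fenced.
        if epoch < self._epochs[transactional_id]:
            raise ProducerFenced(
                f"epoch {epoch} is older than {self._epochs[transactional_id]}")


coord = ToyCoordinator()
first = coord.init_producer_id("txn-1")   # epoch held by the client
second = coord.init_producer_id("txn-1")  # an extra call bumps the epoch

try:
    coord.check_epoch("txn-1", first)     # client still uses the first epoch
    fenced = False
except ProducerFenced:
    fenced = True

print(first, second, fenced)
```

In this model a single client can appear fenced "by a newer instance" even though no second producer exists, simply because the epoch moved on while the client still held the older one.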