tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Unexpected metadata causes IllegalStateException #302

Closed plameniv closed 5 years ago

plameniv commented 5 years ago

We ran into a java.lang.IllegalStateException while testing the EoS implementation, and we are not sure what is causing it.

The scenario is as follows: in a loop, we create a KafkaJs client and a producer and we write a number of messages in a single transaction, then discard the client.

What we observe is that after a variable number of iterations the following exception happens:

kafka1_1  | [2019-02-28 19:34:07,949] ERROR TransactionMetadata(transactionalId=transactional-id, producerId=382001, producerEpoch=877, txnTimeoutMs=60000, state=Empty, pendingState=Some(Ongoing), topicPartitions=Set(), txnStartTimestamp=1551382447950, txnLastUpdateTimestamp=1551382447936)'s transition to TxnTransitMetadata(producerId=382001, producerEpoch=877, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(test-topic-1551382421868-1), txnStartTimestamp=1551382447942, txnLastUpdateTimestamp=1551382447942) failed: this should not happen (kafka.coordinator.transaction.TransactionMetadata)
kafka1_1  | [2019-02-28 19:34:07,953] ERROR [KafkaApi-0] Error when handling request {transactional_id=transactional-id,producer_id=382001,producer_epoch=877,topics=[{topic=test-topic-1551382421868,partitions=[1]}]} (kafka.server.KafkaApis)
kafka1_1  | java.lang.IllegalStateException: TransactionalId transactional-id failed transition to state TxnTransitMetadata(producerId=382001, producerEpoch=877, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(test-topic-1551382421868-1), txnStartTimestamp=1551382447942, txnLastUpdateTimestamp=1551382447942) due to unexpected metadata

(further detail below).

The Kafka cluster remains up. However, a subsequent run of the reproducer results in KafkaJSNumberOfRetriesExceeded after a number of retries on the CONCURRENT_TRANSACTIONS error. This happens regardless of whether we use the same topic.

Reproducer:

const ip = require("ip");
const kafkajs = require("kafkajs");

const id = Date.now();
const transactionalId = "transactional-id";
const host = process.env.HOST_IP || ip.address();
const broker = `${host}:9092`;
const topic = `test-topic-${id}`;
const acks = -1;
const maxInFlightRequests = 1;
const connectionTimeout = 1000;
const key = `sink-1-${id}`;

async function setupAdmin() {
    const client = new kafkajs.Kafka({ clientId: "reproducer", brokers: [broker] });
    const admin = client.admin();
    try {
        await admin.connect();
        await admin.createTopics({
            waitForLeaders: true,
            topics: [{ topic, numPartitions: 4, configEntries: [] }],
        });
    } finally {
        await admin.disconnect();
    }
    console.log("ADMIN SETUP DONE");
}

function createMessages(num) {
    const messages = [];
    for (let ii = 0; ii < num; ii++) {
        messages.push({ key, value: `${ii}` });
    }
    return messages;
}

async function runOneProducer(num) {
    const messages = createMessages(num);
    const client = new kafkajs.Kafka({
        clientId: "reproducer",
        brokers: [broker],
        connectionTimeout,
        maxInFlightRequests,
    });

    const producer = client.producer({
        // An idempotent producer enforces EoS messaging
        // idempotent: transactional ? true : false,
        idempotent: true,
        transactionalId,
    });
    await producer.connect();

    const transaction = await producer.transaction();
    try {
        await transaction.send({
            acks,
            topic,
            messages,
        });
    } catch (e) {
        try {
            await transaction.abort();
        } catch (e) {
            throw e;
        }
        throw e;
    }
    await transaction.commit();
    await producer.disconnect();
}

async function runProducerReplicator() {
    await setupAdmin();
    const repeats = 5000;
    const numMessages = 128;
    for (let ii = 0; ii < repeats; ii++) {
        await runOneProducer(numMessages);
        if (ii % 20 === 0) {
            console.log(ii);
        }
    }
}

runProducerReplicator();

Kafka Log:

kafka1_1  | [2019-02-28 19:34:07,944] INFO [TransactionCoordinator id=0] Initialized transactionalId transactional-id with producerId 382001 and producer epoch 876 on partition __transaction_state-31 (kafka.coordinator.transaction.TransactionCoordinator)
kafka1_1  | [2019-02-28 19:34:07,936] INFO [TransactionCoordinator id=0] Initialized transactionalId transactional-id with producerId 382001 and producer epoch 877 on partition __transaction_state-31 (kafka.coordinator.transaction.TransactionCoordinator)
kafka1_1  | [2019-02-28 19:34:07,949] ERROR TransactionMetadata(transactionalId=transactional-id, producerId=382001, producerEpoch=877, txnTimeoutMs=60000, state=Empty, pendingState=Some(Ongoing), topicPartitions=Set(), txnStartTimestamp=1551382447950, txnLastUpdateTimestamp=1551382447936)'s transition to TxnTransitMetadata(producerId=382001, producerEpoch=877, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(test-topic-1551382421868-1), txnStartTimestamp=1551382447942, txnLastUpdateTimestamp=1551382447942) failed: this should not happen (kafka.coordinator.transaction.TransactionMetadata)
kafka1_1  | [2019-02-28 19:34:07,953] ERROR [KafkaApi-0] Error when handling request {transactional_id=transactional-id,producer_id=382001,producer_epoch=877,topics=[{topic=test-topic-1551382421868,partitions=[1]}]} (kafka.server.KafkaApis)
kafka1_1  | java.lang.IllegalStateException: TransactionalId transactional-id failed transition to state TxnTransitMetadata(producerId=382001, producerEpoch=877, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(test-topic-1551382421868-1), txnStartTimestamp=1551382447942, txnLastUpdateTimestamp=1551382447942) due to unexpected metadata
kafka1_1  |     at kafka.coordinator.transaction.TransactionMetadata.throwStateTransitionFailure(TransactionMetadata.scala:390)
kafka1_1  |     at kafka.coordinator.transaction.TransactionMetadata.completeTransitionTo(TransactionMetadata.scala:326)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply$mcV$sp(TransactionStateManager.scala:534)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)
kafka1_1  |     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
kafka1_1  |     at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:525)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)
kafka1_1  |     at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
kafka1_1  |     at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
kafka1_1  |     at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
kafka1_1  |     at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
kafka1_1  |     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:488)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply$mcV$sp(TransactionStateManager.scala:614)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:591)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1.apply(TransactionStateManager.scala:591)
kafka1_1  |     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
kafka1_1  |     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
kafka1_1  |     at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:585)
kafka1_1  |     at kafka.coordinator.transaction.TransactionCoordinator.handleAddPartitionsToTransaction(TransactionCoordinator.scala:272)
kafka1_1  |     at kafka.server.KafkaApis.handleAddPartitionToTxnRequest(KafkaApis.scala:1744)
kafka1_1  |     at kafka.server.KafkaApis.handle(KafkaApis.scala:128)
kafka1_1  |     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
kafka1_1  |     at java.lang.Thread.run(Thread.java:748)
kafka2_1  | waiting for kafka to be ready
kafka3_1  | waiting for kafka to be ready

KafkaJs Log:

ADMIN SETUP DONE
0
{"level":"ERROR","timestamp":"2019-02-28T19:33:42.609Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":5,"size":20}
20
…
180
{"level":"ERROR","timestamp":"2019-02-28T19:33:48.652Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":4,"size":20}
200
…
860
{"level":"ERROR","timestamp":"2019-02-28T19:34:07.963Z","logger":"kafkajs","message":"[Connection] Response AddPartitionsToTxn(key: 24, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The server experienced an unexpected error when processing the request","correlationId":7,"size":48}
{"level":"ERROR","timestamp":"2019-02-28T19:34:07.964Z","logger":"kafkajs","message":"[Producer] The server experienced an unexpected error when processing the request","retryCount":0,"retryTime":342}
{"level":"ERROR","timestamp":"2019-02-28T19:34:07.971Z","logger":"kafkajs","message":"[Connection] Response EndTxn(key: 26, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":8,"size":10}
(node:14623) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 884): KafkaJSProtocolError: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
(node:14623) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

KafkaJs Log after second run of reproducer:

ADMIN SETUP DONE
{"level":"ERROR","timestamp":"2019-02-28T19:35:18.152Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":4,"size":20}
{"level":"ERROR","timestamp":"2019-02-28T19:35:18.525Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":5,"size":20}
{"level":"ERROR","timestamp":"2019-02-28T19:35:19.148Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":7,"size":20}
{"level":"ERROR","timestamp":"2019-02-28T19:35:20.358Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":8,"size":20}
{"level":"ERROR","timestamp":"2019-02-28T19:35:22.333Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":9,"size":20}
{"level":"ERROR","timestamp":"2019-02-28T19:35:26.489Z","logger":"kafkajs","message":"[Connection] Response InitProducerId(key: 22, version: 0)","broker":"10.51.16.56:9092","clientId":"reproducer","error":"The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing","correlationId":11,"size":20}
(node:14652) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 9): KafkaJSNumberOfRetriesExceeded: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
(node:14652) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

plameniv commented 5 years ago

The following comment in KAFKA-7088 mentions the same IllegalStateException:

https://issues.apache.org/jira/browse/KAFKA-7088?focusedCommentId=16769912&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16769912

tulios commented 5 years ago

That's interesting, so you are properly committing or aborting but discarding the client after one commit. I will take a look, thanks.

Nevon commented 5 years ago

Running this locally and looking at the logs, this seems to be why the transaction state transition fails:

https://github.com/apache/kafka/blob/0d56f1413557adabc736cae2dffcdc56a620403e/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala#L324

# Old state
TransactionMetadata(transactionalId=transactional-id, producerId=1, producerEpoch=1804, txnTimeoutMs=60000, state=Empty, pendingState=Some(Ongoing), topicPartitions=Set(), txnStartTimestamp=1551689126534, txnLastUpdateTimestamp=1551689126521)

# New state
TxnTransitMetadata(producerId=1, producerEpoch=1804, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(test-topic-1551689052613-3), txnStartTimestamp=1551689126527, txnLastUpdateTimestamp=1551689126527)

Not sure how it's possible, but it looks like the current state has a txnStartTimestamp that's higher than the txnStartTimestamp of the state that it's trying to transition to.

Nevon commented 5 years ago

Reading the Kafka code, I'm confused as to how this works. If I'm not mistaken, the timestamps are all set on the broker side:

https://github.com/apurvam/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L261

If the request is handled by a different broker, or if the system clock changes, I would expect this to fail, as the clocks won't be synchronized. They even removed one such check here: https://github.com/apache/kafka/pull/3286/files

Looking at the logs from the reporter, there is something curious:

kafka1_1  | [2019-02-28 19:34:07,944] INFO [TransactionCoordinator id=0] Initialized transactionalId transactional-id with producerId 382001 and producer epoch 876 on partition __transaction_state-31 (kafka.coordinator.transaction.TransactionCoordinator)
kafka1_1  | [2019-02-28 19:34:07,936] INFO [TransactionCoordinator id=0] Initialized transactionalId transactional-id with producerId 382001 and producer epoch 877 on partition __transaction_state-31 (kafka.coordinator.transaction.TransactionCoordinator)

Note how epoch 877 is initialized at 2019-02-28 19:34:07,936 and 876 at 2019-02-28 19:34:07,944. This means that, at least according to the logs, epoch 876 was initialized 8ms after 877. If we then look at the difference in txnStartTimestamp between the previous state and the one that is being transitioned to, the previous state is 8ms newer than the one that's being transitioned to.

Now, maybe this is a coincidence or I'm misinterpreting the logs, but this looks fishy to me.
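
The two 8ms gaps mentioned above can be checked with a few lines of arithmetic. This is only a sketch that restates numbers already quoted in the reporter's logs; the helper name `parseBrokerTime` is made up for illustration and is not part of Kafka or KafkaJS.

```javascript
// Timestamps copied from the broker log quoted in this issue (epoch milliseconds).
const oldState = { txnStartTimestamp: 1551382447950 };
const newState = { txnStartTimestamp: 1551382447942 };

// Hypothetical helper: convert a broker log stamp such as "2019-02-28 19:34:07,944"
// to epoch milliseconds. The stamp is read as UTC here; only differences are used
// below, so the choice of time zone does not affect the result.
function parseBrokerTime(stamp) {
  return Date.parse(stamp.replace(" ", "T").replace(",", ".") + "Z");
}

const epoch876InitMs = parseBrokerTime("2019-02-28 19:34:07,944");
const epoch877InitMs = parseBrokerTime("2019-02-28 19:34:07,936");

// How much newer the current (old) state's start timestamp is than the target state's.
const startSkewMs = oldState.txnStartTimestamp - newState.txnStartTimestamp;
// How much later epoch 876 was logged than epoch 877.
const logSkewMs = epoch876InitMs - epoch877InitMs;

console.log({ startSkewMs, logSkewMs });
```

Both differences come out to 8ms, which matches the observation above but does not by itself prove the two are related.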

Nevon commented 5 years ago

Just a note on something I learned recently, which may explain why we're seeing so many CONCURRENT_TRANSACTIONS errors.

When we commit a transaction, the coordinator just writes a PrepareCommit message to the log, and then later it asynchronously writes the CompleteCommit message. This means that if you initialize another transaction before everything has been written, you'll get a CONCURRENT_TRANSACTIONS error.

I created #304, as it's related.
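
To illustrate why waiting helps here, below is a minimal sketch of retrying an operation with exponential backoff while the coordinator is still finishing the previous commit. It is not how KafkaJS implements its own retries (the logs above show it already retries internally). The helper name `retryOnConcurrentTransactions`, its options, and the assumption that the thrown error exposes the protocol error name as `e.type` are all illustrative.

```javascript
// Hypothetical helper: retry `operation` while it fails with CONCURRENT_TRANSACTIONS,
// doubling the wait between attempts so the coordinator has time to write CompleteCommit.
async function retryOnConcurrentTransactions(
  operation,
  { retries = 5, initialDelayMs = 100 } = {}
) {
  let delayMs = initialDelayMs;
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (e) {
      // Assumption: the error carries the protocol error name in `type`.
      const retriable = e != null && e.type === "CONCURRENT_TRANSACTIONS";
      if (!retriable || attempt >= retries) {
        throw e; // unrelated error, or out of attempts
      }
      await new Promise((resolve) => setTimeout(resolve, delayMs));
      delayMs *= 2;
    }
  }
}
```

Under these assumptions it could wrap the call that starts the next transaction, e.g. `await retryOnConcurrentTransactions(() => producer.transaction())`.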


I've also been trying to reproduce this in different scenarios.

This certainly indicates that it is an issue in KafkaJS, but my suspicion is that it's timing-related, and running my Java producer is A LOT slower than KafkaJS. So either I'm doing something weird, or there's something going on in the Java client that's waiting for something.

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.clients.producer.*;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaTest {

  @Test
  @DisplayName("Idempotent producer 0.11.0.2")
  public void producer() throws Exception {
    long noOfProducers = 5000;
    long noOfMessages = 128;
    String topic = "test-topic";

    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.3.220.56:9092");
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id");
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");

    for (int producers = 0; producers < noOfProducers; producers++) {
      Producer<String, String> producer = new KafkaProducer<>(producerConfig);
      producer.initTransactions(); // initiate transactions
      try {
        producer.beginTransaction(); // begin transactions
        for (int i = 0; i < noOfMessages; i++) {
          Future<RecordMetadata> result = producer
              .send(new ProducerRecord<String, String>(topic, "key-" + i, "message-" + i));
        }
        producer.commitTransaction();

      } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again.
        producer.abortTransaction();
      }

      producer.close();
    }

  }
}

tulios commented 5 years ago

@plameniv we ran several tests using your reproducer, and the results are inconclusive. When running against a local Kafka (non-dockerized), it works all the time. We can only reproduce the problem when running on a dockerized Kafka, and like @Nevon said it is related to a timing issue. We can't reproduce on the Java client because it can't go through the same flow fast enough. I've added a sleep call to your reproducer (right after disconnect), and managed to get it to work all the time. Kafka is removing this check, and it will make commit a blocking call, which should fix this for good.

Do you have a use case where you have to trash the producer all the time? When using transactions you should keep the same producer, so I'm assuming you are just testing the edge cases. I don't think this is a blocker for the 1.5.0 release, but I'll wait for your answer.

Thanks again for the great issues.
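
For readers wondering what "keep the same producer" could look like, here is a minimal sketch that runs many transactions on one producer instead of creating and discarding a client per transaction. It only uses calls that already appear in the reproducer (`producer.transaction()`, `transaction.send()`, `transaction.commit()`, `transaction.abort()`); the function names `sendInTransaction` and `runBatches` are made up for illustration.

```javascript
// Send one batch of messages inside its own transaction on an existing producer.
// `producer` is assumed to be a connected transactional producer, created as in
// the reproducer: client.producer({ idempotent: true, transactionalId }).
async function sendInTransaction(producer, topic, messages) {
  const transaction = await producer.transaction();
  try {
    await transaction.send({ acks: -1, topic, messages });
  } catch (e) {
    // Abort so the transactional id is not left with an open transaction.
    await transaction.abort();
    throw e;
  }
  await transaction.commit();
}

// Run every batch through the same producer, one transaction per batch.
async function runBatches(producer, topic, batches) {
  for (const messages of batches) {
    await sendInTransaction(producer, topic, messages);
  }
}
```

With this shape, `producer.connect()` is called once before `runBatches` and `producer.disconnect()` once after it, so InitProducerId is not requested again for every transaction.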

sklose commented 5 years ago

agreed, this is an edge case and not a blocker for the 1.5.0 release

tulios commented 5 years ago

I will release 1.5.0 then, @sklose @plameniv thanks for shaping up this release!

jishnu-purath commented 2 years ago

Hi @tulios, do you know in which version of Kafka this check was removed? I am facing this issue while trying to use a transactional producer, and I wanted to know whether upgrading would solve it.