Describe the bug
When the transaction coordinator is enabled by setting kafkaTransactionCoordinatorEnabled=true, the Kafka client keeps throwing this exception:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 10000 milliseconds while awaiting InitProducerId
The client debug log shows:
[2023-01-17 01:07:14,223] DEBUG [Producer clientId=producer-xgq0e21b, transactionalId=xgq0e21b] Received INIT_PRODUCER_ID response from node 1599868786 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-xgq0e21b, correlationId=62): InitProducerIdResponseData(throttleTimeMs=0, errorCode=16, producerId=-1, producerEpoch=-1) (org.apache.kafka.clients.NetworkClient)
errorCode=16 means NOT_COORDINATOR.
This is caused by two different functions being used to compute the partition ID for the same transactional ID. The partition ID is used in two places:
1) Finding the owner broker of the TransactionCoordinator (based on __transaction_state-{partition-id}). Here the partition ID is generated by
TransactionCoordinator:
public static int partitionFor(String transactionalId, int transactionLogNumPartitions) {
    return MathUtils.signSafeMod(
        transactionalId.hashCode(),
        transactionLogNumPartitions
    );
}
2) Getting the TransactionMetadata from the cache by partition ID. Here the partition ID is generated by
TransactionStateManager:
public int partitionFor(String transactionalId) {
    return Utils.abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
}
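The divergence can be reproduced without a broker. The sketch below re-implements both formulas as standalone functions so it runs without Pulsar or Kafka on the classpath; signSafeMod is assumed to behave like Pulsar's MathUtils.signSafeMod (remainder shifted into [0, divisor)) and abs like Kafka's Utils.abs (Math.abs, with Integer.MIN_VALUE mapped to 0). The hash values 1234 and -1234 are arbitrary illustrations, not the real hash codes of any transactional ID in this report.

```java
// Sketch comparing the two partition-ID formulas quoted above.
// signSafeMod and abs are re-implemented here (assumption) to keep the example self-contained.
public class PartitionForComparison {

    // Assumed to match Pulsar's MathUtils.signSafeMod: remainder shifted into [0, divisor).
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        if (mod < 0) {
            mod += divisor;
        }
        return mod;
    }

    // Assumed to match Kafka's Utils.abs: Math.abs, except Integer.MIN_VALUE maps to 0.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Formula used by TransactionCoordinator.partitionFor(String, int).
    static int coordinatorPartition(int hash, int numPartitions) {
        return signSafeMod(hash, numPartitions);
    }

    // Formula used by TransactionStateManager.partitionFor(String).
    static int stateManagerPartition(int hash, int numPartitions) {
        return abs(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 50;
        // Non-negative hash: both formulas agree.
        System.out.println(coordinatorPartition(1234, numPartitions) + " "
                + stateManagerPartition(1234, numPartitions)); // prints "34 34"
        // Negative hash: the results differ and sum to numPartitions.
        System.out.println(coordinatorPartition(-1234, numPartitions) + " "
                + stateManagerPartition(-1234, numPartitions)); // prints "16 34"
        // The transactional IDs from this report can be checked the same way.
        for (String txId : new String[] {"qlf8pp81", "xgq0e21b"}) {
            int h = txId.hashCode();
            System.out.println(txId + ": coordinator=" + coordinatorPartition(h, numPartitions)
                    + ", stateManager=" + stateManagerPartition(h, numPartitions));
        }
    }
}
```

For a negative hash code the two results sum to the partition count (unless both are 0), which matches 16 + 34 = 50 for "qlf8pp81". The same relationship would also explain the logs for "xgq0e21b" further down, where the state manager computes 44 while the cache holds partition 6.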
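For example, with props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "qlf8pp81") in the Kafka client, the two places compute different partition IDs for "qlf8pp81":
16 for finding the owner broker of the TransactionCoordinator
34 for getting the TransactionMetadata from the transaction cache
The two formulas agree when hashCode() is non-negative. For a negative hash code, signSafeMod shifts the negative remainder into range while Utils.abs drops the sign first, so the results are complementary (here 16 + 34 = 50 partitions).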
Because the partition IDs differ, TransactionStateManager finds no metadata map for the partition it computes and returns NOT_COORDINATOR:
Map<String, TransactionMetadata> metadataMap = transactionMetadataCache.get(partitionId);
if (metadataMap == null) {
    return new ErrorsAndData<>(Errors.NOT_COORDINATOR, Optional.empty());
}
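A simplified model of this failure, assuming the broker owns partition 16 (the TransactionCoordinator result for "qlf8pp81" above) while the lookup uses 34. CacheLookupSketch, its Errors enum and the method names are hypothetical stand-ins; the sketch models only the null check, not the real ErrorsAndData return type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the TransactionStateManager cache lookup.
// Not KoP's actual classes: Errors and the Object metadata values are stand-ins.
public class CacheLookupSketch {

    enum Errors { NONE, NOT_COORDINATOR }

    // partitionId -> (transactionalId -> metadata), the same shape as transactionMetadataCache.
    final Map<Integer, Map<String, Object>> transactionMetadataCache = new HashMap<>();

    // Ownership side: an empty map is registered for each transaction state partition this broker owns.
    void addOwnedPartition(int partitionId) {
        transactionMetadataCache.put(partitionId, new HashMap<>());
    }

    // Lookup side: the same null check as in TransactionStateManager.
    Errors lookup(int partitionId) {
        Map<String, Object> metadataMap = transactionMetadataCache.get(partitionId);
        if (metadataMap == null) {
            return Errors.NOT_COORDINATOR;
        }
        return Errors.NONE;
    }

    public static void main(String[] args) {
        CacheLookupSketch manager = new CacheLookupSketch();
        // The client was routed to this broker because it owns partition 16 (coordinator formula).
        manager.addOwnedPartition(16);
        // The state manager computes 34 for the same ID, so the lookup misses.
        System.out.println(manager.lookup(34)); // prints "NOT_COORDINATOR"
        // Looking up with the coordinator's partition ID would succeed.
        System.out.println(manager.lookup(16)); // prints "NONE"
    }
}
```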
To Reproduce
Steps to reproduce the behavior:
Enable transactions by setting kafkaTransactionCoordinatorEnabled=true and transactionLogNumPartitions=50.
Choose a transactional ID for which the two functions generate different partition IDs, e.g. qlf8pp81 or xgq0e21b. Make sure there is no TransactionMetadata for that transactional ID in transactionMetadataCache (Map<Integer {partitionId}, Map<tx_id, TransactionMetadata>>). You can add a debug statement in TransactionStateManager to print the cache:
log.debug("The Cache content before Add Transaction State: {}", transactionMetadataCache);
Example cache content: {2={}, 22={}, 6={}, 26={}}
Run a Kafka producer that uses transactions. The key code is:
.....
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xgq0e21b");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<>("cptest", "world"));
....
producer.commitTransaction();
producer.close();
Run the test. The exception occurs, and the debug logs added to TransactionStateManager show:
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - The partitionId : 44 for transactionId: xgq0e21b
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - The Cache content before Add Transaction State: {2={}, 22={}, 6={}, 26={}}
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - Transaction Metadata Cache content for partition Id:44 for metadataMap:null
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - There is not coordiantor for partitionId: 44
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - The Content in transactionMetadataCache : {2={}, 22={}, 6={}, 26={}}
Expected behavior
Use the same function (partitionFor(String transactionalId)) to generate the partition ID in both TransactionCoordinator and TransactionStateManager. This resolves the bug.
When a bundle is loaded on a broker, kop.NamespaceBundleOwnershipListenerImpl puts an empty TransactionMetadata map into the cache for each transaction state partition that belongs to the bundle.
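A minimal sketch of the expected behavior, assuming the TransactionStateManager formula (Utils.abs(hashCode) % count) is the one kept; the sign-safe formula would work equally well as long as both paths share it. SharedPartitionForSketch, coordinatorPartition, onPartitionLoaded and isCoordinatorFor are hypothetical names that only model the coordinator routing, the NamespaceBundleOwnershipListenerImpl step and the cache lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed fix: one partitionFor shared by the coordinator lookup and the cache.
// SharedPartitionForSketch and its methods are hypothetical names, not KoP's API.
public class SharedPartitionForSketch {

    // Single source of truth. Mirrors TransactionStateManager's formula;
    // Integer.MIN_VALUE maps to 0, as in Kafka's Utils.abs.
    static int partitionFor(String transactionalId, int numPartitions) {
        int hash = transactionalId.hashCode();
        int absHash = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return absHash % numPartitions;
    }

    final int numPartitions;
    // partitionId -> (transactionalId -> metadata)
    final Map<Integer, Map<String, Object>> transactionMetadataCache = new HashMap<>();

    SharedPartitionForSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Coordinator side: which __transaction_state partition (and so which broker) serves this ID.
    int coordinatorPartition(String transactionalId) {
        return partitionFor(transactionalId, numPartitions);
    }

    // Ownership side: register an empty metadata map when a partition is loaded.
    void onPartitionLoaded(int partitionId) {
        transactionMetadataCache.putIfAbsent(partitionId, new HashMap<>());
    }

    // State manager side: the cache lookup uses the same function.
    boolean isCoordinatorFor(String transactionalId) {
        return transactionMetadataCache.containsKey(partitionFor(transactionalId, numPartitions));
    }

    public static void main(String[] args) {
        SharedPartitionForSketch broker = new SharedPartitionForSketch(50);
        for (String txId : new String[] {"qlf8pp81", "xgq0e21b"}) {
            // The broker owning the coordinator partition loads it...
            broker.onPartitionLoaded(broker.coordinatorPartition(txId));
            // ...and the cache lookup hits the same partition.
            System.out.println(txId + " -> " + broker.isCoordinatorFor(txId)); // prints "... -> true"
        }
    }
}
```

Keeping the abs-based formula also matches the one Apache Kafka's own TransactionStateManager uses; whichever formula is chosen, routing and cache lookup stay consistent because they call the same method.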