SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars · 628 forks

Consumer stops when partition contains transactions and multiple log segments #1336

Open jpenglert opened 5 years ago

jpenglert commented 5 years ago

Bug Report

The consumer stops reading messages from a topic-partition that was produced to using transactions AND that has multiple log segments. The consumer appears to get stuck when it reaches the end of a log segment. Manually advancing the consumer group's offset by 1 to skip the transaction commit marker allows the consumer to continue.

To reproduce, use the log.roll.ms Kafka broker setting to force Kafka to roll log segments between transactional messages. In the example below I set it to 100 ms.

I used the Kafka Java client as the producer to reproduce this because kafka-node does not support producing with transactions.

There do not appear to be any problems reading from a topic-partition with transactions as long as there is only a single log segment. Something about having multiple log segments where the last message in a segment is a transaction commit marker causes the kafka-node consumer to get stuck.
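The manual workaround described above can be sketched as a small, purely hypothetical helper (not part of kafka-node): if a fetch at the committed offset returns no consumable records even though the log-end-offset is ahead, the slot presumably holds a transaction commit marker and the offset can be bumped past it.

```typescript
// Hypothetical decision helper mirroring the manual workaround: if the
// consumer is stuck on a control record (no records returned but the log
// continues past the current offset), skip one offset; otherwise stay put.
export function nextFetchOffset(
  currentOffset: number,
  logEndOffset: number,
  receivedRecords: boolean
): number {
  if (receivedRecords) return currentOffset; // consuming normally
  if (currentOffset < logEndOffset) return currentOffset + 1; // skip the commit marker
  return currentOffset; // genuinely at the end of the log
}
```

With the numbers from the repro below, a consumer stuck at offset 1 of test.transactions (log end 4) would move to offset 2, which is exactly what advancing the group offset by 1 achieves.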

Here is the Kafka shell output illustrating how the consumer gets stuck reading from a topic that was produced to using transactions (topic test.transactions contains two messages and two transaction commit markers, while topic test.no.transactions contains two messages that were NOT produced inside a transaction):

root@62ccab1a6bed:/# kafka-topics --zookeeper zookeeper:2181 --list
__confluent.support.metrics
__consumer_offsets
__transaction_state
test.no.transactions
test.transactions
root@62ccab1a6bed:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka:9092 --time -1 --topic test.transactions
test.transactions:0:4
root@62ccab1a6bed:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka:9092 --time -1 --topic test.no.transactions
test.no.transactions:0:2
root@62ccab1a6bed:/# kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group test.group.id
Consumer group 'test.group.id' has no active members.

TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.transactions    0          1               4               3               -               -               -
test.no.transactions 0          2               2               0               -               -               -
root@62ccab1a6bed:/# 
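The describe output above is consistent with the stall: lag is simply LOG-END-OFFSET minus CURRENT-OFFSET, so the transactional topic shows 4 − 1 = 3 unread slots while the non-transactional topic is fully caught up.

```typescript
// Consumer lag as reported by kafka-consumer-groups --describe:
// log-end-offset minus the group's committed offset.
const lag = (logEndOffset: number, currentOffset: number): number =>
  logEndOffset - currentOffset;

console.log(lag(4, 1)); // test.transactions: 3 (stuck behind the commit marker)
console.log(lag(2, 2)); // test.no.transactions: 0 (fully caught up)
```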

Environment


Include Sample Code to reproduce behavior

Consumer TypeScript code:

import {
   ConsumerGroupStream,
   ConsumerGroupStreamOptions,
} from 'kafka-node';

const kafkaHost = 'kafka:9092';
const kafkaTopic = 'test.transactions';
const kafkaGroupId = 'test.group.id';

const consumerOptions: ConsumerGroupStreamOptions = {
   groupId: kafkaGroupId,
   id: kafkaGroupId,
   kafkaHost: kafkaHost,
   autoCommit: true,
   fromOffset: 'earliest'
};

const consumerStream: ConsumerGroupStream = new ConsumerGroupStream(consumerOptions, [kafkaTopic]);
consumerStream.on('error', e => console.error(e));
consumerStream.on('data', chunk => console.log(`Received data: ${JSON.stringify(chunk)}`));

const timeout = 30000;
console.log(`Consuming for ${timeout} ms ...`);
setTimeout(() => consumerStream.close(() => console.log('consumerStream closed')), timeout);

Producer Kotlin code:

import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.serialization.StringSerializer
import java.util.*
import java.util.concurrent.Future

/**
 * Publishes two messages, each within its own transaction.
 * This results in the topic containing two data records plus two transaction
 * commit markers, for a LOG-END-OFFSET of 4:
 *
 * Offset | Contents
 * -------------------
 * 0      | transactional message 1
 * 1      | commit marker for transaction 1
 * 2      | transactional message 2
 * 3      | commit marker for transaction 2
 */
fun main() {
    val topic = "test.transactions"
    val producer: Producer<String, String> = createTransactionalProducer()

    publishMessageInTransaction(producer, topic, 1)
    Thread.sleep(1000)
    publishMessageInTransaction(producer, topic, 2)
    producer.close()
}

fun publishMessageInTransaction(producer: Producer<String, String>, topic: String, num: Int) {
    producer.beginTransaction()
    try {
        val future: Future<RecordMetadata> = producer.send(ProducerRecord(topic, "transactional message $num"))
        val metadata: RecordMetadata = future.get()
        println("metadata: $metadata")
        producer.commitTransaction()
    } catch (e: Exception) {
        producer.abortTransaction()
        throw e
    }
}

fun createTransactionalProducer(): Producer<String, String> {
    val props = Properties()
    props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "kafka:9092"
    props[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = "true"
    props[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "test.transaction.id"
    props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java

    val producer = KafkaProducer<String, String>(props)
    producer.initTransactions()

    return producer
}
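Assuming the layout sketched in the doc comment above (one data record per transaction, each immediately followed by a commit marker), the offsets can be derived mechanically. The helper below is illustrative only and not part of either client:

```typescript
// For n single-record transactions: data records land on even offsets and
// each commit marker occupies the following odd offset, so the log-end-offset
// is 2n even though only n records are consumable.
function transactionalLayout(n: number): {
  data: number[];
  markers: number[];
  logEndOffset: number;
} {
  const data = [...Array(n).keys()].map(i => 2 * i);
  const markers = data.map(o => o + 1);
  return { data, markers, logEndOffset: 2 * n };
}
```

For the two transactions in the repro this gives data records at offsets 0 and 2, commit markers at 1 and 3, and a LOG-END-OFFSET of 4, matching the GetOffsetShell output above.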

Include output with Debug turned on

/usr/local/bin/node /usr/local/lib/node_modules/npm/bin/npm-cli.js run start --scripts-prepend-node-path=auto

node dist/index.js

kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: kafka port: 9092 +2ms
kafka-node:KafkaClient test.group.id createBroker kafka:9092 +1ms
Consuming for 30000 ms ...
kafka-node:ConsumerGroupStream _read called +11ms
kafka-node:ConsumerGroupStream consumerGroup is not ready, calling consumerGroup.connect +0ms
kafka-node:ConsumerGroup Connecting test.group.id +1ms
kafka-node:ConsumerGroupStream commit ignored. no commits to make. +0ms
kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +4ms
kafka-node:KafkaClient test.group.id sending versions request to kafka:9092 +1ms
kafka-node:KafkaClient broker socket connected {"host":"kafka","port":9092} +2ms
kafka-node:KafkaClient connected to socket, trying to load initial metadata +1ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
kafka-node:KafkaClient Received versions response from kafka:9092 +6ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":1,"usable":false},"22":{"min":0,"max":1,"usable":false},"23":{"min":0,"max":1,"usable":false},"24":{"min":0,"max":1,"usable":false},"25":{"min":0,"max":1,"usable":false},"26":{"min":0,"max":1,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":1,"usable":false},"29":{"min":0,"max":1,"usable":false},"30":{"min":0,"max":1,"usable":false},"31":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":1,"usable":false},"34":{"min":0,"max":1,"usable":false},"35":{"min":0,"max":1,"usable":false},"37":{"min":0,"max":1,"usable":false},"38":{"min":0,"max":1,"usable":false},"39":{"min":0,"max":1,"usable":false},"40":{"min":0,"max":1,"usable":false},"41":{"min":0,"max":1,"usable":false},"42":{"min":0,"max":1,"usable":false},"produce":{"min":0,"max":6,"usable":2},"fetch":{"min":0,"max":8,"usable":2},"offset":{"min":0,"max":3,"usable":0},"metadata":{"min":0,"max":6,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":4,"usable":2},"offsetFetch":{"min":0,"max":4,"usable":1},"groupCoordinator":{"min":0,"max":2,"usable":0},"joinGroup":{"min":0,"max":3,"usable":0},"heartbeat":{"min":0,"max":2,"usable":0},"leaveGroup":{"min":0,"max":2,"usable":0},"syncGroup":{"min":0,"max":2,"usable":0},"describeGroups":{"min":0,"max":2,"usable":0},"listGroups":{"min":0,"max":2,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":2,"usable":0},"createTopics":{"min":0,"max":3,"usable":1},"deleteTopics":{"min":0,"max":2,"usable":false},"describeConfigs":{"min":0,"max":2,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +1ms
kafka-node:KafkaClient broker is now ready +0ms
kafka-node:KafkaClient broker is now ready +0ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'kafka', coordinatorPort: 9092, coordinatorId: 1 } +4ms
kafka-node:KafkaClient test.group.id refreshBrokerMetadata() +2ms
kafka-node:KafkaClient found 1 connected broker(s) +1ms
kafka-node:ConsumerGroupRecovery tryToRecoverFrom connect { BrokerNotAvailableError: Broker not available: Could not find broker at new BrokerNotAvailableError (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9) at KafkaClient.Client.sendGroupRequest (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:186:15) at KafkaClient.Client.sendJoinGroupRequest (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:197:8) at /Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/consumerGroup.js:543:21 at nextTask (/Users/user/src/bugs/ntg-1309/node_modules/async/dist/async.js:5324:14) at next (/Users/user/src/bugs/ntg-1309/node_modules/async/dist/async.js:5331:9) at /Users/user/src/bugs/ntg-1309/node_modules/async/dist/async.js:969:16 at KafkaClient.wrappedFn (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/kafkaClient.js:481:14) at KafkaClient.Client.invokeResponseCallback (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:593:10) at KafkaClient.Client.handleReceivedData (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:569:10) message: 'Broker not available: Could not find broker' } +1ms
kafka-node:ConsumerGroupRecovery RECOVERY from connect: test.group.id retrying in 1000 ms { BrokerNotAvailableError: Broker not available: Could not find broker at new BrokerNotAvailableError (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9) at KafkaClient.Client.sendGroupRequest (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:186:15) at KafkaClient.Client.sendJoinGroupRequest (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:197:8) at /Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/consumerGroup.js:543:21 at nextTask (/Users/user/src/bugs/ntg-1309/node_modules/async/dist/async.js:5324:14) at next (/Users/user/src/bugs/ntg-1309/node_modules/async/dist/async.js:5331:9) at /Users/user/src/bugs/ntg-1309/node_modules/async/dist/async.js:969:16 at KafkaClient.wrappedFn (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/kafkaClient.js:481:14) at KafkaClient.Client.invokeResponseCallback (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:593:10) at KafkaClient.Client.handleReceivedData (/Users/user/src/bugs/ntg-1309/node_modules/kafka-node/lib/baseClient.js:569:10) message: 'Broker not available: Could not find broker' } +1ms
kafka-node:KafkaClient test.group.id updated internal metadata +9ms
kafka-node:KafkaClient test.group.id updated internal metadata +10ms
kafka-node:ConsumerGroup Connecting test.group.id +984ms
kafka-node:ConsumerGroupStream commit ignored. no commits to make. +0ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'kafka', coordinatorPort: 9092, coordinatorId: 1 } +3ms
kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["test.transactions"],"version":0,"id":"test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be"}],"generationId":3,"groupProtocol":"roundrobin","leaderId":"test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be","memberId":"test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be"} from test.group.id +3s
kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ 'test.transactions' ], version: 0, userData: undefined, id: 'test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be' } ] +1ms
kafka-node:ConsumerGroup Using group protocol roundrobin +0ms
kafka-node:ConsumerGroup loadingMetadata for topics: [ 'test.transactions' ] +2ms
kafka-node:ConsumerGroup mapTopicToPartitions { 'test.transactions': [ '0' ] } +3ms
kafka-node:Roundrobin topicPartition: {"test.transactions":["0"]} +0ms
kafka-node:Roundrobin groupMembers: [{"subscription":["test.transactions"],"version":0,"id":"test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be"}] +0ms
kafka-node:Roundrobin members [ 'test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be' ] +0ms
kafka-node:Roundrobin subscribers { 'test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be': [ 'test.transactions' ] } +0ms
kafka-node:Roundrobin round robin on topic partition pairs: [ { topic: 'test.transactions', partition: '0' } ] +1ms
kafka-node:ConsumerGroup SyncGroup Request from test.group.id-b87a3385-40b2-43a9-858f-6b2501e2b9be +0ms
kafka-node:ConsumerGroup SyncGroup Response +26ms
kafka-node:ConsumerGroup test.group.id owns topics: { 'test.transactions': [ 0 ] } +0ms
kafka-node:ConsumerGroup test.group.id fetchOffset Response: {"test.transactions":{"0":-1}} +4ms
kafka-node:ConsumerGroup No saved offsets +0ms
kafka-node:ConsumerGroup test.group.id defaultOffset Response for earliest: {"test.transactions":{"0":0}} +10ms
kafka-node:ConsumerGroup generationId 3 +4ms
kafka-node:ConsumerGroup startFetch is true +0ms
kafka-node:KafkaClient test.group.id createBroker kafka:9092 +1ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient waitUntilReady [BrokerWrapper kafka:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
kafka-node:ConsumerGroup test.group.id is leader scheduled new topic/partition check +0ms
kafka-node:ConsumerGroup test.group.id started heartbeats at every 10000 ms +0ms
kafka-node:KafkaClient test.group.id sending versions request to kafka:9092 +1ms
kafka-node:KafkaClient Received versions response from kafka:9092 +3ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":1,"usable":false},"22":{"min":0,"max":1,"usable":false},"23":{"min":0,"max":1,"usable":false},"24":{"min":0,"max":1,"usable":false},"25":{"min":0,"max":1,"usable":false},"26":{"min":0,"max":1,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":1,"usable":false},"29":{"min":0,"max":1,"usable":false},"30":{"min":0,"max":1,"usable":false},"31":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":1,"usable":false},"34":{"min":0,"max":1,"usable":false},"35":{"min":0,"max":1,"usable":false},"37":{"min":0,"max":1,"usable":false},"38":{"min":0,"max":1,"usable":false},"39":{"min":0,"max":1,"usable":false},"40":{"min":0,"max":1,"usable":false},"41":{"min":0,"max":1,"usable":false},"42":{"min":0,"max":1,"usable":false},"produce":{"min":0,"max":6,"usable":2},"fetch":{"min":0,"max":8,"usable":2},"offset":{"min":0,"max":3,"usable":0},"metadata":{"min":0,"max":6,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":4,"usable":2},"offsetFetch":{"min":0,"max":4,"usable":1},"groupCoordinator":{"min":0,"max":2,"usable":0},"joinGroup":{"min":0,"max":3,"usable":0},"heartbeat":{"min":0,"max":2,"usable":0},"leaveGroup":{"min":0,"max":2,"usable":0},"syncGroup":{"min":0,"max":2,"usable":0},"describeGroups":{"min":0,"max":2,"usable":0},"listGroups":{"min":0,"max":2,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":2,"usable":0},"createTopics":{"min":0,"max":3,"usable":1},"deleteTopics":{"min":0,"max":2,"usable":false},"describeConfigs":{"min":0,"max":2,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
kafka-node:KafkaClient broker is now ready +0ms
Received data: {"topic":"test.transactions","value":"transactional message 1","offset":0,"partition":0,"highWaterOffset":4,"key":null,"timestamp":"2019-10-08T16:29:48.401Z"}
kafka-node:ConsumerGroupStream _read called +9ms
kafka-node:ConsumerGroupStream committing [ { topic: 'test.transactions', partition: 0, offset: 1, metadata: 'm' } ] +0ms
kafka-node:ConsumerGroupStream setting committing to false +5s
kafka-node:ConsumerGroupStream commit ignored. no commits to make. +21s
kafka-node:ConsumerGroup test.group.id leaving group +0ms
kafka-node:KafkaClient close client +24ms
consumerStream closed
kafka-node:KafkaClient test.group.id socket closed kafka:9092 (hadError: false) +3ms
kafka-node:KafkaClient clearing kafka:9092 callback queue without error +0ms
kafka-node:KafkaClient test.group.id socket closed kafka:9092 (hadError: false) +0ms
kafka-node:KafkaClient clearing kafka:9092 callback queue without error +0ms

Process finished with exit code 0

jpenglert commented 5 years ago

Related: