tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.73k stars · 526 forks

When a new consumer is added, the rebalance is completed multiple times triggering the EachMessage function #1097

Open 521707 opened 3 years ago

521707 commented 3 years ago

Describe the bug

When a new consumer joins the group, the rebalance completes multiple times, triggering the eachMessage function repeatedly. I want the next eachMessage invocation to wait until the previous one has finished, or, if a rebalance starts a new eachMessage, to be notified so I can cancel the old one.

For the same task, the current execution should complete before the next one starts. But when a rebalance happens, every member re-enters immediately, regardless of whether the current task has finished. How do I ensure that only one task is running at a time?

To Reproduce

  1. A consumer is executing eachMessage
  2. A new consumer joins the group
  3. A rebalance is in progress, but the old consumer may still be executing eachMessage. Once the rebalance completes, every member immediately starts executing eachMessage again, so the old consumer ends up with 2 eachMessage invocations running at the same time

Expected behavior

After a rebalance, the old eachMessage invocation should be cancelled instead of both being kept and executed. When a new message is processed after a rebalance, the previous invocation should be guaranteed to have finished.

Observed behavior

I tried to ensure that only one task runs at a time by polling in a loop until the previous message has completed:

for (const message of batch.messages) {
    if (!isRunning() || isStale()) break
    if (service.inService) {
        Logger.debug(`Unable to run multiple:${partition}-${message.offset}`)
        // Poll once per second until the in-flight handler finishes
        await new Promise((resolve) => {
            const timer = setInterval(() => {
                if (!service.inService) {
                    clearInterval(timer)
                    resolve(true)
                }
            }, 1000)
        })
    }
    // ....
}
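The polling workaround above can also be expressed without a timer. A promise-chain lock serializes callers, so each handler waits for the previous one to release instead of re-checking a flag every second. This is a generic sketch, not a KafkaJS API; the `Lock` name and the `service.handleMessage` call in the usage comment are taken from the demo below as assumptions:

```javascript
// Minimal promise-based lock: acquire() resolves when it is the caller's
// turn and yields a release() function that unblocks the next caller.
class Lock {
  constructor() {
    this.tail = Promise.resolve()
  }

  acquire() {
    let release
    // `next` stays pending until this caller invokes release()
    const next = new Promise((resolve) => { release = resolve })
    const turn = this.tail.then(() => release)
    this.tail = next
    return turn
  }
}

// Usage sketch inside eachBatch, in place of the setInterval poll:
//   const release = await lock.acquire()
//   try { await service.handleMessage(message.value) } finally { release() }
```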

I also tried using a queue to make sure that only one task runs at a time:

const addQueue = (function () {
    const list = []
    let state = 'end'
    return async (call) => {
        if (state === 'end') {
            state = 'start'
            await call()
            // Drain any calls that were queued while we were running
            while (list.length) {
                const fn = list.shift()
                await fn()
            }
            state = 'end'
        } else {
            list.push(call)
        }
    }
})()

// ....
for (const message of batch.messages) {
    if (!isRunning() || isStale()) break
    await new Promise((resolve, reject) => {
        addQueue(async () => {
            // await handle...
            resolve(true)
        })
    })
// ....
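One caveat with the addQueue attempt above: if call() throws, state is never reset to 'end', so every later call is pushed onto the list and never executed. A variant that drains the queue in one place and resets its flag in a finally block avoids that (a generic sketch, not part of KafkaJS):

```javascript
// Serial task queue: tasks run one at a time in arrival order, and a
// thrown task error cannot leave the queue permanently "busy".
const addQueue = (() => {
  const list = []
  let draining = false
  return async (call) => {
    list.push(call)
    if (draining) return // the active drain loop will pick this task up
    draining = true
    try {
      while (list.length) {
        await list.shift()() // an error propagates to the caller that started the drain
      }
    } finally {
      draining = false // never leave the queue wedged after an error
    }
  }
})()
```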

Each thrown error results in a doubling of the number of runs:

let number = 0
consumer.run({
    async eachBatch(payload) {
        const { isRunning, isStale, batch } = payload
        for (const message of batch.messages) {
            if (!isRunning() || isStale()) break
            number++
            if (number % 3) throw 'err'
        }
    }
})
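The doubling appears to happen because KafkaJS retries a failed eachBatch, and with auto-resolve enabled none of the messages processed before the throw are recorded as done, so they are all delivered again. A common way to limit reprocessing, sketched here after the eachBatch pattern from the KafkaJS docs (`handleMessage` is a hypothetical handler), is to disable auto-resolve and call resolveOffset per message, so a retry resumes after the last resolved offset:

```javascript
// Sketch: resolve each offset as it is handled so a later throw in the
// same batch does not cause already-handled messages to run again.
consumer.run({
  eachBatchAutoResolve: false, // we resolve offsets manually below
  async eachBatch({ batch, resolveOffset, heartbeat, isRunning, isStale }) {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break
      await handleMessage(message) // hypothetical handler; may throw
      resolveOffset(message.offset) // record progress before the next message
      await heartbeat()
    }
  },
})
```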

Environment: "kafkajs": "^1.15.0",

Additional context

const demo = async (index: number, topicName) => {
    const kafka: Kafka = new Kafka(KafkaConfig)
    const consumer = kafka.consumer({
        groupId: topicName + '12',
        sessionTimeout: 1000 * 60 * 30,
        heartbeatInterval: 1000 * 60 * 10
    })
    await consumer.connect()
    await consumer.subscribe({ topic: topicName, fromBeginning: true, })

    const service = {
        inService: false,
        handleMessage: async (value: any, call = () => { }) => {
            service.inService = true
            const time = setInterval(call, 1000)
            await new Promise(r => setTimeout(r, Math.random() * 50000))
            clearInterval(time)
            service.inService = false
        }
    }

    consumer.run({
        eachBatchAutoResolve: true,
        autoCommitThreshold: 1,
        autoCommit: true,
        async eachBatch(payload) {
            const { isRunning, isStale, resolveOffset, heartbeat, batch, } = payload
            const { partition, topic } = batch
            for (let message of batch.messages) {
                if (!isRunning() || isStale()) break
                if (service.inService) {
                    Logger.debug(`Unable to run multiple:${partition}-${message.offset}`)
                }
                Logger.log(`current task:${partition}-${message.offset}`)

                let error;
                await service.handleMessage('', async () => {
                    try {
                        console.log(`heartbeat:${partition}-${message.offset}`)
                        !error && await heartbeat()
                    } catch (err) {
                        console.error('heartbeat-error')
                        error = err
                    }
                })
                if (error) throw error

                Logger.log(`end:${partition}-${message.offset}`, ' number')
                await resolveOffset(message.offset)
                await payload.commitOffsetsIfNecessary(payload.uncommittedOffsets())
                await heartbeat()
            }

        },
    })
}

After the demo above starts executing eachBatch, run the demo again. A rebalance is triggered, and the first process ends up with 2 concurrent eachBatch executions.

// Wait for the first instance to start running before starting the second process.

Process 1 partial log

{"level":"INFO","timestamp":"2021-05-17T03:10:30.244Z","logger":"kafkajs","message":"[ConsumerGroup] Starting","groupId":"city12"}
{"level":"INFO","timestamp":"2021-05-17T03:11:30.382Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"city12","memberId":"mt-shop-a9a400a8-c253-483c-bb14-87dc8e333d7e","leaderId":"mt-shop-a9a400a8-c253-483c-b...","memberAssignment":{"city":[0,1,10,11,12,13,14,15,16,17,18,...,3,30,31,32,33,34,35,36,37,38,39,4,40,41,42,43,44,45,46,47,48,...]},"groupProtocol":"RoundRobinAssigner","duration":60021}
[Nest] 12548 - 2021/05/17 上午11:11:30 current task:32-1
heartbeat:32-1 heartbeat:32-1 heartbeat:32-1 ...
[Nest] 12548 - 2021/05/17 上午11:11:37 [ number] end:32-
[Nest] 12548 - 2021/05/17 上午11:11:37 current task:32-2
heartbeat:32-2 heartbeat:32-2 heartbeat:32-2 ...
[Nest] 12548 - 2021/05/17 上午11:12:18 [ number] end:32-
[Nest] 12548 - 2021/05/17 上午11:12:18 current task:32-3
heartbeat:32-3 heartbeat:32-3 heartbeat:32-3 ...
[Nest] 12548 - 2021/05/17 上午11:12:33 [ number] end:32-
[Nest] 12548 - 2021/05/17 上午11:12:33 current task:32-4
heartbeat:32-4 heartbeat:32-4 heartbeat:32-4 ...
[Nest] 12548 - 2021/05/17 上午11:13:03 [ number] end:32-4 +30155ms
{"level":"ERROR","timestamp":"2021-05-17T03:13:03.313Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"CT-IT03.chatunet.com:9092","clientId":"mt-shop","error":"The coordinator is not aware of this member","correlationId":11,"size":28}
{"level":"ERROR","timestamp":"2021-05-17T03:13:03.352Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"CT-IT03.chatunet.com:9092","clientId":"mt-shop","error":"The coordinator is not aware of this member","correlationId":12,"size":28}
[Nest] 12548 - 2021/05/17 上午11:13:03 current task:42-0 +176ms
{"level":"ERROR","timestamp":"2021-05-17T03:13:03.488Z","logger":"kafkajs","message":"[Runner] The coordinator is not aware of this member, re-joining the group","groupId":"city12","memberId":"mt-shop-a9a400a8-c253-483c-bb14-87dc8e333d7e","error":"The coordinator is not aware of this member","retryCount":0,"retryTime":292}
heartbeat:42-0 heartbeat:42-0 heartbeat:42-0 ...
[Nest] 12548 - 2021/05/17 上午11:13:43 [ number] end:42-0 +40314ms
{"level":"ERROR","timestamp":"2021-05-17T03:14:03.498Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"CT-IT03.chatunet.com:9092","clientId":"mt-shop","error":"The coordinator is not aware of this member","correlationId":16,"size":34}
{"level":"ERROR","timestamp":"2021-05-17T03:14:03.500Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"CT-IT03.chatunet.com:9092","clientId":"mt-shop","error":"The coordinator is not aware of this member","correlationId":18,"size":34}
[Nest] 12548 - 2021/05/17 上午11:14:03 current task:29-0 +20017ms
{"level":"INFO","timestamp":"2021-05-17T03:14:03.821Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"city12","memberId":"mt-shop-955ab6d3-daf4-471f-863a-b07e26d97bc6","leaderId":"mt-shop-955ab6d3-daf4-471f-863a-b07e26d97bc6","isLeader":true,"memberAssignment":{"city":[0,1,11,12,13,14,15,20,22,24,25,26,28,29,30,32,33,39,4,41,44,45,48,49,7]},"groupProtocol":"RoundRobinAssigner","duration":60332}
[Nest] 12548 - 2021/05/17 上午11:14:03 Unable to run multiple:1-0 +45ms
[Nest] 12548 - 2021/05/17 上午11:14:04 current task:1-0 +271ms
heartbeat:29-0 heartbeat:1-0 heartbeat:29-0 heartbeat:1-0 heartbeat:29-0 heartbeat:1-0 ...
[Nest] 12548 - 2021/05/17 上午11:14:14 [ number] end:29-0 +10211ms
heartbeat:1-0
[Nest] 12548 - 2021/05/17 上午11:14:14 current task:29-1 +36ms
heartbeat:1-0 heartbeat:29-1 heartbeat:1-0 heartbeat:29-1 ...
[Nest] 12548 - 2021/05/17 上午11:14:39 [ number] end:29-1 +25011ms
[Nest] 12548 - 2021/05/17 上午11:14:39 current task:29-2 +2ms
heartbeat:1-0 heartbeat:29-2
[Nest] 12548 - 2021/05/17 上午11:14:40 [ number] end:29-2 +1157ms
heartbeat:1-0
[Nest] 12548 - 2021/05/17 上午11:14:40 current task:29-3 +3ms
heartbeat:1-0 heartbeat:29-3 heartbeat:1-0 heartbeat:29-3 ...
[Nest] 12548 - 2021/05/17 上午11:14:53 [ number] end:1-0 +13007ms
[Nest] 12548 - 2021/05/17 上午11:14:53 current task:1-1 +3ms
heartbeat:29-3
[Nest] 12548 - 2021/05/17 上午11:14:54 [ number] end:29-3 +1030ms
[Nest] 12548 - 2021/05/17 上午11:14:54 current task:29-4 +3ms
heartbeat:1-1 heartbeat:29-4 heartbeat:1-1 heartbeat:29-4 ...
[Nest] 12548 - 2021/05/17 上午11:15:03 [ number] end:1-1 +8481ms
[Nest] 12548 - 2021/05/17 上午11:15:03 current task:1-2 +2ms
heartbeat:29-4 heartbeat:1-2 heartbeat:29-4 heartbeat:1-2 ...
[Nest] 12548 - 2021/05/17 上午11:15:20 [ number] end:1-2 +17388ms
{"level":"ERROR","timestamp":"2021-05-17T03:15:20.471Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"CT-IT03.chatunet.com:9092","clientId":"mt-shop","error":"The coordinator is not aware of this member","correlationId":29,"size":28}
{"level":"ERROR","timestamp":"2021-05-17T03:15:20.672Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"CT-IT03.chatunet.com:9092","clientId":"mt-shop","error":"The coordinator is not aware of this member","correlationId":30,"size":28}
[Nest] 12548 - 2021/05/17 上午11:15:20 current task:15-0 +204ms
{"level":"ERROR","timestamp":"2021-05-17T03:15:20.674Z","logger":"kafkajs","message":"[Runner] The coordinator is not aware of this member, re-joining the group","groupId":"city12","memberId":"mt-shop-955ab6d3-daf4-471f-863a-b07e26d97bc6","error":"The coordinator is not aware of this member","retryCount":0,"retryTime":283}
heartbeat:29-4 heartbeat:15-0 heartbeat:29-4 heartbeat:15-0 ...

521707 commented 3 years ago

@t-d-d @Nevon @tulios help

t-d-d commented 3 years ago

@521707 I don't have time to look at this in detail. But the first thing I would try is to not call consumer.commitOffsets() from within your eachMessage() function - I'm not sure if that will work.

Instead, if you want to commit after every message use autoCommit: true and autoCommitThreshold: 1.
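A sketch of that suggested configuration, following the KafkaJS consumer options (`processMessage` is a placeholder for the actual handler):

```javascript
consumer.run({
  autoCommit: true,
  autoCommitThreshold: 1, // commit as soon as 1 offset has been resolved
  async eachMessage({ topic, partition, message }) {
    // No manual consumer.commitOffsets() here; the runner commits for us.
    await processMessage(message)
  },
})
```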

521707 commented 3 years ago

@521707 I don't have time to look at this in detail. But the first thing I would try is to not call consumer.commitOffsets() from within your eachMessage() function - I'm not sure if that will work.

Instead, if you want to commit after every message use autoCommit: true and autoCommitThreshold: 1.

I updated the issue and used autoCommit: true and autoCommitThreshold: 1.

There are still outcomes I don't want; I added the log above.

tong3jie commented 3 years ago

Did you solve it?

mark-b-ab commented 2 years ago

I solved this by handling batches myself. If I receive a rebalance error on heartbeat, I stop fetching, wait for the in-flight job to finish (make sure your session timeout is bigger than the time it takes to process a job), commit the offsets, and start the rejoin.
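A rough sketch of that approach, under some assumptions: eachBatch-based processing, a hypothetical `processJob`, and KafkaJS surfacing the rebalance as a protocol error with type 'REBALANCE_IN_PROGRESS':

```javascript
consumer.run({
  eachBatchAutoResolve: false,
  async eachBatch({ batch, resolveOffset, heartbeat, isRunning, isStale }) {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break
      // Hypothetical long-running job; keep it shorter than sessionTimeout
      await processJob(message)
      resolveOffset(message.offset)
      try {
        await heartbeat()
      } catch (err) {
        // On a rebalance, stop fetching and let the runner rejoin the group;
        // the offsets resolved above record how far we got.
        if (err.type === 'REBALANCE_IN_PROGRESS') break
        throw err
      }
    }
  },
})
```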

mdmuhtasimfuadfahim commented 1 year ago

Hi @mark-b-ab. Can you please provide me the code? Thanks in advance.