tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

`eachBatchAutoResolve: false` combined with a transaction (`tx`) results in an infinite number of messages #1665

Closed: danilredko closed this issue 8 months ago

danilredko commented 9 months ago

Describe the bug

```typescript
async sendMessages({ batch, messages }: { batch: Batch; messages: Message[] }): Promise<void> {
    this.logger.info(`Sending ${messages.length} events`)
    if (messages.length === 0) {
        return
    }

    const batchLastOffset = parseInt(batch.lastOffset(), 10)
    const partition = batch.partition
    const producer = await this.getProducersForBatch(batch)
    const tx = await producer.transaction()
    try {
        // Produce the transformed messages inside the transaction
        await tx.send({
            topic: this.config.kafka.producerTopic!,
            compression: CompressionTypes.Snappy,
            messages,
        })

        // Commit the consumer group offset (next offset to read) in the same transaction
        await tx.sendOffsets({
            consumerGroupId: this.config.kafka.groupID!,
            topics: [
                {
                    topic: this.config.kafka.consumerTopic!,
                    partitions: [
                        {
                            offset: (batchLastOffset + 1).toString(),
                            partition,
                        },
                    ],
                },
            ],
        })
        await tx.commit()
    } catch (e) {
        // Roll back the produced messages and the offset commit together
        await tx.abort()
        throw e
    }
}
```
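For reference, the value passed to `tx.sendOffsets()` follows Kafka's convention that a committed offset is the offset of the next message to read, hence `lastOffset + 1`. KafkaJS exposes offsets as decimal strings, and `parseInt` loses precision above `Number.MAX_SAFE_INTEGER`. The following is a minimal sketch, not part of the original report, of a payload builder that keeps full 64-bit precision with `BigInt`; the helper names `nextOffset` and `buildOffsetsPayload` and the local types are hypothetical and only mirror the shape used in the snippet above.

```typescript
// Hypothetical local types mirroring the sendOffsets() argument shape above
// (written for this sketch, not imported from kafkajs).
interface PartitionOffset {
  partition: number
  offset: string
}

interface OffsetsPayload {
  consumerGroupId: string
  topics: { topic: string; partitions: PartitionOffset[] }[]
}

// The committed offset is the next offset to read: last processed + 1.
// BigInt keeps full precision for 64-bit offsets given as decimal strings.
function nextOffset(lastOffset: string): string {
  return (BigInt(lastOffset) + BigInt(1)).toString()
}

// Hypothetical helper building the payload for a single partition.
function buildOffsetsPayload(
  consumerGroupId: string,
  topic: string,
  partition: number,
  batchLastOffset: string,
): OffsetsPayload {
  return {
    consumerGroupId,
    topics: [{ topic, partitions: [{ partition, offset: nextOffset(batchLastOffset) }] }],
  }
}

const payload = buildOffsetsPayload('my-group', 'input-topic', 0, '41')
console.log(nextOffset('9007199254740993')) // prints: 9007199254740994
```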

```typescript
async consumeAndProduceMessages({
    messageHandler,
}: {
    messageHandler: ConsumeProduceMessagesHandler
}): Promise<void> {
    if (!this.config.kafka.producerTopic) {
        throw new Error('producerTopic is not set')
    }

    await this.consumer.connect()
    await this.consumer.subscribe({ topic: this.config.kafka.consumerTopic })

    await this.consumer.run({
        partitionsConsumedConcurrently: this.config.kafka.partitionConsumerConcurrency,
        // Offsets are not resolved automatically when eachBatch returns
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch }) => {
            const resultMessages = await messageHandler(batch.messages)
            await this.sendMessages({
                batch,
                messages: resultMessages,
            })
        },
    })
}
```
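With `eachBatchAutoResolve: false`, KafkaJS does not mark the batch as processed when `eachBatch` returns; offsets are resolved only through the `resolveOffset()` function in the `eachBatch` payload, which the callback above never calls. One possible explanation for the loop, assumed here rather than confirmed by this report, is that the consumer keeps fetching from its last resolved position even though the transaction committed the group offset. A minimal sketch of an `eachBatch` callback that resolves the batch only after the transaction succeeds, using hypothetical structural types modeled on a subset of KafkaJS's `EachBatchPayload` so it runs without a broker (`makeEachBatch` and `sendInTransaction` are illustrative names):

```typescript
// Hypothetical structural stand-ins for the parts of KafkaJS's
// EachBatchPayload used here (assumption: only these members are needed).
interface MessageLike {
  offset: string
  value: string | null
}

interface BatchLike {
  partition: number
  messages: MessageLike[]
  lastOffset(): string
}

interface EachBatchPayloadLike {
  batch: BatchLike
  resolveOffset(offset: string): void
  heartbeat(): Promise<void>
}

// Hypothetical stand-in for sendMessages(): send + sendOffsets + commit.
type SendInTransaction = (batch: BatchLike, messages: MessageLike[]) => Promise<void>

function makeEachBatch(
  transform: (messages: MessageLike[]) => Promise<MessageLike[]>,
  sendInTransaction: SendInTransaction,
) {
  return async ({ batch, resolveOffset, heartbeat }: EachBatchPayloadLike): Promise<void> => {
    const out = await transform(batch.messages)
    // If the transaction throws, resolveOffset is skipped and the batch is retried.
    await sendInTransaction(batch, out)
    // eachBatchAutoResolve is false, so mark the whole batch as processed explicitly.
    resolveOffset(batch.lastOffset())
    await heartbeat()
  }
}
```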

Using `eachBatchAutoResolve: false` together with a transaction (`tx`) results in an infinite number of messages being published to `producerTopic`.

To Reproduce

  1. Run a producer that continuously produces messages to a topic.
  2. Run a consumer that subscribes to that topic (`consumeAndProduceMessages`).
  3. Messages are published to `producerTopic` indefinitely.

Expected behavior

`consumeAndProduceMessages` publishes one message to `producerTopic` for each message consumed from `consumerTopic`.

Observed behavior

`consumeAndProduceMessages` publishes an unbounded number of messages to `producerTopic`.
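The observed behavior is what one would expect if the consumer's fetch position never advances past the batch. The following toy model is purely illustrative and hypothetical (an in-memory log and a fetch position, not KafkaJS internals; `simulate` is an invented name): when each batch is resolved, a four-message input yields four outputs; when it is not, the same batch is re-fetched on every poll and the output grows with the number of polls.

```typescript
// Toy model: one partition, a fixed input log, and a consumer whose fetch
// position advances only when the processed batch is resolved.
// Illustrates the suspected mechanism; it does not reproduce KafkaJS code.
function simulate(log: string[], batchSize: number, resolve: boolean, maxPolls: number): string[] {
  const produced: string[] = []
  let position = 0 // next offset the consumer will fetch from
  for (let poll = 0; poll < maxPolls && position < log.length; poll++) {
    const batch = log.slice(position, position + batchSize)
    // "Transform and produce": one output message per input message.
    for (const value of batch) produced.push(`out:${value}`)
    // Advance only when the batch is resolved.
    if (resolve) position += batch.length
  }
  return produced
}

const input = ['a', 'b', 'c', 'd']
const withResolve = simulate(input, 2, true, 100)
const withoutResolve = simulate(input, 2, false, 100)
console.log(withResolve.length, withoutResolve.length) // prints: 4 200
```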

Environment: