tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.75k stars 527 forks

There is already an ongoing transaction for this producer #430

Open gilesbradshaw opened 5 years ago

gilesbradshaw commented 5 years ago

I'm trying to manually commit each message I consume.

Here's my code:

import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['system76-pc:9094']
})

const producer = kafka.producer()
const consumer = kafka.consumer({
  groupId: 'test-group',
})

const tProducer = kafka.producer({
  transactionalId: '1',
})

const run = async () => {
  // Producing
  await producer.connect()
  await tProducer.connect()

  let x: number = 0
  setInterval(
    async () => {
      // return
      console.log(`sending ${x}`)
      await producer.send({
        topic: 'test-topic',
        messages: [
          { value: Buffer.from(` ${new Date().getTime()}: Hello KafkaJS user!: ${x}`) },
        ],
      })
      console.log(`sent ${x}`)
      x += 1

    },
    100,
  )

  // Consuming
  await consumer.connect()
  console.log('consumer connected')
  await consumer.subscribe({
    topic: 'test-topic',
    fromBeginning: true,
  })
  console.log('consumer subscribed')

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        date: new Date().getTime(),
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
      const transaction = await tProducer.transaction()
      await transaction
        .sendOffsets({
          consumerGroupId: 'test-group',
          topics: [{
            topic,
            partitions: [{
              partition,
              offset: message.offset,
            }]
          }]
        })
      await transaction.commit()
    },
  })
}

run()
  .catch(console.error)

This works up to a point, and then the following error is thrown:

{"level":"ERROR","timestamp":"2019-07-15T11:32:19.655Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNumberOfRetriesExceeded: There is already an ongoing transaction for this producer. Please end the transaction before beginning another.","groupId":"test-group","retryCount":1

So it seems to me that even once I have awaited transaction.commit(), the transaction is actually still in progress, and the next time around this causes an error.

Is this right? If so, how can I await the transaction no longer being in progress?

(I've "fixed" it by having two tProducers and rotating them, which seems to give time for the previous transaction to complete after it's committed, but that seems a bit naff.)
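One way to get the "await until no transaction is in progress" behaviour asked about above, without kafkajs-specific machinery, is to serialize the transactional work with a promise chain, so a new transaction only begins after the previous commit (or failure) has settled. The `serialize` helper below is illustrative, not part of the kafkajs API:

```typescript
// Illustrative helper (not kafkajs API): chain each piece of transactional
// work onto the completion of the previous one, so transactions on the
// same producer never overlap.
let chain: Promise<unknown> = Promise.resolve()

function serialize<T>(work: () => Promise<T>): Promise<T> {
  // run `work` after the previous task settles, whether it succeeded or failed
  const result = chain.then(work, work)
  // keep the chain alive even if `work` rejects
  chain = result.then(() => undefined, () => undefined)
  return result
}
```

Inside eachMessage, the transaction/sendOffsets/commit block would then be wrapped in `serialize(async () => { ... })`, so the next transaction cannot begin until the previous commit has resolved.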

tulios commented 5 years ago

Hi @gilesbradshaw, I have investigated this problem, and I can't reproduce it on the latest version. Can you try version 1.12.0-beta.0?

gilesbradshaw commented 5 years ago

Hi, nope, I'm still getting it with 1.12.0-beta.2.

gilesbradshaw commented 5 years ago

However, if I create a new producer every time, the problem does not occur:

(I don't need the tProducer.connect())

import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['system76-pc:9094']
})

const producer = kafka.producer()
const consumer = kafka.consumer({
  groupId: 'test-group',
})

const run = async () => {
  // Producing
  await producer.connect()

  let x: number = 0
  setInterval(
    async () => {
      // return
      console.log(`sending ${x}`)
      await producer.send({
        topic: 'test-topic',
        messages: [
          { value: Buffer.from(` ${new Date().getTime()}: Hello KafkaJS user!: ${x}`) },
        ],
      })
      console.log(`sent ${x}`)
      x += 1

    },
    100,
  )

  // Consuming
  await consumer.connect()
  console.log('consumer connected')
  await consumer.subscribe({
    topic: 'test-topic',
    fromBeginning: true,
  })
  console.log('consumer subscribed')

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        date: new Date().getTime(),
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
      const tProducer = kafka.producer({
        transactionalId: '1',
      })
      const transaction = await tProducer.transaction()
      await transaction
        .sendOffsets({
          consumerGroupId: 'test-group',
          topics: [{
            topic,
            partitions: [{
              partition,
              offset: message.offset,
            }]
          }]
        })
      await transaction.commit()
    },
  })
}

run()
  .catch(console.error)
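A caveat with the producer-per-message workaround above: each producer that ends up connected holds broker connections, so it is probably worth disposing of it once the commit finishes (whether kafkajs connects lazily inside transaction() here is an assumption). A generic create/use/dispose helper, with illustrative names rather than kafkajs API, sketches the pattern:

```typescript
// Generic create/use/dispose helper (illustrative, not kafkajs API):
// guarantees the dispose step runs even when `use` throws.
async function withDisposable<R, T>(
  create: () => R,
  dispose: (resource: R) => Promise<void>,
  use: (resource: R) => Promise<T>,
): Promise<T> {
  const resource = create()
  try {
    return await use(resource)
  } finally {
    await dispose(resource)
  }
}
```

Applied to the snippet above, `create` would be `() => kafka.producer({ transactionalId: '1' })`, `dispose` would call the producer's disconnect(), and `use` would hold the transaction/sendOffsets/commit block.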

caio-favero commented 11 months ago

I ran into this issue in the most recent version.

What I found is that the transactionalId is tied to the producer instead of the transaction. That makes sense if you intend to keep consistency and ordering across the messages you send from that producer.

However, this is an issue when working with a high load of data in a short span of time (my case).

export interface ProducerConfig {
  createPartitioner?: ICustomPartitioner
  retry?: RetryOptions
  metadataMaxAge?: number
  allowAutoTopicCreation?: boolean
  idempotent?: boolean
  transactionalId?: string
  transactionTimeout?: number
  maxInFlightRequests?: number
}

You can reproduce this error by sending a lot of messages in a short span of time. Something like this:

setInterval(async () => {
  const transaction = await producer.transaction();
  // send and commit are not awaited here, so with a 1 ms interval a new
  // transaction can begin before the previous commit() has completed
  transaction.send({ topic: 'test', messages: [{ value: 'test' }] });
  transaction.commit();
}, 1);

One possible solution is internally queuing those transactions and sending the next message when the previous one is committed (or aborted?). Another is managing the transactionalId on const transaction = await producer.transaction(). This could work, but it defeats the idea of queuing messages inside that one producer.

I really don't have a solution, but I thought I should bring this issue up for consideration.
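The "two tProducers and rotating them" hack mentioned earlier in the thread can be formalized as a small round-robin pool of transactional producers. One caveat, and the reason pool members need distinct transactionalIds: in Kafka, initializing a second producer with the same transactional.id fences off the first one, so rotating producers that share an id would not be safe. The pool below is a sketch against a minimal illustrative interface, not the kafkajs Producer type:

```typescript
// Minimal shape a pool member needs for this sketch (illustrative,
// not the kafkajs Producer type).
interface TransactionalProducer {
  transaction(): Promise<{ commit(): Promise<void> }>
}

// Round-robin pool: each call to next() hands out the following producer,
// so overlapping transactions land on different producers (and therefore
// different transactionalIds) instead of colliding on one.
class ProducerPool {
  private index = 0

  constructor(private readonly producers: TransactionalProducer[]) {
    if (producers.length === 0) throw new Error('pool must not be empty')
  }

  next(): TransactionalProducer {
    const producer = this.producers[this.index]
    this.index = (this.index + 1) % this.producers.length
    return producer
  }
}
```

Each pool member would be created as something like kafka.producer({ transactionalId: `tx-${n}` }) with a distinct id per member; the `tx-${n}` naming is only an illustrative assumption, any unique scheme works.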