tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Transactional producer sending phantom messages #684

Closed jo3bingham closed 4 years ago

jo3bingham commented 4 years ago

Following up on #682, I wanted to know why messages sent from our producer had even-numbered offsets in Kafka, and why the high watermark for that topic/partition was always NumberOfMessages+1.

So I created a simple topic:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Then I created a simple transactional producer, plus an admin client to display the watermarks:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  brokers: ['127.0.0.1:9092'],
  clientId: 'producer',
})

const config = {
  idempotent: true,
  maxInFlightRequests: 1,
  transactionalId: 'id',
}

const admin = kafka.admin()
const producer = kafka.producer(config)

const run = async() => {
  await admin.connect()
  await producer.connect()

  const transaction = await producer.transaction()
  try {
    await transaction.send({
      topic: 'test',
      messages: [
        { key: 'a', value: '1' },
      ],
    })
    await transaction.commit()
  } catch (ex) {
    await transaction.abort()
    console.log(ex)
  }

  const offsets = await admin.fetchTopicOffsets('test')
  console.log(offsets)

  await admin.disconnect()
  await producer.disconnect()
}

run().catch(console.error)

The first run of the code produced:

[ { partition: 0, offset: '2', high: '2', low: '0' } ]

The second produced:

[ { partition: 0, offset: '4', high: '4', low: '0' } ]

The third produced:

[ { partition: 0, offset: '6', high: '6', low: '0' } ]

As you can see, the offset and high watermark increment by two even though I'm only sending one message.

Next, I modified the code to send the message directly through the producer, without a transaction:

...
  await producer.connect()

  try {
    await producer.send({
      topic: 'test',
      messages: [
        { key: 'a', value: '1' },
      ],
    })
  } catch (ex) {
    console.log(ex)
  }

  const offsets = await admin.fetchTopicOffsets('test')
  console.log(offsets)
...

The first run of the modified code produced:

[ { partition: 0, offset: '7', high: '7', low: '0' } ]

The second produced:

[ { partition: 0, offset: '8', high: '8', low: '0' } ]

The third produced:

[ { partition: 0, offset: '9', high: '9', low: '0' } ]

Now the offset and high watermark increment by only one, as one would expect.

Consuming the topic from the beginning retrieves six messages (as expected, since I sent three via transactions and three directly via the producer):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  brokers: ['127.0.0.1:9092'],
  clientId: 'consumer',
})

const topic = 'test'
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async() => {
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
      console.log(`- ${prefix} ${message.key}#${message.value}`)
    },
  })
}

run().catch(console.error)

Output:

- test[0 | 0] / 1585749266604 a#1
- test[0 | 2] / 1585749280837 a#1
- test[0 | 4] / 1585749367956 a#1
- test[0 | 6] / 1585749576105 a#1
- test[0 | 7] / 1585749698091 a#1
- test[0 | 8] / 1585749812388 a#1

Nevon commented 4 years ago

I haven't looked at transactions in over a year (maybe closer to two), but unless I'm mistaken, when you call commit, the transaction coordinator writes a transaction commit marker to the partition. That marker is how a consumer knows whether a given message has actually been committed or is still part of an open transaction (or one that has been rolled back).

When a consumer reads from the partition, any message that does not have an associated transaction commit marker will not be returned to the user (assuming the client understands transactions and is configured to only read committed messages). So the phantom message you are talking about is that transaction commit marker, which takes up an offset of its own in the partition.
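To make the offset arithmetic concrete, here is a minimal sketch of a partition log as a plain array, where a record's offset is its index. The PartitionLog class and its methods are hypothetical and only model the behaviour described above; this is not broker or KafkaJS code, and real consumers handle still-open transactions in a more involved way. It replays the three transactional sends and three plain sends from the report:

```javascript
// Simplified, hypothetical model of one partition's log. It only mirrors the
// offset arithmetic described above; it is not how the broker is implemented.
class PartitionLog {
  constructor() {
    this.records = [] // a record's offset is its index in this array
  }

  // Append a plain (non-transactional) message.
  append(value) {
    this.records.push({ type: 'data', value, txn: null })
  }

  // Append messages that belong to an open transaction.
  appendTransactional(txn, values) {
    for (const value of values) {
      this.records.push({ type: 'data', value, txn })
    }
  }

  // Ending a transaction writes one control record (the marker), which
  // occupies an offset of its own.
  endTransaction(txn, committed) {
    this.records.push({ type: committed ? 'commit' : 'abort', txn })
  }

  // Next offset to be written; stands in for the `high` value in the report.
  get highWatermark() {
    return this.records.length
  }

  // Offsets a consumer would see if it only returns committed messages.
  readCommitted() {
    const committed = new Set(
      this.records.filter(r => r.type === 'commit').map(r => r.txn)
    )
    return this.records
      .map((record, offset) => ({ ...record, offset }))
      .filter(r => r.type === 'data' && (r.txn === null || committed.has(r.txn)))
      .map(r => r.offset)
  }
}

const log = new PartitionLog()

// Three transactions with one message each, as in the first experiment.
for (const txn of ['t1', 't2', 't3']) {
  log.appendTransactional(txn, ['1'])
  log.endTransaction(txn, true)
}

// Three plain sends, as in the second experiment.
for (let i = 0; i < 3; i++) log.append('1')

console.log(log.highWatermark) // 9
console.log(log.readCommitted()) // [ 0, 2, 4, 6, 7, 8 ]
```

In this model the markers sit at offsets 1, 3 and 5, which is why the consumer output in the report skips those offsets while the watermark still counts them.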

You can verify this by publishing N messages within a single transaction before committing, and you should see the offset incrementing by N+1.
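One way to run that check is sketched below. It only uses calls that already appear in the report (admin.fetchTopicOffsets, producer.transaction, transaction.send, transaction.commit, transaction.abort). The helper name offsetDeltaForTransaction is made up for illustration, and it assumes the admin and producer clients are already connected and that the topic has a single partition, like the test topic above:

```javascript
// Sketch: measure how far the partition's offset moves when `count` messages
// are sent inside one transaction. `admin` and `producer` are assumed to be
// already-connected KafkaJS clients created as in the original report
// (the producer needs a transactionalId).
async function offsetDeltaForTransaction({ admin, producer, topic, count }) {
  const readOffset = async () => {
    const offsets = await admin.fetchTopicOffsets(topic)
    // Assumes a single-partition topic, so only partition 0 is inspected.
    return Number(offsets.find(o => o.partition === 0).offset)
  }

  const before = await readOffset()

  // Build `count` messages with values '1', '2', ..., String(count).
  const messages = Array.from({ length: count }, (_, i) => ({
    key: 'a',
    value: String(i + 1),
  }))

  const transaction = await producer.transaction()
  try {
    await transaction.send({ topic, messages })
    await transaction.commit()
  } catch (err) {
    await transaction.abort()
    throw err
  }

  const after = await readOffset()
  return after - before
}
```

If the marker explanation holds, calling this with count set to 5 should return 6 (five messages plus one commit marker), and with count set to 1 it should return 2, matching the numbers in the report.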

Feel free to re-open if I'm wrong about this, but I'm 95% sure that this is the case. You can read more about how transactions work here.