Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License
2.1k stars 391 forks source link

memory leak on exception in produce() when using opaque data #953

Closed emelski closed 2 years ago

emelski commented 2 years ago

Environment Information

Steps to Reproduce

producer.produce() allows the caller to provide arbitrary data to be included in delivery reports via the opaque parameter. Normally the opaque data is persisted by the call to produce() and then released when the delivery report is made. In the case of an error in produce(), no delivery report ever comes for the message, so the opaque data is never released, resulting in a memory leak.

Assuming a Kafka instance is running at localhost:9093 (I used the Bitnami Docker image in my test) and that Kafka is set up not to auto-create unknown topics, this script makes the leak visible.

const {once} = require('events')
const Kafka = require('node-rdkafka')

/**
 * Listener for the producer's 'delivery-report' event.
 *
 * Logs only the error slot of the report, wrapped in an object so the output
 * is labelled `err`.
 *
 * @param {*} err - Error value handed to the listener; logged as-is.
 * @param {*} report - Delivery report; accepted but not used.
 */
function dr(err, report) {
  const details = { err };
  console.log(details, 'delivery report');
}

/**
 * Reproduction driver for the opaque-data leak.
 *
 * Connects a producer, waits for its 'ready' event, then makes 1000 attempts
 * to produce to the topic 'bogus', each carrying a large Buffer as opaque
 * data. process.memoryUsage() is logged before every attempt so growth can be
 * observed between iterations.
 */
async function main() {
  const globalConfig = {
    'dr_cb': true,
    'metadata.broker.list': 'localhost:9093',
    'security.protocol': 'sasl_plaintext',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'user',
    'sasl.password': 'bitnami',
    'topic.metadata.propagation.max.ms': 500
  };
  const producer = new Kafka.Producer(globalConfig, {});
  producer.on('delivery-report', dr);

  console.log('connecting');
  producer.connect();
  await once(producer, 'ready');
  console.log('connected');

  // Resolves after the given number of milliseconds.
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  let attempt = 0;
  while (attempt < 1000) {
    console.log(process.memoryUsage(), `producing ${attempt}`);
    try {
      // Large per-message payload so each attempt is visible in the
      // memory figures logged above.
      const payload = Buffer.from(String(attempt).repeat(1048576));
      const opaque = { leaked: payload };
      producer.produce('bogus', -1, Buffer.from('my message'), null, Date.now(), opaque);
      producer.flush(500, () => { });
    } catch (err) {
      console.log({err}, `error producing ${attempt}`);
    }
    // Short gap between attempts.
    await pause(100);
    attempt += 1;
  }
}

// Start the reproduction; the promise returned by main() is not awaited here.
main()
emelski commented 2 years ago

Submitted a PR that fixes the leak; I've been using it in production for several months.

https://github.com/Blizzard/node-rdkafka/pull/954