tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

The idempotent producer doesn't work properly when sending R requests, where R > # of partitions #1005

Closed hiroya8649 closed 2 years ago

hiroya8649 commented 3 years ago

Describe the bug: Let N be the number of partitions and R the number of produce requests, with R > N.

When R requests are produced, only the first N are received by the consumer; the remaining R - N are lost. No rejections are reported (the await Promise.all(promises) in the producer doesn't throw any errors). If I set idempotent to false, the consumer receives exactly R requests.

To Reproduce: https://github.com/hiroya8649/kafka-report

node consumer.js
node producer.js

Expected behavior: With idempotent set to true, the consumer should receive the same number of requests as the producer sent.

Observed behavior: Producing 5 requests (R = 5, N = 3) gives the following producer and consumer output:

producer

InstrumentationEvent {
  id: 0,
  type: 'producer.network.request',
  timestamp: 1611581796507,
  payload: {
    broker: 'localhost:9092',
    clientId: 'my-app',
    correlationId: 0,
    size: 338,
    createdAt: 1611581796506,
    sentAt: 1611581796507,
    pendingDuration: 1,
    duration: 0,
    apiName: 'ApiVersions',
    apiKey: 18,
    apiVersion: 2
  }
}
InstrumentationEvent {
  id: 1,
  type: 'producer.network.request',
  timestamp: 1611581796513,
  payload: {
    broker: 'localhost:9092',
    clientId: 'my-app',
    correlationId: 1,
    size: 1813,
    createdAt: 1611581796512,
    sentAt: 1611581796513,
    pendingDuration: 1,
    duration: 0,
    apiName: 'Metadata',
    apiKey: 3,
    apiVersion: 6
  }
}
InstrumentationEvent {
  id: 2,
  type: 'producer.network.request',
  timestamp: 1611581796515,
  payload: {
    broker: 'hao-MS-7B85:9092',
    clientId: 'my-app',
    correlationId: 0,
    size: 20,
    createdAt: 1611581796515,
    sentAt: 1611581796515,
    pendingDuration: 0,
    duration: 0,
    apiName: 'InitProducerId',
    apiKey: 22,
    apiVersion: 1
  }
}
InstrumentationEvent {
  id: 3,
  type: 'producer.network.request',
  timestamp: 1611581796516,
  payload: {
    broker: 'hao-MS-7B85:9092',
    clientId: 'my-app',
    correlationId: 1,
    size: 177,
    createdAt: 1611581796516,
    sentAt: 1611581796516,
    pendingDuration: 0,
    duration: 0,
    apiName: 'Metadata',
    apiKey: 3,
    apiVersion: 6
  }
}
InstrumentationEvent {
  id: 4,
  type: 'producer.network.request',
  timestamp: 1611581796523,
  payload: {
    broker: 'hao-MS-7B85:9092',
    clientId: 'my-app',
    correlationId: 2,
    size: 59,
    createdAt: 1611581796522,
    sentAt: 1611581796522,
    pendingDuration: 0,
    duration: 1,
    apiName: 'Produce',
    apiKey: 0,
    apiVersion: 7
  }
}
InstrumentationEvent {
  id: 5,
  type: 'producer.network.request',
  timestamp: 1611581796523,
  payload: {
    broker: 'hao-MS-7B85:9092',
    clientId: 'my-app',
    correlationId: 3,
    size: 59,
    createdAt: 1611581796522,
    sentAt: 1611581796523,
    pendingDuration: 1,
    duration: 0,
    apiName: 'Produce',
    apiKey: 0,
    apiVersion: 7
  }
}
InstrumentationEvent {
  id: 6,
  type: 'producer.network.request',
  timestamp: 1611581796524,
  payload: {
    broker: 'hao-MS-7B85:9092',
    clientId: 'my-app',
    correlationId: 4,
    size: 59,
    createdAt: 1611581796523,
    sentAt: 1611581796523,
    pendingDuration: 0,
    duration: 1,
    apiName: 'Produce',
    apiKey: 0,
    apiVersion: 7
  }
}
InstrumentationEvent {
  id: 7,
  type: 'producer.network.request',
  timestamp: 1611581796524,
  payload: {
    broker: 'hao-MS-7B85:9092',
    clientId: 'my-app',
    correlationId: 5,
    size: 59,
    createdAt: 1611581796523,
    sentAt: 1611581796523,
    pendingDuration: 0,
    duration: 1,
    apiName: 'Produce',
    apiKey: 0,
    apiVersion: 7
  }
}
InstrumentationEvent {
  id: 8,
  type: 'producer.network.request',
  timestamp: 1611581796524,
  payload: {
    broker: 'hao-MS-7B85:9092',
    clientId: 'my-app',
    correlationId: 6,
    size: 59,
    createdAt: 1611581796523,
    sentAt: 1611581796523,
    pendingDuration: 0,
    duration: 1,
    apiName: 'Produce',
    apiKey: 0,
    apiVersion: 7
  }
}
[
  [
    {
      topicName: 'first_topic',
      partition: 2,
      errorCode: 0,
      baseOffset: '14',
      logAppendTime: '-1',
      logStartOffset: '0'
    }
  ],
  [
    {
      topicName: 'first_topic',
      partition: 0,
      errorCode: 0,
      baseOffset: '14',
      logAppendTime: '-1',
      logStartOffset: '0'
    }
  ],
  [
    {
      topicName: 'first_topic',
      partition: 1,
      errorCode: 0,
      baseOffset: '16',
      logAppendTime: '-1',
      logStartOffset: '0'
    }
  ],
  [
    {
      topicName: 'first_topic',
      partition: 0,
      errorCode: 0,
      baseOffset: '14',
      logAppendTime: '1611581796522',
      logStartOffset: '0'
    }
  ],
  [
    {
      topicName: 'first_topic',
      partition: 2,
      errorCode: 0,
      baseOffset: '14',
      logAppendTime: '1611581796521',
      logStartOffset: '0'
    }
  ]
]
Cost: 8ms

consumer

{ partition: 2, offset: '14', value: 'Hello! 1611581796516-0' }
{ partition: 0, offset: '14', value: 'Hello! 1611581796516-1' }
{ partition: 1, offset: '16', value: 'Hello! 1611581796516-2' }
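
One way to spot the problem from the producer side alone is to look for acknowledged sends that report the same partition and baseOffset. The sketch below is only an illustration: findRepeatedOffsets is a hypothetical helper (not part of kafkajs), and the results array is copied from the producer output above, trimmed to the fields the helper reads. Reading a repeated baseOffset as "the broker treated this send as a repeat" is an interpretation, not something the response states explicitly.

```javascript
// Producer results copied from the output above (only the fields used here).
const results = [
  [{ topicName: 'first_topic', partition: 2, baseOffset: '14' }],
  [{ topicName: 'first_topic', partition: 0, baseOffset: '14' }],
  [{ topicName: 'first_topic', partition: 1, baseOffset: '16' }],
  [{ topicName: 'first_topic', partition: 0, baseOffset: '14' }],
  [{ topicName: 'first_topic', partition: 2, baseOffset: '14' }],
];

// Hypothetical helper: group sends by topic/partition/baseOffset and
// return every group that contains more than one send.
function findRepeatedOffsets(sendResults) {
  const seen = new Map(); // "topic/partition@baseOffset" -> indexes of sends
  sendResults.forEach((response, index) => {
    for (const { topicName, partition, baseOffset } of response) {
      const key = `${topicName}/${partition}@${baseOffset}`;
      if (!seen.has(key)) seen.set(key, []);
      seen.get(key).push(index);
    }
  });
  return [...seen]
    .filter(([, sends]) => sends.length > 1)
    .map(([key, sends]) => ({ key, sends }));
}

console.log(findRepeatedOffsets(results));
```

For the output above, sends 0 and 4 share partition 2 at offset 14, and sends 1 and 3 share partition 0 at offset 14, which lines up with the consumer seeing only three messages.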

Environment:

t-d-d commented 3 years ago

Looking at the code, it seems that concurrent calls to produce() are not handled correctly. The sequence number for the partition is not being read or updated at the right time, which means subsequent (concurrent) messages to the same partition are sent with the same sequence number. These will get ignored by the broker as it will interpret them as repeats.
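
The race described above can be reproduced without a broker. What follows is a minimal simulation under stated assumptions, not the kafkajs code: FakeBroker, RacyProducer, FixedProducer and run are all hypothetical names, the broker's duplicate check is simplified to "sequence not greater than the last accepted one", and messages are assigned to partitions round-robin. FixedProducer shows one possible way to avoid the race (reserving the sequence number before the first await); it is not necessarily how kafkajs addressed it, and it ignores what should happen to a reserved sequence number when a send fails.

```javascript
// Hypothetical broker stand-in: deduplicates by (partition, sequence).
class FakeBroker {
  constructor() {
    this.lastSeq = new Map(); // partition -> last accepted sequence number
    this.appended = 0;        // number of messages actually written
  }

  async produce(partition, sequence) {
    // Simulate network latency so concurrent sends overlap.
    await new Promise((resolve) => setImmediate(resolve));
    const last = this.lastSeq.has(partition) ? this.lastSeq.get(partition) : -1;
    if (sequence <= last) {
      // Treated as a repeat: acknowledged, but nothing is appended.
      return { partition, duplicate: true };
    }
    this.lastSeq.set(partition, sequence);
    this.appended += 1;
    return { partition, duplicate: false };
  }
}

// Reads the sequence number before the await and updates it afterwards,
// so concurrent sends to one partition all use the same number.
class RacyProducer {
  constructor(broker) {
    this.broker = broker;
    this.seq = new Map();
  }

  async send(partition) {
    const sequence = this.seq.get(partition) || 0;
    const result = await this.broker.produce(partition, sequence);
    this.seq.set(partition, sequence + 1);
    return result;
  }
}

// Reserves the sequence number synchronously, before any await.
class FixedProducer extends RacyProducer {
  async send(partition) {
    const sequence = this.seq.get(partition) || 0;
    this.seq.set(partition, sequence + 1);
    return this.broker.produce(partition, sequence);
  }
}

// Sends `requests` messages concurrently and returns how many were appended.
async function run(ProducerClass, requests = 5, partitions = 3) {
  const broker = new FakeBroker();
  const producer = new ProducerClass(broker);
  const sends = [];
  for (let i = 0; i < requests; i++) {
    sends.push(producer.send(i % partitions));
  }
  await Promise.all(sends); // resolves without error in both cases
  return broker.appended;
}
```

Calling run(RacyProducer) with R = 5 and N = 3 resolves to 3, matching the report: every send is acknowledged, but only the first message per partition is appended. run(FixedProducer) resolves to 5.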