tulios / kafkajs

Consumer receives eachBatch over the maxBytes #1694

Open akospaska opened 5 months ago

akospaska commented 5 months ago

Describe the bug
The consumer fetches batches larger than the maxBytes setting.

To Reproduce

Consumer setup:

const consumer = kafka.consumer({ groupId: "my-group", maxBytes: 2500 });

// Connect and subscribe before consuming
await consumer.connect();
await consumer.subscribe({ topics: ["example-created-batch"] });

await consumer.run({
  eachBatch: async ({ batch }) => {
    console.log(`Received batch with ${batch.messages.length} messages`);
  },
});

Producer setup:

Here are the messages that I want to send to the topic. Each message is around 1017 bytes.

const messages = [
    { value: { id: 21, name: "a".repeat(1000) } },
    { value: { id: 22, name: "a".repeat(1000) } },
    { value: { id: 23, name: "a".repeat(1000) } },
    { value: { id: 24, name: "a".repeat(1000) } },
    { value: { id: 25, name: "a".repeat(1000) } },
    { value: { id: 26, name: "a".repeat(1000) } },
    { value: { id: 27, name: "a".repeat(1000) } },
    { value: { id: 28, name: "a".repeat(1000) } },
    { value: { id: 29, name: "a".repeat(1000) } },
    { value: { id: 210, name: "a".repeat(1000) } },
  ].map((msg) => ({
    value: JSON.stringify(msg.value),
  }));
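
For reference, here is a minimal sketch (not from the original report) that confirms the serialized size of each message; the on-the-wire record is slightly larger due to framing and headers:

// Sanity check (sketch): print the UTF-8 byte length of each serialized value.
messages.forEach(({ value }, i) => {
  console.log(`message ${i}: ${Buffer.byteLength(value, "utf8")} bytes`);
});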

Expected behavior: The consumer should receive batches under 2500 bytes, i.e. at most 2 messages per batch.

Producer1 setup:

await producer.send({
  topic: "example-created-batch",
  messages,
});

In this case the consumer receives all the messages at once: 10 messages in a single batch.

Producer2 setup:

// Send each message in its own produce request
await Promise.all(
  messages.map((message) =>
    producer.send({
      topic: "example-created-batch",
      messages: [message],
    })
  )
);

This approach sends the messages individually, and it does exactly what I want: every batch contains only 2 messages.

What am I missing?

Is there a way to send the messages in one request but still receive batches capped at the maxBytes size?
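
For what it's worth, Kafka's fetch size limits are soft: the broker always returns at least one complete record batch per partition so the consumer can make progress, even when that batch exceeds maxBytes, and the messages for a partition sent in a single send call are typically written as one record batch. A possible middle ground, sketched below under that assumption, is to split the send into chunks small enough that each stored record batch fits under the limit:

// Workaround sketch: send the messages in small chunks so each chunk
// is written (and later fetched) as its own record batch.
const chunkSize = 2; // two ~1017-byte messages stay under maxBytes: 2500
for (let i = 0; i < messages.length; i += chunkSize) {
  await producer.send({
    topic: "example-created-batch",
    messages: messages.slice(i, i + chunkSize),
  });
}

This trades one produce request for five, but the batches the consumer receives should stay close to the configured maxBytes.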

sauravkr20 commented 4 months ago

I am having the same issue with eachBatch; my consumer setup has the following configs:


const consumer = this.kafkaInstance.kafka.consumer({
    // one webhook message is about 350-450 bytes
    // so it will wait 3 sec until there is more than one message, and will take up max of 4000 bytes in a batch
    groupId: group,
    maxBytesPerPartition: 1,
    minBytes: 400,
    maxWaitTimeInMs: 3000,
    retry: {
        retries: 5,
        restartOnFailure: async (err: Error) => {
            SlackAlertService.getInstance().notify(
                SlackChannels.PRIORITY_ERRORS,
                'kafka Consumer shut down due to max retry, restart manually after resolving',
                this.defaultLogger,
            );
            this.defaultLogger.error('Kafka Consumer Failure, restart manually after resolving: ', err);
            return false;
        },
    },
});

but I am getting batches with the following message counts and total sizes:

Batch size: 1250, Byte size: 2156251 bytes
Batch size: 312, Byte size: 537889 bytes
Batch size: 157, Byte size: 270669 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 312, Byte size: 537889 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 313, Byte size: 539613 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 625, Byte size: 1077501 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 156, Byte size: 268945 bytes
Batch size: 625, Byte size: 1077501 bytes

david-dvora commented 1 month ago

Same issue here, latest version 2.2.4

p-flock commented 1 week ago

I am also seeing this behavior. I have tuned the number of partitions consumed concurrently, as well as maxBytesPerPartition, to ensure my workers do not run out of memory, but we continue to receive batches over the configured maxBytesPerPartition setting.

I am wondering if this is due to a lack of specificity in the docs: perhaps maxBytesPerPartition applies to the compressed response size, so that once KafkaJS decompresses it, the batches exceed the configured limit.

If that is the case, it would be nice to note it in the documentation.
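
One way to test that theory would be to log the decompressed payload size of each batch next to the configured limit and check whether the overshoot tracks the compression ratio. A minimal diagnostic sketch, assuming key and value arrive as Buffers (as KafkaJS delivers them):

// Diagnostic sketch: compare decompressed payload bytes per batch
// against the configured maxBytesPerPartition.
await consumer.run({
  eachBatch: async ({ batch }) => {
    const payloadBytes = batch.messages.reduce(
      (sum, m) =>
        sum + (m.value ? m.value.length : 0) + (m.key ? m.key.length : 0),
      0
    );
    console.log(
      `partition ${batch.partition}: ${batch.messages.length} messages, ` +
        `~${payloadBytes} decompressed payload bytes`
    );
  },
});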