tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

v1.16.0 - Losing data when one node of the cluster is lost #1303

Open cjimenezsaiz opened 2 years ago

cjimenezsaiz commented 2 years ago

Describe the bug I open this issue to clarify whether what I see is correct and must be solved by the user in some way, or whether it is a bug. The specific configuration that I have tested:

The size of the test is: 60
{"level":"ERROR","timestamp":"2022-02-28T09:28:59.133Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:9092","clientId":"creator","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":2,"size":123}
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "0",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "0",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "0",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 1 - 60/600
dbsave: 1.090s
The size of the test is: 60
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "20",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "20",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "20",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 2 - 120/600
dbsave: 25.286ms

In the middle of the test I stop one of the servers (not the controller of the cluster) with docker stop 85fsa..., and in most cases, though not always, this is the log that I receive:

The size of the test is: 60
{"level":"ERROR","timestamp":"2022-02-28T09:29:09.132Z","logger":"kafkajs","message":"[Producer] Failed to send messages: Closed connection","retryCount":0,"retryTime":243}
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "40",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "40",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 3 - 180/600
dbsave: 1.262s
The size of the test is: 60
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "60",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "40",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "60",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 4 - 240/600
dbsave: 9.988ms

As you can see, the send to one of the brokers failed, but the send method resolved with responses from only 2 of the 3 partitions (partition 1 is missing), and the send is not retried.

In the last operation you can see that the final result is that we have lost 20 messages without any warning from the library:

The size of the test is: 60
[
  {
    "topicName": "myTopic",
    "partition": 0,
    "errorCode": 0,
    "baseOffset": "180",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 1,
    "errorCode": 0,
    "baseOffset": "160",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  },
  {
    "topicName": "myTopic",
    "partition": 2,
    "errorCode": 0,
    "baseOffset": "180",
    "logAppendTime": "-1",
    "logStartOffset": "0"
  }
]
OPERATION 10 - 600/600
dbsave: 7.462ms

The snippet of code that I use to reproduce this behaviour is:

import { Kafka, Message } from 'kafkajs';
import fs from 'fs';

var access = fs.createWriteStream('test.log');
//@ts-ignore ignore is a test
process.stdout.write = process.stderr.write = access.write.bind(access);

function createMessages(from: number, to: number): Message[] {
  const messages: Message[] = [];
  for (let index = from; index < to; index += 1) {
    messages.push({ value: JSON.stringify({ index }) });
  }
  return messages;
}
const client = new Kafka({
  clientId: 'creator',
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
  enforceRequestTimeout: true,
  requestTimeout: 3000,
  connectionTimeout: 3000,
});
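// Idempotent producer with a single in-flight request and effectively
// unlimited retries.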
const producer = client.producer({
  maxInFlightRequests: 1,
  transactionalId: 'my-transactional-producer',
  idempotent: true,
  transactionTimeout: 3000,
  retry: { retries: Number.MAX_SAFE_INTEGER },
});
const consumer = client.consumer({ groupId: 'myGroup' });

(async () => {
  await producer.connect();
  await consumer.connect();
})();

let pending = false;
let consuming = false;
let count = 0;
let aggregatedValue = 0;
let messagesCount = 0;
const messageBatchSize = 60;
const messageFinalAmount = 60 * 10;
const expectedResult = (messageFinalAmount * (messageFinalAmount - 1)) / 2;

console.log(`The final result should be ${expectedResult}`);

setInterval(async () => {
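  // Send one batch every 5 seconds until all 600 messages have been produced,
  // then switch to consuming and verify that the aggregated sum of the indexes
  // matches the expected total.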
  if (count === messageFinalAmount / messageBatchSize && !consuming) {
    await consumer.subscribe({ topic: 'myTopic', fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ message }) => {
        if (!message.value) {
          throw new Error(`TEST IS BROKEN`);
        }
        const value = JSON.parse(message.value.toString()).index;
        aggregatedValue = aggregatedValue + value;
        console.log(
          `${value} - ${aggregatedValue} - ${
            aggregatedValue === expectedResult ? 'ALL MESSAGE HAS BEEN RECEIVED' : 'PENDING ...'
          }`
        );
      },
    });
    consuming = true;
    return;
  }
  if (pending || consuming) {
    return;
  }
  const messages = createMessages(messagesCount, messagesCount + messageBatchSize);
  console.log(`The size of the test is: ${messages.length}`);
  console.time('dbsave');
  pending = true;
  try {
    const result = await producer.send({ topic: 'myTopic', acks: -1, messages, timeout: 3000 });
    console.log(JSON.stringify(result, null, 2));
    count++;
    messagesCount += messageBatchSize;
    console.log(`OPERATION ${count} - ${messagesCount}/${messageFinalAmount}`);
    console.timeEnd('dbsave');
    pending = false;
  } catch (error) {
    console.log(`ERROR IN OPERATION ${error.message}`);
    console.timeEnd('dbsave');
    pending = false;
  }
}, 5000);

And the docker-compose file:

version: '3.8'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
  kafka-1:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
  kafka-2:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9093:9093'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
  kafka-3:
    image: confluentinc/cp-kafka:latest
    ports:
      - '9094:9094'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_COMMIT_REQUIRED_ACKS: -1
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
  kowl:
    image: quay.io/cloudhut/kowl:master
    depends_on:
      - zookeeper-1
      - kafka-1
      - kafka-2
      - kafka-3
    environment:
      KAFKA_BROKERS: kafka-1:29092,kafka-2:29093,kafka-3:29094
    ports:
      - "8080:8080"

Expected behavior I expect the method to reject with an error so that I can retry the operation myself, or to try to find the broker that is now responsible for the partition.

Observed behavior Even when one of the brokers fails in the middle of sending new records to the cluster, the send method resolves without an error and the data is lost.
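Until this is addressed in the library, a caller can at least detect the partial acknowledgement by checking which partitions appear in the resolved metadata. A minimal sketch (the sendAndVerify helper is hypothetical, and it assumes every batch spreads over all partitions of the topic, as in the logs above):

import { Message, Producer, RecordMetadata } from 'kafkajs';

// Workaround sketch: send() currently resolves even when a broker is lost, but
// the resolved metadata only contains the partitions that were acknowledged
// (see OPERATION 3 above, where partition 1 is missing), so counting the
// acknowledged partitions exposes the loss.
async function sendAndVerify(
  producer: Producer,
  topic: string,
  messages: Message[],
  expectedPartitions: number // assumed to be known by the caller, e.g. 3
): Promise<RecordMetadata[]> {
  const result = await producer.send({ topic, acks: -1, messages, timeout: 3000 });
  const ackedPartitions = new Set(result.map((r) => r.partition));
  if (ackedPartitions.size < expectedPartitions) {
    throw new Error(
      `Only ${ackedPartitions.size}/${expectedPartitions} partitions acknowledged the batch`
    );
  }
  return result;
}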

Environment:

cjimenezsaiz commented 2 years ago

Hi there,

I've debugged the issue a little bit more. I don't know the code well enough, but I think I have found where the problem is.

In the sendMessages file, the responses of the brokers are tracked by the responsePerBroker Map.

https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L20-L26

If the request is fulfilled, the response is set, but if the request fails, the broker's entry is deleted from the map.

https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L116-L121

This logic is fine when there is a problem with the request but the broker is still there. In the cases where the broker is lost (going from 3 brokers in the cluster to 2), no request will be performed in the next retry of createProducerRequests, because at that point we only have 2 brokers and both of them already have an answer in the responsePerBroker map (brokersWithoutResponse will be empty).

https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L65-L69

So Promise.all() will resolve without executing any code, and the sendMessages function will return the result of the previous iteration.

https://github.com/tulios/kafkajs/blob/1876abcb5effd6183c1fbaa50dee9cef269b67a5/src/producer/sendMessages.js#L132-L136
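To make the failure mode concrete, here is a simplified, illustrative sketch of the bookkeeping described above (pseudocode in the spirit of the linked lines, not the actual kafkajs source; produceToBroker is a placeholder):

// Simplified sketch of the broker-keyed retry bookkeeping described above
// (illustrative only, not the actual kafkajs source).
type Broker = { nodeId: number };
type PartitionResponse = { topicName: string; partition: number; baseOffset: string };

const responsePerBroker = new Map<Broker, PartitionResponse[]>();

// Placeholder for the real per-broker produce request.
async function produceToBroker(broker: Broker): Promise<PartitionResponse[]> {
  return [];
}

async function retrySendSketch(currentBrokers: Broker[]): Promise<PartitionResponse[]> {
  // Only brokers without a recorded response are retried...
  const brokersWithoutResponse = currentBrokers.filter((b) => !responsePerBroker.has(b));

  // ...so if a broker has disappeared from the cluster metadata after its
  // failed entry was deleted, every remaining broker already has a response,
  // brokersWithoutResponse is empty and nothing is produced again.
  const requests = brokersWithoutResponse.map(async (broker) => {
    responsePerBroker.set(broker, await produceToBroker(broker));
  });

  await Promise.all(requests); // resolves immediately when there is nothing left to retry

  // The previously collected responses are returned as if the whole send had
  // succeeded, silently dropping the messages owned by the lost broker.
  return Array.from(responsePerBroker.values()).flat();
}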

I think this logic should be based on partitions instead of brokers.

Let me know your thoughts

Nevon commented 2 years ago

This is clearly a bug.

I think this logic should be based on partitions instead of brokers.

So if your analysis is correct, then I suppose what's needed is to detect during retries that we still have outstanding partitions to produce to, check who the new leader for each of those partitions is, and then issue new requests.

I'm not sure about the cluster behavior during this operation, though. If it's a graceful shutdown of a broker in the cluster, I would expect leadership to change to one of the other brokers, in which case the above approach should work. But if it's an ungraceful shutdown, I'm not sure this will make a difference, because the automatic leadership rebalance only kicks in after 5 minutes by default. Regardless, we should at least be able to throw a meaningful error when we can't find a leader for the partition, rather than silently fail as is currently the case.
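For illustration, a rough sketch of what partition-centric bookkeeping could look like, with hypothetical findLeader and produce helpers standing in for the real metadata lookup and broker request (not an actual patch):

// Hypothetical partition-centric retry loop (illustrative sketch, not a patch).
type TopicPartition = { topic: string; partition: number };
type Ack = { topic: string; partition: number; baseOffset: string };

async function sendWithPartitionRetries(
  batches: Map<string, unknown[]>, // key: `${topic}:${partition}`
  findLeader: (tp: TopicPartition) => Promise<number | undefined>, // assumed metadata lookup
  produce: (leaderNodeId: number, tp: TopicPartition, messages: unknown[]) => Promise<Ack>,
  maxAttempts = 5
): Promise<Ack[]> {
  const acks = new Map<string, Ack>();

  for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
    // Outstanding work is tracked per topic-partition, not per broker.
    const outstanding = [...batches.keys()].filter((key) => !acks.has(key));
    if (outstanding.length === 0) break;

    for (const key of outstanding) {
      const [topic, partition] = key.split(':');
      const tp: TopicPartition = { topic, partition: Number(partition) };

      // Re-resolve the leader on every attempt: after a broker loss,
      // leadership may have moved to one of the surviving brokers.
      const leaderNodeId = await findLeader(tp);
      if (leaderNodeId === undefined) continue; // no leader yet, try again later

      try {
        acks.set(key, await produce(leaderNodeId, tp, batches.get(key)!));
      } catch {
        // Leave the partition outstanding so the next attempt retries it.
      }
    }
  }

  const missing = [...batches.keys()].filter((key) => !acks.has(key));
  if (missing.length > 0) {
    // Surface the partial failure instead of resolving with a partial result.
    throw new Error(`No acknowledgement for: ${missing.join(', ')}`);
  }
  return [...acks.values()];
}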

cjimenezsaiz commented 2 years ago

Hi @Nevon,

As you say, there will be a completely different behaviour depending on the broker's "shutdown style". A possible solution is to change the resolved response of the send function, adding a list of the messages that were committed and those that were not, so that the user can retry sending the uncommitted messages themselves. On the second try, if the cluster is able to receive the messages (even with one broker less), you will either be able to send all the messages or receive a new error with a clear message about the actual problem.

This is the way that some SDKs solve this kind of issue.

For example, this is the response of the AWS SDK Kinesis client or Firehose client:

{
    "FailedRecordCount": 2,
    "Records": [
        {
            "SequenceNumber": "49543463076548007577105092703039560359975228518395012686", 
            "ShardId": "shardId-000000000000"
        }, 
        {
            "ErrorCode": "ProvisionedThroughputExceededException",
            "ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
        },
        {
            "ErrorCode": "InternalFailure",
            "ErrorMessage": "Internal service failure."
        }
    ]
}

Of course, we must select the correct format to be sure that we don't break any previous implementation.
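If the resolve-with-details route is taken, one hypothetical, backwards-compatible shape (names invented here, not an existing kafkajs API) could keep the current RecordMetadata array and report the per-partition failures, together with the messages that were not committed, alongside it:

import { Message, RecordMetadata } from 'kafkajs';

// Hypothetical, backwards-compatible send() result: the existing array of
// RecordMetadata stays as it is today, and failures are reported alongside it
// so the caller can retry only the uncommitted messages.
interface PartitionFailure {
  topicName: string;
  partition: number;
  error: Error;        // e.g. "no leader for this topic-partition"
  messages: Message[]; // the messages that were NOT committed
}

interface SendReport {
  records: RecordMetadata[];  // acknowledged partitions, exactly as today
  failed: PartitionFailure[]; // empty when everything was committed
}

A caller could then re-send only failed.flatMap((f) => f.messages) instead of the whole batch, which also avoids the duplicate-commit concern raised below.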

Nevon commented 2 years ago

I think that if we have any failures that we couldn't handle, we should reject producer.send. We have similar situations with other operations that do multiple things across multiple brokers, where the operation can partially fail. For example, admin.createTopics can succeed for one topic and fail for another. #1104 is a good example of how we handle that. I think we could do something similar here.

cjimenezsaiz commented 2 years ago

If producer.send is rejected, the user will try to retry the complete "job", so in most cases some messages will be committed twice, which is not the ideal situation.

Nevon commented 2 years ago

Check it out, this issue has come up before #43. Let's use that instead.

Nevon commented 2 years ago

Actually, never mind, this deserves its own issue. Part of it is to communicate the partial failure better to the user, but another part is handling the error to begin with. I realized this one millisecond after closing.

cjimenezsaiz commented 2 years ago

As extended error classes are currently used in the code, we have to decide whether the correct approach is to resolve with more info or, maybe, to reject with a special error.

At the same time, we should use Promise.allSettled() instead of Promise.all(), to be sure that we control which messages were committed. Here we need to take into account that Node.js 12.9.0 or newer will be required.
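For reference, a minimal generic illustration of the difference (not kafkajs internals), assuming one promise per broker request:

// Generic illustration: Promise.allSettled reports every outcome, so the
// requests that failed can be identified and retried.
async function settleBrokerRequests(requests: Array<Promise<string>>) {
  // With Promise.all, a single rejection rejects the whole aggregate and the
  // fulfilled results of the other requests are lost to the caller:
  // const responses = await Promise.all(requests);

  // With Promise.allSettled (Node.js >= 12.9.0), every outcome is kept:
  const settled = await Promise.allSettled(requests);

  const fulfilled = settled
    .filter((r): r is PromiseFulfilledResult<string> => r.status === 'fulfilled')
    .map((r) => r.value);
  const rejected = settled
    .filter((r): r is PromiseRejectedResult => r.status === 'rejected')
    .map((r) => r.reason);

  // `rejected` tells us exactly which requests still need to be retried.
  return { fulfilled, rejected };
}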

t-d-d commented 2 years ago

There is an allSettled() implementation in /utils.

Igor-Kuzmin commented 2 years ago

Hi, we also faced this problem and it is critical for us. I would like to fix it if you don't mind.

cjimenezsaiz commented 2 years ago

Hi, no problem from my side. I have a sudden rush of work and have deferred this task with no forecast date.

nick4fake commented 1 year ago

Sorry, any movement on this? It sounds like something that might happen on any production system that uses this library.

eladchen commented 6 months ago

So basically this issue means that once a node is down (like when MSK clusters go through security patches, which are frequent) we may end up losing messages?

cjimenezsaiz commented 5 months ago

@eladchen Yes