tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.74k stars 525 forks source link

[Producer] Failed to send messages: Connection error: write after end #1617

Open Rishav-Sharma opened 1 year ago

Rishav-Sharma commented 1 year ago

Describe the bug

I am sending multiple messages to a Confluent Kafka topic using the kafkajs "^2.2.4" library. I am able to connect the producer successfully. I create the connection at the beginning of the Lambda, before I send any message to Kafka, using the following function:

/**
 * Builds a KafkaJS client from `cfg`, creates a producer from it, stores that
 * producer in the outer `producer` variable and connects it.
 *
 * @param {object} cfg - Connection settings; reads `clientId`, `brokerUrl`,
 *   `producerUsername` and `producerPassword`.
 * @returns {Promise<true|undefined>} `true` once the producer has connected;
 *   `undefined` when anything in the setup throws (the error is logged, not rethrown).
 */
const initializeProducer = async (cfg) => {
  try {
    // Read the settings inside the try so that a missing `cfg` is logged like any other failure.
    const { clientId, brokerUrl, producerUsername, producerPassword } = cfg;

    const saslCredentials = {
      mechanism: 'plain',
      username: producerUsername,
      password: producerPassword,
    };
    const clientOptions = {
      clientId,
      brokers: [brokerUrl],
      ssl: true,
      logLevel: 2,
      sasl: saslCredentials,
    };
    const client = new Kafka(clientOptions);

    const producerOptions = {
      createPartitioner: Partitioners.DefaultPartitioner,
      acks: 'all',
    };
    // `producer` is declared outside this function so later send() calls can reuse it.
    producer = client.producer(producerOptions);

    await producer.connect();
    console.debug('Producer connected');
    return true;
  } catch (error) {
    console.error('Error while creating producer connection: ', error);
    return undefined;
  }
};

The error is coming while I am trying to do producer.send() (At least that's what I believe because otherwise I would've got an exception in the above function).

The errors are coming in the following order.

Error log:

2023-08-28T22:00:41.342+05:30

2023-08-28T16:30:41.342Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR
{ "level": "ERROR", "timestamp": "2023-08-28T16:30:41.342Z", "logger": "kafkajs", "message": "[Connection] Connection timeout", "broker": "broker_urlus-east-1.aws.confluent.cloud:9092", "clientId": "crm" }

2023-08-28T16:30:41.342Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR {"level":"ERROR","timestamp":"2023-08-28T16:30:41.342Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"broker_urlus-east-1.aws.confluent.cloud:9092","clientId":"crm"}

2023-08-28T22:00:41.346+05:30

2023-08-28T16:30:41.346Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR
{ "level": "ERROR", "timestamp": "2023-08-28T16:30:41.346Z", "logger": "kafkajs", "message": "[Producer] Failed to send messages: Connection timeout", "retryCount": 0, "retryTime": 334 }

2023-08-28T16:30:41.346Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR {"level":"ERROR","timestamp":"2023-08-28T16:30:41.346Z","logger":"kafkajs","message":"[Producer] Failed to send messages: Connection timeout","retryCount":0,"retryTime":334}

2023-08-28T22:00:42.354+05:30

2023-08-28T16:30:42.354Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR
{ "level": "ERROR", "timestamp": "2023-08-28T16:30:42.354Z", "logger": "kafkajs", "message": "[Connection] Connection error: Client network socket disconnected before secure TLS connection was established", "broker": "broker_urlus-east-1.aws.confluent.cloud:9092", "clientId": "crm", "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (node:internal/errors:705:14)\n at TLSSocket.onConnectEnd (node:_tls_wrap:1594:19)\n at TLSSocket.emit (node:events:525:35)\n at TLSSocket.emit (node:domain:489:12)\n at endReadableNT (node:internal/streams/readable:1358:12)\n at processTicksAndRejections (node:internal/process/task_queues:83:21)" }

2023-08-28T16:30:42.354Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR {"level":"ERROR","timestamp":"2023-08-28T16:30:42.354Z","logger":"kafkajs","message":"[Connection] Connection error: Client network socket disconnected before secure TLS connection was established","broker":"broker_urlus-east-1.aws.confluent.cloud:9092","clientId":"crm","stack":"Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (node:internal/errors:705:14)\n at TLSSocket.onConnectEnd (node:_tls_wrap:1594:19)\n at TLSSocket.emit (node:events:525:35)\n at TLSSocket.emit (node:domain:489:12)\n at endReadableNT (node:internal/streams/readable:1358:12)\n at processTicksAndRejections (node:internal/process/task_queues:83:21)"}

2023-08-28T22:00:42.356+05:30

2023-08-28T16:30:42.356Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR
{ "level": "ERROR", "timestamp": "2023-08-28T16:30:42.356Z", "logger": "kafkajs", "message": "[Connection] Connection error: write after end", "broker": "broker_urlus-east-1.aws.confluent.cloud:9092", "clientId": "crm", "stack": "Error [ERR_STREAM_WRITE_AFTER_END]: write after end\n at new NodeError (node:internal/errors:387:5)\n at _write (node:internal/streams/writable:321:11)\n at TLSSocket.Writable.write (node:internal/streams/writable:336:10)\n at Object.sendRequest (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/connection.js?:409:27)\n at SocketRequest.send [as sendRequest] (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/index.js?:134:23)\n at SocketRequest.send (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/socketRequest.js?:88:10)\n at RequestQueue.sendSocketRequest (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/index.js?:182:19)\n at RequestQueue.push (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/index.js?:162:12)\n at eval (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/connection.js?:404:29)\n at new Promise ()" }

2023-08-28T16:30:42.356Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR {"level":"ERROR","timestamp":"2023-08-28T16:30:42.356Z","logger":"kafkajs","message":"[Connection] Connection error: write after end","broker":"broker_urlus-east-1.aws.confluent.cloud:9092","clientId":"crm","stack":"Error [ERR_STREAM_WRITE_AFTER_END]: write after end\n at new NodeError (node:internal/errors:387:5)\n at _write (node:internal/streams/writable:321:11)\n at TLSSocket.Writable.write (node:internal/streams/writable:336:10)\n at Object.sendRequest (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/connection.js?:409:27)\n at SocketRequest.send [as sendRequest] (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/index.js?:134:23)\n at SocketRequest.send (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/socketRequest.js?:88:10)\n at RequestQueue.sendSocketRequest (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/index.js?:182:19)\n at RequestQueue.push (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/requestQueue/index.js?:162:12)\n at eval (webpack://invoice-events-producer/./node_modules/kafkajs/src/network/connection.js?:404:29)\n at new Promise ()"}

2023-08-28T22:00:42.359+05:30

2023-08-28T16:30:42.359Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR
{ "level": "ERROR", "timestamp": "2023-08-28T16:30:42.359Z", "logger": "kafkajs", "message": "[Producer] Failed to send messages: Connection error: write after end", "retryCount": 1, "retryTime": 538 }

2023-08-28T16:30:42.359Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 ERROR {"level":"ERROR","timestamp":"2023-08-28T16:30:42.359Z","logger":"kafkajs","message":"[Producer] Failed to send messages: Connection error: write after end","retryCount":1,"retryTime":538}

2023-08-28T22:00:43.489+05:30

2023-08-28T16:30:43.489Z 799b5a9a-6c4f-46c5-8f3b-ce78fc249e04 DEBUG File 1/1:: Batch: 1:: Producer response: [ { "topicName": "my_topic_name", "partition": 0, "errorCode": 0, "baseOffset": "44", "logAppendTime": "-1", "logStartOffset": "0" } , { "topicName": "my_topic_name", "partition": 2, "errorCode": 0, "baseOffset": "40", "logAppendTime": "-1", "logStartOffset": "0" } , { "topicName": "my_topic_name", "partition": 1, "errorCode": 0, "baseOffset": "54", "logAppendTime": "-1", "logStartOffset": "0" } , { "topicName": "my_topic_name", "partition": 3, "errorCode": 0, "baseOffset": "28", "logAppendTime": "-1", "logStartOffset": "0" } ]

It's interesting that after all this, the messages are being sent successfully. I encountered this error only on 28 Aug, 2023. Since then I've not encountered this error again. But I want to know the possible reason for this error so we can avoid it in future.

I understand that since I'm using the default connection timeout of 1000ms I am getting connection timeout.

But the thing that I'm not able to wrap my head around is why I am getting "[Connection] Connection error: write after end".

Can someone please explain the reason for this and, more specifically, why the errors occur in this particular order?

Environment:

edutedu commented 9 months ago

+1

SacrumDeus commented 7 months ago

I've encountered the same problem. My applications (producers and consumers) are running in a Docker cluster and are communicating via Kafka.

Environment (erroneous container):

This application (producer) receives HTTP requests and sends them to the Kafka server. My consumer applications receive the messages. However, the producer throws an error:

{
   "level":"ERROR",
   "timestamp":"2024-03-04T16:46:36.825Z",
   "logger":"kafkajs",
   "message":"[Connection] Connection error: write after end",
   "broker":"stack_kafka:9093",
   "clientId":"service-import-xxx",
   "stack":"Error [ERR_STREAM_WRITE_AFTER_END]: write after end
        at new NodeError (node:internal/errors:405:5)
        at _write (node:internal/streams/writable:329:11)
        at Writable.write (node:internal/streams/writable:344:10)
        at Object.sendRequest (/home/user/service-import/node_modules/kafkajs/src/network/connection.js:409:27)
        at SocketRequest.send [as sendRequest] (/home/user/service-import/node_modules/user/src/network/requestQueue/index.js:134:23)
        at SocketRequest.send (/home/user/service-import/node_modules/kafkajs/src/network/requestQueue/socketRequest.js:88:10)
        at RequestQueue.sendSocketRequest (/home/user/service-import/node_modules/kafkajs/src/network/requestQueue/index.js:182:19)
        at RequestQueue.push (/home/user/service-import/node_modules/kafkajs/src/network/requestQueue/index.js:162:12)
        at /home/user/service-import/node_modules/kafkajs/src/network/connection.js:404:29
        at new Promise (<anonymous>)"
}

I've found a pretty bad workaround but this has worked for me:

    // send
    ...

    // sleep for a second
    await new Promise((resolve) => setTimeout(resolve, 1000));

    // disconnect producer
    await producer.disconnect();