Open caseycrites opened 4 years ago
I've tried to reproduce this myself using Toxiproxy to introduce a timeout between the client and the broker, but I haven't been able to get it to actually trigger the error. The branch is repro-919 in case anyone wants to give it a go.
AFAIK, the socket timeout is decided by the operating system, with no real way to override it from within NodeJS. According to sysctl, my timeout should be 75 seconds:
$ sysctl net.inet.tcp | grep keepinit
net.inet.tcp.keepinit: 75000
So I configure my client to talk to Toxiproxy and for Toxiproxy to proxy to the broker. I connect the client and produce once just to see that everything works. Then I introduce the timeout into the proxy, setting it to stop all data but not actually close the connection. If it's stopping keepalive packets, then I would expect an ETIMEDOUT after 75 seconds.
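For anyone wanting to set up the same toxic by hand, here is a minimal sketch of the request against Toxiproxy's HTTP API. The toxic fields mirror the ones logged in this comment; the helper names `buildTimeoutToxicRequest` and `addTimeoutToxic` are hypothetical, and the sending part assumes Node 18+ (global `fetch`) and a reachable Toxiproxy instance.

```javascript
// Builds the HTTP request that asks Toxiproxy to add a "timeout" toxic.
// With timeout: 0 the toxic drops all data but never closes the connection.
function buildTimeoutToxicRequest(toxiproxyHost, proxyName) {
  return {
    url: `${toxiproxyHost}/proxies/${encodeURIComponent(proxyName)}/toxics`,
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      name: 'timeout_downstream',
      type: 'timeout',
      stream: 'downstream',
      toxicity: 1,
      attributes: { timeout: 0 },
    }),
  }
}

// Sends the request. Assumption: Node 18+ (global fetch) and a running Toxiproxy.
async function addTimeoutToxic(toxiproxyHost, proxyName) {
  const { url, ...init } = buildTimeoutToxicRequest(toxiproxyHost, proxyName)
  const response = await fetch(url, init)
  if (!response.ok) {
    throw new Error(`Toxiproxy responded with ${response.status}`)
  }
  return response.json()
}
```

Calling `addTimeoutToxic('http://192.168.0.38:8474', 'kafka-proxy')` after the first successful produce would correspond to the "Creating network timeout" step.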
That's not what I'm seeing though. Instead, it sits for about 10 minutes waiting before it actually times out. And I don't get an ETIMEDOUT error, and the client tries to reconnect several times (looks like nested retriers):
$ time node examples/producer.js
info: ┏ [Producer] Initialized proxy
info: ┃ [ 0] {
info: ┃ [ 1] "timestamp": "2020-10-14T13:36:40.312Z",
info: ┃ [ 2] "logger": "kafkajs",
info: ┃ [ 3] "proxy": {
info: ┃ [ 4] "toxiproxy": {
info: ┃ [ 5] "host": "http://192.168.0.38:8474"
info: ┃ [ 6] },
info: ┃ [ 7] "name": "kafka-proxy",
info: ┃ [ 8] "listen": "[::]:39094",
info: ┃ [ 9] "upstream": "kafka:29094",
info: ┃ [10] "enabled": true,
info: ┃ [11] "toxics": []
info: ┃ [12] }
info: ┗ [13] }
info: ┏ Sending 236 messages #0...
info: ┃ [0] {
info: ┃ [1] "timestamp": "2020-10-14T13:36:40.370Z",
info: ┃ [2] "logger": "kafkajs"
info: ┗ [3] }
info: ┏ Messages sent #0
info: ┃ [ 0] {
info: ┃ [ 1] "timestamp": "2020-10-14T13:36:40.468Z",
info: ┃ [ 2] "logger": "kafkajs",
info: ┃ [ 3] "response": [
info: ┃ [ 4] {
info: ┃ [ 5] "topicName": "topic-test",
info: ┃ [ 6] "partition": 0,
info: ┃ [ 7] "errorCode": 0,
info: ┃ [ 8] "baseOffset": "5878",
info: ┃ [ 9] "logAppendTime": "-1",
info: ┃ [10] "logStartOffset": "0"
info: ┃ [11] }
info: ┃ [12] ],
info: ┃ [13] "msgNumber": 236
info: ┗ [14] }
info: ┏ [Producer] Creating network timeout
info: ┃ [ 0] {
info: ┃ [ 1] "timestamp": "2020-10-14T13:36:40.472Z",
info: ┃ [ 2] "logger": "kafkajs",
info: ┃ [ 3] "toxic": {
info: ┃ [ 4] "attributes": {
info: ┃ [ 5] "timeout": 0
info: ┃ [ 6] },
info: ┃ [ 7] "name": "timeout_downstream",
info: ┃ [ 8] "stream": "downstream",
info: ┃ [ 9] "toxicity": 1,
info: ┃ [10] "type": "timeout"
info: ┃ [11] }
info: ┗ [12] }
info: ┏ Sending 573 messages #1...
info: ┃ [0] {
info: ┃ [1] "timestamp": "2020-10-14T13:36:40.473Z",
info: ┃ [2] "logger": "kafkajs"
info: ┗ [3] }
// ... Sits here for about 10 minutes
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:41.268Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:41.269Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 0,
error: ┃ [4] "retryTime": 335
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:41.275Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:42.608Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:42.608Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 1,
error: ┃ [4] "retryTime": 542
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:42.613Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:44.155Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:44.155Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 2,
error: ┃ [4] "retryTime": 964
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:44.160Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:46.120Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:46.121Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 3,
error: ┃ [4] "retryTime": 1864
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:46.125Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:48.991Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:48.992Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 4,
error: ┃ [4] "retryTime": 3810
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:48.995Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:53.804Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to broker, reconnecting
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:53.805Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 5,
error: ┃ [4] "retryTime": 7784
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:53.812Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "localhost:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:54.810Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:54.811Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 0,
error: ┃ [4] "retryTime": 249
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:54.815Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:56.064Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:56.065Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 1,
error: ┃ [4] "retryTime": 404
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:56.068Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:57.470Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:57.471Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 2,
error: ┃ [4] "retryTime": 748
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:57.475Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:59.227Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:59.227Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 3,
error: ┃ [4] "retryTime": 1220
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:46:59.232Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:01.449Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:01.450Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 4,
error: ┃ [4] "retryTime": 2494
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:01.454Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:04.952Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:04.952Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 5,
error: ┃ [4] "retryTime": 5136
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:04.959Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:05.956Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:05.957Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 0,
error: ┃ [4] "retryTime": 282
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:05.961Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:07.241Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:07.242Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 1,
error: ┃ [4] "retryTime": 564
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:07.246Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:08.814Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:08.814Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 2,
error: ┃ [4] "retryTime": 1092
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:08.819Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:10.917Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:10.918Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 3,
error: ┃ [4] "retryTime": 2474
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:10.928Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:14.398Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:14.399Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 4,
error: ┃ [4] "retryTime": 4666
error: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:14.406Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer",
error: ┃ [5] "stack": "Error: Client network socket disconnected before secure TLS connection was established\n at connResetException (internal/errors.js:613:14)\n at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n at TLSSocket.emit (events.js:326:22)\n at endReadableNT (_stream_readable.js:1226:12)\n at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
error: ┏ [Connection] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:20.071Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "broker": "192.168.0.38:39094",
error: ┃ [4] "clientId": "example-producer"
error: ┗ [5] }
error: ┏ [BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:20.071Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "retryCount": 5,
error: ┃ [4] "retryTime": 10370
error: ┗ [5] }
error: ┏ [example/producer] Connection timeout
error: ┃ [0] {
error: ┃ [1] "timestamp": "2020-10-14T13:47:20.072Z",
error: ┃ [2] "logger": "kafkajs",
error: ┃ [3] "stack": "KafkaJSNonRetriableError\n Caused by: KafkaJSNonRetriableError\n Caused by: KafkaJSConnectionError: Connection timeout\n at Timeout.onTimeout [as _onTimeout] (/Users/tommybrunn/workspace/kafkajs/src/network/connection.js:165:23)\n at listOnTimeout (internal/timers.js:551:17)\n at processTimers (internal/timers.js:494:7)"
error: ┗ [4] }
info: ┏ process.on SIGTERM
info: ┃ [0] {
info: ┃ [1] "timestamp": "2020-10-14T13:47:20.072Z",
info: ┃ [2] "logger": "kafkajs"
info: ┗ [3] }
node examples/producer.js 0.59s user 0.13s system 0% cpu 10:40.30 total
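The retryTime values in the log above restart three times (retryCount 0 to 5 each), which is what makes this look like nested retriers. As a rough check, the sketch below tests whether each run fits a single exponential backoff with jitter. It assumes KafkaJS's documented retry defaults (initialRetryTime 300, factor 0.2, multiplier 2, maxRetryTime 30000); the window formula is my reading of how those settings combine, not something confirmed in this thread.

```javascript
// Assumed KafkaJS retry defaults (from its documentation, not from this thread).
const DEFAULTS = { initialRetryTime: 300, factor: 0.2, multiplier: 2, maxRetryTime: 30000 }

// Range the next retryTime may fall in, given the previous one.
function nextRetryWindow(previous, { factor, multiplier, maxRetryTime } = DEFAULTS) {
  const delta = factor * previous
  const min = Math.min(Math.floor(previous - delta) * multiplier, maxRetryTime)
  const max = Math.min(Math.ceil(previous + delta) * multiplier, maxRetryTime)
  return [min, max]
}

// True when every retryTime after the first fits the window derived from its predecessor.
function looksLikeOneRetrier(retryTimes, config = DEFAULTS) {
  return retryTimes.slice(1).every((time, i) => {
    const [min, max] = nextRetryWindow(retryTimes[i], config)
    return time >= min && time <= max
  })
}

// retryTime values copied from the log above, one array per retryCount 0..5 run.
const runs = [
  [335, 542, 964, 1864, 3810, 7784],
  [249, 404, 748, 1220, 2494, 5136],
  [282, 564, 1092, 2474, 4666, 10370],
]
runs.forEach(run => console.log(run.join(', '), '->', looksLikeOneRetrier(run)))
```

If each run passes on its own, that would be consistent with one retrier being restarted by an outer one once its retries are exhausted, rather than a single long backoff sequence.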
Another option might be to write a test where you implement a custom socket factory with a socket that you can trigger to emit the timeout event manually.
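A minimal sketch of what such a factory could look like. It follows the `({ host, port, ssl, onConnect })` shape that KafkaJS documents for the `socketFactory` option; `createTrackingSocketFactory` and `triggerTimeout` are hypothetical names, and the `connect` parameter exists only so a test can substitute a fake socket.

```javascript
const net = require('net')
const tls = require('tls')

// Creates a real socket the same way the documented socketFactory example does.
function defaultConnect({ host, port, ssl, onConnect }) {
  return ssl
    ? tls.connect(Object.assign({ host, port }, ssl), onConnect)
    : net.connect({ host, port }, onConnect)
}

// Hypothetical helper: remembers every socket it hands out so that a test
// can later emit 'timeout' on them without waiting for the operating system.
function createTrackingSocketFactory(connect = defaultConnect) {
  const sockets = []
  const socketFactory = options => {
    const socket = connect(options)
    sockets.push(socket)
    return socket
  }
  // Manually fires the 'timeout' event on every socket created so far.
  const triggerTimeout = () => sockets.forEach(socket => socket.emit('timeout'))
  return { socketFactory, triggerTimeout, sockets }
}
```

The returned `socketFactory` would be passed to `new Kafka({ ..., socketFactory })`, and the test would call `triggerTimeout()` once a produce request is in flight.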
I'll try to reproduce with toxiproxy and/or a custom socket factory today, since I'd like to figure this out.
I think if I'm not able to reproduce in a timely fashion, I'm going to turn off kafkajs retries altogether and just catch and retry myself.
Based on my testing yesterday, I'm not sure there's a way to work around this without shutting off retries altogether. The code as currently written closes down the socket connection to a single broker, not the entire connection, so even if I implement a custom socket factory and listen for ETIMEDOUT events, I'm not even sure how to best get things reconnected. Calling producer.connect would likely bail early since it already has connected brokers, and if I disconnect/connect the producer I'd presumably still need to call producer.send again because the request queue will be drained, in which case I may as well just turn off retries and retry failures myself.
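A rough sketch of the "turn off retries and retry myself" approach. `sendWithRetry` is a hypothetical helper, and the commented wiring assumes the `retry: { retries: 0 }` client option from the KafkaJS docs. It does not address whether the broker connection is actually re-established between attempts.

```javascript
// Retries an async send function ourselves instead of relying on KafkaJS retries.
// `send` is any function returning a promise, e.g. payload => producer.send(payload).
async function sendWithRetry(send, payload, { attempts = 3, delayMs = 500 } = {}) {
  let lastError
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await send(payload)
    } catch (error) {
      lastError = error
      // Wait before the next attempt, but not after the final failure.
      if (attempt < attempts) {
        await new Promise(resolve => setTimeout(resolve, delayMs))
      }
    }
  }
  throw lastError
}

// Intended wiring (not executed here; requires kafkajs and a running cluster):
//   const kafka = new Kafka({ clientId, brokers, retry: { retries: 0 } })
//   const producer = kafka.producer()
//   await producer.connect()
//   await sendWithRetry(payload => producer.send(payload), { topic, messages })
```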
The code as currently written closes down the socket connection to a single broker, not the entire connection, so even if I implement a custom socket factory and listen for ETIMEDOUT events
Can you share a link to what you're referring to? I would have expected this to come via the socket timeout event, which we listen to and throw a KafkaJSConnectionError in response to, but I guess that's not the case.
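One thing that may explain the difference: in Node.js the socket `timeout` event only fires for the idle timer set via `socket.setTimeout()`, whereas an OS-level ETIMEDOUT arrives through the `error` event with `error.code === 'ETIMEDOUT'`. A small sketch to tell the two apart (`watchSocket` is a hypothetical helper, not part of KafkaJS):

```javascript
// Reports how a socket failed: via the 'timeout' event (idle timer set with
// socket.setTimeout) or via an 'error' event carrying an OS error code.
function watchSocket(socket, onFailure) {
  socket.on('timeout', () => onFailure({ source: 'timeout-event' }))
  socket.on('error', error => onFailure({ source: 'error-event', code: error.code }))
}
```

Attaching this to sockets from a custom socket factory would show which of the two paths is actually taken when the connection stalls.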
Hello,
I've made an attempt to reproduce the issue using docker, see below:
KafkaJS version 1.15.0
Kafka version 2.8 (Confluent Platform 6.2.0)
NodeJS version from node:lts-alpine image
Just run the script start-repro-timeout.sh
It starts a zookeeper + 3 brokers + control-center
The producer code is very simple.
Config used is:
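const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-kafkajs-producer',
  brokers: ['broker1:9092', 'broker2:9092', 'broker3:9092'],
  enforceRequestTimeout: true,
  logLevel: logLevel.DEBUG,
  // Kept as posted; note that acks is documented as a producer.send() option.
  acks: 1,
  connectionTimeout: 20000,
})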
It allows only one pending request in order to make troubleshooting easier.
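The part of the setup that limits pending requests is not shown above. As a sketch, assuming it was done with the producer option `maxInFlightRequests` and that `acks` was meant for the send call, the settings would be split roughly like this. The topic name is taken from the metadata log later in this comment and the message value is a placeholder; `logLevel` is left out so the snippet runs without kafkajs installed.

```javascript
// Client-level options, as posted above (passed to `new Kafka(...)`).
const clientConfig = {
  clientId: 'my-kafkajs-producer',
  brokers: ['broker1:9092', 'broker2:9092', 'broker3:9092'],
  enforceRequestTimeout: true,
  connectionTimeout: 20000,
}

// Assumption: "only one pending request" was configured on the producer like this.
const producerConfig = { maxInFlightRequests: 1 }

// Assumption: acks is supplied per send call; the message value is a placeholder.
const sendOptions = { topic: 'kafkajs', acks: 1, messages: [{ value: 'hello' }] }

// Intended wiring (requires kafkajs):
//   const producer = new Kafka(clientConfig).producer(producerConfig)
//   await producer.send(sendOptions)
console.log({ clientConfig, producerConfig, sendOptions })
```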
I'm blocking the IP address of the KafkaJS client container (using iptables) inside the broker1 container, so the KafkaJS producer does not receive any response back from broker1:
ip=$(docker inspect -f '{{.Name}} - {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $(docker ps -aq) | grep client-kafkajs | cut -d " " -f 3)
docker exec -e ip=$ip --privileged --user root broker1 sh -c "iptables -A OUTPUT -p tcp -d $ip -j DROP"
# let the test run 5 minutes
sleep 300
# Unblocking IP address $ip corresponding to kafkaJS client
docker exec -e ip=$ip --privileged --user root broker1 sh -c "iptables -D OUTPUT -p tcp -d $ip -j DROP"
# let the test run 5 minutes
sleep 300
Traffic is blocked at 11:32:33:
11:32:33 ℹ️ Blocking IP address 172.18.0.6 corresponding to kafkaJS client
11:32:33 ℹ️ Grepping for WARN|ERROR|Metadata|timed out
30 seconds later, the request times out:
[[11:33:04.669]] [LOG] Producer request timed out at 1630927984668 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":17,"createdAt":1630927954687,"sentAt":1630927954687,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
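To double-check the "30 seconds" figure, the elapsed time can be computed from the fields in that log line. This is just a sketch; `parseTimeoutLog` is a hypothetical helper written for this log format.

```javascript
// Extracts the timeout instant and the JSON payload from a
// "Producer request timed out at <ms> {...}" log line.
function parseTimeoutLog(line) {
  const match = line.match(/Producer request timed out at (\d+) (\{.*\})/)
  if (!match) return null
  const timedOutAt = Number(match[1])
  const details = JSON.parse(match[2])
  return {
    correlationId: details.correlationId,
    elapsedMs: timedOutAt - details.sentAt,
  }
}

const line =
  '[[11:33:04.669]] [LOG] Producer request timed out at 1630927984668 ' +
  '{"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":17,' +
  '"createdAt":1630927954687,"sentAt":1630927954687,"pendingDuration":0,' +
  '"apiName":"Produce","apiKey":0,"apiVersion":5}'

console.log(parseTimeoutLog(line)) // { correlationId: 17, elapsedMs: 29981 }
```

Roughly 30 seconds would match KafkaJS's documented default requestTimeout of 30000 ms, assuming it was not overridden (the config above does not set it).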
Then we see a Connection error: read ETIMEDOUT:
[[11:33:29.928]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:33:29.928Z","logger":"kafkajs","message":"[Connection] Connection error: read ETIMEDOUT","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: read ETIMEDOUT\n at TCP.onStreamRead (internal/stream_base_commons.js:209:20)"}
That seems to trigger a disconnect:
[[11:33:29.929]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:33:29.929Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:33:29.930]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:33:29.930Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
I do not see a re-connect, but requests are retried:
[[11:33:34.935]] [LOG] Producer request timed out at 1630928014935 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":18,"createdAt":1630927984955,"sentAt":1630927984955,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:34:05.382]] [LOG] Producer request timed out at 1630928045381 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":19,"createdAt":1630928015401,"sentAt":1630928015401,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:34:36.243]] [LOG] Producer request timed out at 1630928076243 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":20,"createdAt":1630928046263,"sentAt":1630928046263,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:07.880]] [LOG] Producer request timed out at 1630928107880 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":21,"createdAt":1630928077900,"sentAt":1630928077900,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:41.097]] [LOG] Producer request timed out at 1630928141097 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":22,"createdAt":1630928111116,"sentAt":1630928111116,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:35:41.098]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:35:41.098Z","logger":"kafkajs","message":"[Producer] Request Produce(key: 0, version: 5) timed out","retryCount":0,"retryTime":282}
[[11:35:41.099]] [LOG] failed to send data KafkaJSRequestTimeoutError: Request Produce(key: 0, version: 5) timed out
[[11:36:11.879]] [LOG] Producer request timed out at 1630928171879 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":23,"createdAt":1630928141899,"sentAt":1630928141899,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:36:42.178]] [LOG] Producer request timed out at 1630928202178 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":24,"createdAt":1630928172198,"sentAt":1630928172198,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:37:12.826]] [LOG] Producer request timed out at 1630928232826 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"createdAt":1630928202845,"sentAt":1630928202845,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:37:43.965]] [LOG] Producer request timed out at 1630928263965 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":26,"createdAt":1630928233984,"sentAt":1630928233984,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:16.155]] [LOG] Producer request timed out at 1630928296154 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":27,"createdAt":1630928266176,"sentAt":1630928266176,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:51.223]] [LOG] Producer request timed out at 1630928331223 {"broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":28,"createdAt":1630928301243,"sentAt":1630928301243,"pendingDuration":0,"apiName":"Produce","apiKey":0,"apiVersion":5}
[[11:38:51.224]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:38:51.224Z","logger":"kafkajs","message":"[Producer] Request Produce(key: 0, version: 5) timed out","retryCount":0,"retryTime":320}
[[11:38:51.225]] [LOG] failed to send data KafkaJSRequestTimeoutError: Request Produce(key: 0, version: 5) timed out
etc...
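For what it's worth, the numbers in these log lines line up with a 30-second request timeout. Here is a quick sketch of the arithmetic, using the createdAt values and the "timed out at" values copied from the first two entries above. The `timedOutAt` field name is just a label made up for this example. 30000 ms is the KafkaJS default for requestTimeout, so this assumes the client did not override it:

```javascript
// Values copied from the first two "Producer request timed out" lines above.
// `timedOutAt` is a made-up label for the number in the log message itself.
const timedOut = [
  { correlationId: 22, createdAt: 1630928111116, timedOutAt: 1630928141097 },
  { correlationId: 23, createdAt: 1630928141899, timedOutAt: 1630928171879 },
];

// Elapsed time between request creation and the reported timeout, in ms.
function elapsedMs(entry) {
  return entry.timedOutAt - entry.createdAt;
}

const elapsed = timedOut.map(elapsedMs);
console.log(elapsed); // [ 29981, 29980 ]
```

So each Produce request is given up on just under 30 s after it was created, and the next attempt starts shortly after.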
Note: Metadata requests are going to broker3, and broker1 is reported as OK, as expected (because the connection issue only affects traffic from broker1 to the kafkaJS client):
[[11:37:46.168]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:37:46.167Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 5)","broker":"broker3:9092","clientId":"my-kafkajs-producer","correlationId":17,"expectResponse":true,"size":47}
[[11:37:46.169]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:37:46.169Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"broker3:9092","clientId":"my-kafkajs-producer","correlationId":17,"size":255,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"broker2","port":9092,"rack":null},{"nodeId":3,"host":"broker3","port":9092,"rack":null},{"nodeId":1,"host":"broker1","port":9092,"rack":null}],"clusterId":"_jMoVOJEQiS8ez1Eo1ucpQ","controllerId":2,"topicMetadata":[{"topicErrorCode":0,"topic":"kafkajs","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":2,"replicas":[2,3,1],"isr":[2,3,1],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":1,"leader":3,"replicas":[3,1,2],"isr":[3,1,2],"offlineReplicas":[]},{"partitionErrorCode":0,"partitionId":2,"leader":1,"replicas":[1,2,3],"isr":[1,2,3],"offlineReplicas":[]}]}]}}
At 11:42:33 the iptables rule is removed:
11:42:33 ℹ️ Unblocking IP address 172.18.0.6 corresponding to kafkaJS client
We see a disconnection, probably because the broker dropped the client for good (due to connections.max.idle.ms, which is 10 minutes by default). This time we get Kafka server has closed connection followed by Connecting:
[[11:42:34.388]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.387Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.388]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.388Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.388]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.388Z","logger":"kafkajs","message":"[Connection] Kafka server has closed connection","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[11:42:34.394]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T11:42:34.394Z","logger":"kafkajs","message":"[Connection] Connection error: write EPIPE","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: write EPIPE\n at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:94:16)"}
[[11:42:34.855]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T11:42:34.855Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"broker1:9092","clientId":"my-kafkajs-producer","ssl":false,"sasl":false}
Full logs are here
I re-ran the test with iptables blocking for 5 minutes instead of 10 (to avoid the broker disconnecting the client due to connections.max.idle.ms).
Traffic was blocked at 12:50:45:
12:50:45 ℹ️ Blocking IP address 172.20.0.6 corresponding to kafkaJS client
About 60 seconds later we see a disconnection:
[[12:51:44.730]] [ERROR] {"level":"ERROR","timestamp":"2021-09-06T12:51:44.730Z","logger":"kafkajs","message":"[Connection] Connection error: read ETIMEDOUT","broker":"broker1:9092","clientId":"my-kafkajs-producer","stack":"Error: read ETIMEDOUT\n at TCP.onStreamRead (internal/stream_base_commons.js:209:20)"}
[[12:51:44.730]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T12:51:44.730Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
[[12:51:44.731]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T12:51:44.731Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"broker1:9092","clientId":"my-kafkajs-producer"}
Traffic is back at 12:55:45:
12:55:45 ℹ️ Unblocking IP address 172.20.0.6 corresponding to kafkaJS client
We see a retry right after:
[[12:56:00.874]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T12:56:00.874Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 5)","broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"expectResponse":true,"size":510055}
After 21 seconds (not sure why), we see the accumulated responses (previously blocked by iptables):
[[12:56:21.021]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.021Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":15}
[[12:56:21.023]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.023Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":16}
[[12:56:21.026]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.026Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":17}
[[12:56:21.029]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.029Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":18}
[[12:56:21.032]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.032Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":19}
[[12:56:21.034]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.033Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":20}
[[12:56:21.035]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.035Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":21}
[[12:56:21.038]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.038Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":22}
[[12:56:21.040]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.040Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":23}
[[12:56:21.043]] [WARN] {"level":"WARN","timestamp":"2021-09-06T12:56:21.042Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"my-kafkajs-producer","broker":"broker1:9092","correlationId":24}
Followed by request response:
[[12:56:21.044]] [LOG] {"level":"DEBUG","timestamp":"2021-09-06T12:56:21.044Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 5)","broker":"broker1:9092","clientId":"my-kafkajs-producer","correlationId":25,"size":55,"data":{"topics":[{"topicName":"kafkajs","partitions":[{"partition":0,"errorCode":0,"baseOffset":"83","logAppendTime":"-1","logStartOffset":"0"}]}],"throttleTime":0}}
So even though there was a disconnection, it seems that kafkaJS is able to send requests again once the connection is back?
Full logs are here
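One way to read the "Response without match" warnings above: the client had already given up on correlation IDs 15 to 24, so when the delayed responses finally arrived there was nothing left to pair them with, while the response for 25 still had a pending request. The sketch below illustrates that idea with a toy queue. `TinyRequestQueue` and its methods are hypothetical names made up for this example, and this is not KafkaJS's actual RequestQueue implementation:

```javascript
// Hypothetical, simplified request queue. NOT the KafkaJS implementation.
class TinyRequestQueue {
  constructor() {
    this.inflight = new Map(); // correlationId -> apiName
    this.warnings = [];
  }

  // Register a request that is waiting for a response.
  send(correlationId, apiName) {
    this.inflight.set(correlationId, apiName);
  }

  // The client gives up on a request (for example after a request timeout).
  timeout(correlationId) {
    this.inflight.delete(correlationId);
  }

  // A response was read from the socket: pair it with its request, if any.
  receive(correlationId) {
    if (!this.inflight.has(correlationId)) {
      this.warnings.push(`Response without match: ${correlationId}`);
      return null;
    }
    const apiName = this.inflight.get(correlationId);
    this.inflight.delete(correlationId);
    return apiName;
  }
}

const queue = new TinyRequestQueue();

// Requests 15..24 were sent and abandoned while traffic was blocked.
for (let id = 15; id <= 24; id++) {
  queue.send(id, 'Produce');
  queue.timeout(id);
}
// Request 25 is the retry sent after traffic came back.
queue.send(25, 'Produce');

// All eleven responses arrive in one burst once the block is lifted.
const results = [];
for (let id = 15; id <= 25; id++) {
  results.push(queue.receive(id));
}

console.log(queue.warnings.length); // 10
console.log(results[10]); // Produce
```

This only models the bookkeeping; it says nothing about why the burst showed up roughly 20 seconds after the retry.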
Describe the bug
A lambda I'm running will occasionally receive an ETIMEDOUT error when producing messages to Kafka. When this happens, kafkajs disconnects from the broker and never reconnects, but will continue to retry sending the messages.
To Reproduce
Unfortunately I cannot link to sample code or give you a way to reproduce this reliably (maybe someone reading this can guide me toward a way of forcing an ETIMEDOUT error?), but I will explain our setup with as much code as I can. We're not doing anything too strange here, just setting it up and producing messages.
We configure the kafka object with the following:
and then the producer, nothing wild here:
Expected behavior
I expect this to disconnect from the broker, try to reconnect to the broker, and if it successfully does that, start retrying the messages.
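To make the expected sequence concrete, here is a minimal sketch of "reconnect first, then retry". `sendWithReconnect` and `createFakeProducer` are hypothetical helpers written for this example. The `connect()` and `send()` method names mirror the KafkaJS producer, but nothing below calls KafkaJS, and this is not how the library's internal retriers are written:

```javascript
// Hypothetical helper: `producer` is any object with async connect() and send(record).
async function sendWithReconnect(producer, record, { retries = 3 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await producer.send(record);
    } catch (err) {
      lastError = err;
      if (attempt === retries) break; // out of attempts, give up
      // Re-establish the connection before retrying the messages.
      await producer.connect();
    }
  }
  throw lastError;
}

// Stand-in producer for the demo: the first send fails with ETIMEDOUT, and
// later sends only succeed once connect() has been called again.
function createFakeProducer() {
  const calls = [];
  let connected = true;
  let failedOnce = false;
  return {
    calls,
    async connect() {
      calls.push('connect');
      connected = true;
    },
    async send(record) {
      calls.push('send');
      if (!failedOnce) {
        failedOnce = true;
        connected = false;
        throw Object.assign(new Error('read ETIMEDOUT'), { code: 'ETIMEDOUT' });
      }
      if (!connected) throw new Error('not connected');
      return { topic: record.topic, sent: record.messages.length };
    },
  };
}

const fake = createFakeProducer();
const demo = sendWithReconnect(fake, {
  topic: 'topic-test',
  messages: [{ value: 'hello' }],
});
demo.then((result) => console.log(fake.calls, result));
```

In the demo the call order is send, connect, send, which is the order described above. With a real producer the same wrapper would sit on top of whatever retrying the client already does internally, so treat it as an illustration of the expectation rather than a workaround.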
Observed behavior
tl;dr the timeout error happens, kafkajs disconnects from the broker, messages are retried without reconnecting to the broker.
This code is running in an AWS Lambda and happily chugs along doing what it should for a while, then we receive an ETIMEDOUT error while producing. Log looks like the following:
after this, with debug logging turned on, we see the following logs:
[Connection] disconnecting...
[RequestQueue] Waiting for pending requests
[Connection] Kafka server has closed connection
[Connection] disconnecting...
[Connection] disconnected
[Connection] Request Metadata(key: 3, version: 5)
[Connection] Request Metadata(key: 3, version: 5)
[Connection] Request Metadata(key: 3, version: 5)
(presumably the 3 last lines are one request to each of the 3 brokers) and then the lambda times out. EDIT: the 3 last lines are actually just 3 retries to 1 broker, and they happen a few seconds apart from each other
Environment:
Additional context
I have dug around in the code quite a bit trying to understand what's going on here and these are my findings, unclear whether they're actually useful, but here they are.
There are retriers at several levels of the stack. I found them in connection.js (although this one seems to be unused?), cluster/index.js, messageProducer.js, producer/index.js and sendMessages.js. The retrier in messageProducer.js is not created with createRetry and includes code to actually reconnect to the broker if it's disconnected (great!), but afaict the retrier in sendMessages.js catches this error first and retries the messages without attempting a reconnect to the cluster. Honestly though, any of the retries further down the stack other than the one in sendMessages.js could be what's catching it; they all use the generic retrier from createRetry.
Also, in my attempts to work around what I saw as the problem, I added a disconnect listener to my code that listens for disconnects and tries to reinitiate the connection, but the DISCONNECT event never seems to fire (based on logging to producer.logger().info() I set up in the listener) when the ETIMEDOUT error happens, so the listener does not get a chance to do its job. I have tests in my codebase to trigger the DISCONNECT event and this code does what I'd expect it to in those tests. Code for the listener follows: