tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Intermittent `Error: read ETIMEDOUT` while consuming with sasl/ssl enabled #1635

Open ghost opened 1 year ago

ghost commented 1 year ago

Describe the bug

I have a simple consumer with default configs. I am using SASL (SCRAM-SHA-512) with SSL. The consumer consumes as expected, but every once in a while it stops consuming and the logs show the following error:

REQUEST at 1698772873191 | kafka0:9094 | example-consumer | 5377 | 1 | Fetch | 11
{"level":"DEBUG","timestamp":"2023-10-31T17:21:13.191Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 11)","broker":"kafka0:9094","clientId":"example-consumer","correlationId":5377,"size":115,"data":"[filtered]"}
{"level":"ERROR","timestamp":"2023-10-31T17:22:12.726Z","logger":"kafkajs","message":"[Connection] Connection error: read ETIMEDOUT","broker":"kafka0:9094","clientId":"example-consumer","stack":"Error: read ETIMEDOUT\n    at TLSWrap.onStreamRead (node:internal/stream_base_commons:217:20)"}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:12.731Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"kafka0:9094","clientId":"example-consumer"}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:12.736Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"kafka0:9094","clientId":"example-consumer"}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:12.744Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"kafka0:9094","clientId":"example-consumer","ssl":true,"sasl":true}

Along with the above error, I also sometimes see the following log:

REQUEST at 1698772936274 | kafka0:9094 | example-consumer | 18139 | 12 | Heartbeat | 3
{"level":"ERROR","timestamp":"2023-10-31T17:22:16.274Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka0:9094","clientId":"example-consumer","error":"The coordinator is not aware of this member","correlationId":18139,"size":10}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:16.274Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka0:9094","clientId":"example-consumer","error":"The coordinator is not aware of this member","correlationId":18139,"payload":{"type":"Buffer","data":"[filtered]"}}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:16.275Z","logger":"kafkajs","message":"[FetchManager] Stopped fetchers"}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:16.275Z","logger":"kafkajs","message":"[Runner] Error while scheduling fetch manager, trying again...","groupId":"test-group-5","memberId":"example-consumer-cf90b406-2861-46f5-ad14-369f8ee0314d","error":"Connection error: read ETIMEDOUT","stack":"KafkaJSConnectionError: Connection error: read ETIMEDOUT\n    at TLSSocket.onError (/Users/saketmehta/Moveworks/kafkajs/node_modules/kafkajs/src/network/connection.js:210:23)\n    at TLSSocket.emit (node:events:526:28)\n    at emitErrorNT (node:internal/streams/destroy:157:8)\n    at emitErrorCloseNT (node:internal/streams/destroy:122:3)\n    at processTicksAndRejections (node:internal/process/task_queues:83:21)","retryCount":2,"retryTime":1660}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:17.937Z","logger":"kafkajs","message":"[FetchManager] Starting..."}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:17.940Z","logger":"kafkajs","message":"[FetchManager] Created 3 fetchers","nodeIds":["0","1","2"],"concurrency":1}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:17.961Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 11)","broker":"kafka0:9094","clientId":"example-consumer","correlationId":5378,"expectResponse":true,"size":134}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:17.961Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 11)","broker":"kafka1:9094","clientId":"example-consumer","correlationId":5479,"expectResponse":true,"size":134}
{"level":"DEBUG","timestamp":"2023-10-31T17:22:17.962Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 11)","broker":"kafka2:9094","clientId":"example-consumer","correlationId":5469,"expectResponse":true,"size":134}

Each time this happens, I see a timeout error related to the TLS connection. The consumer usually recovers automatically after a couple of minutes, but during that time no messages are processed. This can be seen in the graph below as well:

[image: graph of consumed messages, showing the gap while the consumer is stalled]

I checked the Kafka broker during the same period, and it looks like the consumer fails to send a heartbeat:

2023-10-31 14:31:13,902 INFO [GroupCoordinator 0]: Member example-consumer-a9fe9556-542a-4fe4-a5e0-0508e81b0b7b in group test-group-5 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]

To Reproduce

  1. Run a producer that continuously produces messages to a topic at a fixed rate
  2. Run a consumer that subscribes to that topic, with debug logs enabled (see the sketch after this list)
  3. Observe the intermittent issue described above
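For reference, here is a minimal sketch of the kind of consumer described above (not the reporter's actual code), assuming kafkajs 2.x. The broker addresses, client ID, and group ID are taken from the logs; the topic name and credentials are placeholders.

const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'example-consumer',
  brokers: ['kafka0:9094', 'kafka1:9094', 'kafka2:9094'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: process.env.KAFKA_USERNAME, // placeholder credentials
    password: process.env.KAFKA_PASSWORD,
  },
  logLevel: logLevel.DEBUG, // step 2: debug logs enabled
})

const consumer = kafka.consumer({ groupId: 'test-group-5' })

const run = async () => {
  await consumer.connect()
  // 'example-topic' is a placeholder for the topic the producer writes to
  await consumer.subscribe({ topics: ['example-topic'], fromBeginning: false })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}] offset=${message.offset}`)
    },
  })
}

run().catch(console.error)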

Expected behavior

Smooth consumption without drops.

Environment:

Additional context

Interestingly, I carried out the experiment on two clusters, and the issue only happens on one of them. The problematic cluster is deployed on Kubernetes via Strimzi, and the cluster that works fine is deployed via Docker on AWS EC2 hosts. Both clusters are provisioned with the same amount of resources, and the broker configs, topic configs, etc. are identical.

ghost commented 1 year ago

I found something interesting! I had this config in the broker:

replica.selector.class: org.apache.kafka.common.replica.RackAwareReplicaSelector

but the clients were not specifying a rackId. Deleting this config from the broker immediately fixed the issue I described above.

42someone commented 4 weeks ago

Additional info for future readers on this config and how to implement it with kafkajs:
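Below is a minimal sketch of the client side, assuming a kafkajs version (2.x) where the consumer config accepts a rackId option for follower fetching. The rack name and credentials are placeholders; in a real cluster the rackId must match one of the brokers' broker.rack values.

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'example-consumer',
  brokers: ['kafka0:9094', 'kafka1:9094', 'kafka2:9094'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: process.env.KAFKA_USERNAME, // placeholder credentials
    password: process.env.KAFKA_PASSWORD,
  },
})

// rackId is sent with fetch requests so that the broker's
// RackAwareReplicaSelector can point this consumer at the closest replica.
// 'us-east-1a' is a placeholder and should match a broker.rack value.
const consumer = kafka.consumer({
  groupId: 'test-group-5',
  rackId: 'us-east-1a',
})

With a rackId set on every consumer, replica.selector.class can stay enabled on the broker instead of being removed, as in the workaround above.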