I haven't done any testing yet, so don't take this as an authoritative answer, but I would suspect that there's something in your environment that is caching the IP.
KafkaJS uses the Net and TLS modules to create sockets, which default to using the dns.lookup method to resolve domains. By default, that has no caching mechanism and just delegates to the OS.
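In plain Node terms, the resolution path looks roughly like this (a sketch, not KafkaJS code; the Consul hostname is the one from the report below):

const net = require('net')
const dns = require('dns')

// Each new socket resolves the hostname via dns.lookup, which delegates to the
// OS resolver (getaddrinfo). There is no caching at this layer.
const socket = net.connect({
  host: 'kafka-broker.service.consul', // the Consul service name from the report
  port: 9092,
  lookup: dns.lookup, // this is already the default, shown here for clarity
})

socket.on('error', err => console.error('socket error:', err.message))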
So, when we provide brokers: [process.env.BROKER], where process.env.BROKER is something like 'kafka-broker.service.consul:9092', we then get error messages like:
Error: connect ETIMEDOUT www.xxx.yyy.zzz:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1106:14)
But when we remote into the pod and ping 'kafka-broker.service.consul:9092', we get the correct IP. It could very well be something else in our environment, but I'm not sure why that would happen.
When you .connect() using something like 'kafka-broker.service.consul:9092', would it do a lookup at the time of connection and then only ever know about the resolved IPs from that initial connection? We only connect once, when the pod gets deployed.
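For reference, a minimal sketch of how the client is wired up in this scenario (the client id matches the one in the logs further down; error handling is illustrative):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-client-id',
  brokers: [process.env.BROKER], // e.g. 'kafka-broker.service.consul:9092'
})

const producer = kafka.producer()

// Connected once, when the pod is deployed; the question is whether the hostname
// ever gets resolved again after this point.
producer.connect().catch(console.error)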
The lookup would only happen when it tries to establish a connection, yes. So if the broker is replaced and the resolved IP changes, my assumption would be that the existing connection is closed and KafkaJS would try to reconnect, which should mean that a new lookup is done - but this is only an assumption at this point (the lookup is done internally by Net/TLS.Socket, not by us, so I'm not 100% sure).
I think this sounds like a promising line of reasoning though. It would be good to try to get a repro case up. Do you think you could explain a bit more about your setup and maybe share some debug logs from when this is happening?
And just to make sure, what's your consul dns_config? By default, it allows stale reads.
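For context, that setting lives in the Consul agent's dns_config; a minimal sketch of disabling stale reads in the agent's JSON configuration (allow_stale defaults to true):

{
  "dns_config": {
    "allow_stale": false
  }
}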
@eliw00d it works like this:
1) When the client connects for the first time, it uses the brokers in the list to fetch metadata about the cluster.
2) The metadata includes the IPs of all nodes in the cluster. This information is cached and expires according to metadataMaxAge, which by default is 5 minutes (see the sketch after this list). We also re-fetch metadata on some errors.
3) The library will fall back to the original brokers (seed brokers) if it can't use the metadata IPs.
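A minimal sketch of where that expiry is configured, assuming a client instance named kafka as in the sketches above (the value shown is the documented default):

// metadataMaxAge: how long the fetched cluster metadata (including broker IPs)
// is kept before it is refreshed. The default is 300000 ms (5 minutes).
const producer = kafka.producer({
  metadataMaxAge: 300000,
})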
Like @Nevon said, we use Net and TLS, so we don't cache DNS; it just delegates to the OS.
Have you configured KAFKA_ADVERTISED_HOST_NAME in your cluster? You can try to run your consumers with log-level debug; it should give you more information about what is happening.
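A minimal sketch of turning on debug logging, assuming the same client construction as in the earlier sketches:

const { Kafka, logLevel } = require('kafkajs')

// Same client as before, but with debug logging enabled so that Connection and
// BrokerPool entries like the ones further down get emitted.
const kafka = new Kafka({
  clientId: 'my-client-id',
  brokers: [process.env.BROKER],
  logLevel: logLevel.DEBUG,
})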
I know that we changed the dns_config to not allow stale reads and I'm pretty sure we don't have KAFKA_ADVERTISED_HOST_NAME configured anywhere.
I will try logging to see if that helps narrow anything down, and try to get some more information for you guys.
So, I put some logging in and re-deployed the pod. Then, I deleted the brokers it knew about and tried to do a producer.send(). This resulted in connection timeouts for all of the previously resolved IPs (172.23.1.78, 172.23.1.192, 172.23.1.194) but no attempts to do a new lookup (which would have resolved to 172.23.1.110 and 172.23.1.182). It finally failed with a retries exceeded error. One thing I found interesting is this log:
{"level":"debug","time":1565189319221,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"my-kafka-broker.service.consul:9092","clientId":"my-client-id","correlationId":2,"size":459,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"172.23.1.110","port":9092,"rack":"my-rack"},{"nodeId":3,"host":"172.23.1.182","port":9092,"rack":"my-rack"}],"clusterId":"my-cluster-id","controllerId":2,"topicMetadata":[{"topicErrorCode":0,"topic":"my-topic","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":2,"replicas":[2,3,1],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":2,"leader":2,"replicas":[1,2,3],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":1,"leader":3,"replicas":[3,1,2],"isr":[3,2],"offlineReplicas":[1]}]},{"topicErrorCode":0,"topic":"my-topic","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":3,"replicas":[3,1,2],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":2,"leader":2,"replicas":[2,3,1],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":1,"leader":2,"replicas":[1,2,3],"isr":[3,2],"offlineReplicas":[1]}]}]},"msg":"Response Metadata(key: 3, version: 5)","v":1}
where you can clearly see the brokers have been updated. So, KafkaJS does have the updated metadata during this time.
It definitely seems like no new lookups were done when producer.send() was called. Would the result be different if I did something like the following?
await producer.connect()
await producer.send()
await producer.disconnect()
It already seems to connect/disconnect on its own before/after the call to producer.send(), so this seemed unnecessary.
Here is a snippet of the preceding logs (there were many similar loops):
{"level":"debug","time":1565189318072,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"172.23.1.192:9092","clientId":"my-client-id","ssl":false,"sasl":false,"msg":"Connecting","v":1}
{"level":"error","time":1565189319072,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"172.23.1.192:9092","clientId":"my-client-id","msg":"Connection timeout","v":1}
{"level":"error","time":1565189319072,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"BrokerPool","retryCount":5,"retryTime":7612,"msg":"Failed to connect to broker, reconnecting","v":1}
{"level":"debug","time":1565189319073,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"my-kafka-broker.service.consul:9092","clientId":"my-client-id","ssl":false,"sasl":false,"msg":"Connecting","v":1}
{"level":"debug","time":1565189319220,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"my-kafka-broker.service.consul:9092","clientId":"my-client-id","correlationId":2,"expectResponse":true,"size":86,"msg":"Request Metadata(key: 3, version: 5)","v":1}
Okay, so I finally dug through the source and found this: if the nodeId matches the previous metadata's nodeId, it returns result without any changes. In our case, we reuse nodeIds, so it never gets a chance to check whether the host and port match or to assign new values to that broker.
Maybe that could be changed to:
this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => {
  const existingBroker = result[nodeId]
  if (existingBroker && existingBroker.host === host && existingBroker.port === port && existingBroker.rack === rack) {
or similar?
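To illustrate the difference outside of the KafkaJS internals, here is a standalone sketch (data shapes are simplified; the IPs and rack come from the logs above):

// Cached broker entry keyed by nodeId, holding a previously resolved IP.
const cached = {
  2: { host: '172.23.1.78', port: 9092, rack: 'my-rack' },
}
// New metadata for the same nodeId after the broker was replaced.
const fresh = [{ nodeId: 2, host: '172.23.1.110', port: 9092, rack: 'my-rack' }]

// Current behaviour: an existing nodeId short-circuits, so the stale host survives.
const keepByNodeId = fresh.reduce((result, { nodeId, host, port, rack }) => {
  if (result[nodeId]) return result
  return { ...result, [nodeId]: { host, port, rack } }
}, { ...cached })

// Suggested behaviour: only reuse the entry when host, port and rack also match.
const replaceOnChange = fresh.reduce((result, { nodeId, host, port, rack }) => {
  const existingBroker = result[nodeId]
  if (
    existingBroker &&
    existingBroker.host === host &&
    existingBroker.port === port &&
    existingBroker.rack === rack
  ) {
    return result
  }
  return { ...result, [nodeId]: { host, port, rack } }
}, { ...cached })

console.log(keepByNodeId[2].host) // '172.23.1.78' (stale)
console.log(replaceOnChange[2].host) // '172.23.1.110' (updated)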
@tulios - In response to your points made above:
- The metadata includes the IPs of all nodes in the cluster. This information is cached and expires according to metadataMaxAge, which by default is 5 minutes. We also re-fetch metadata on some errors.
This should likely refetch metadata if a broker connection fails. Otherwise, if a broker IP changes (such as when hosted in Kubernetes), you could be left with a missing broker for up to 5 minutes (or more).
- The library will fall back to the original brokers (seed brokers) if it can't use the metadata IPs.
This can be dangerous. It should likely fall back to the original bootstrap servers and do another metadata lookup. See https://issues.apache.org/jira/browse/KAFKA-3068 for past reasoning on this.
Broker IPs should not be assumed to be near-static. If metadata is outdated, the cache should be updated if possible (i.e. the broker is up but its IP changed), or the request should fail if the broker is down (as expected).
Assuming KAFKA_ADVERTISED_HOST_NAME is set on the broker properly, I'd take a close look at how the Java clients handle metadata caching and broker metadata changes.
@devshawn Seed brokers in this case = original bootstrap servers
@eliw00d we can consider this a bug. If nodeId, host and ip are the same we should reuse the broker; otherwise we should replace it.
I don't know how often rack would change, if at all, but since it's used here, would it be safe to make it a criterion for replacing as well?
What would be a good workaround for this in the interim?
@eliw00d we have the pre-release 1.11.0-beta.1 with the fix (PR #457), give it a try.
@tulios It works! Thank you! With idempotent enabled, it actually times out our request while retrying all the previous IPs, but it eventually succeeds in the background. So, we'll just have to figure that out separately. Thanks again!
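For context, "idempotent" here refers to the producer-level option, roughly like this (client construction as in the earlier sketches):

// Idempotent producer, as referenced above. With this enabled, the send kept
// retrying the previously resolved IPs before eventually succeeding in the background.
const producer = kafka.producer({ idempotent: true })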
We use *.service.consul URLs so that the IPs can be resolved during runtime to the correct Kafka brokers. However, we started noticing stale IPs whenever KafkaJS tried to connect. If we deleted the pods in k8s and created new ones, the IPs were correct.
I haven't really dug too deep into the source yet to know exactly where this happens, but maybe a configuration option to disable the caching would be nice? Or is there something I am missing?