Open aaaeeeo opened 4 years ago
Could you try running the consumer with DEBUG log level and posting the logs here? If the broker is not found in the metadata, we should refresh the metadata and try again.
If you aren't working with sensitive data, it would be helpful to also get response buffers to be able to see what actual metadata we got back. This can be enabled with the environment variable KAFKAJS_DEBUG_PROTOCOL_BUFFERS. Note that this will not include actual messages, but it can include things like hostnames, ports, etc.
Hi, I'm experiencing this issue at the moment. It occurs when the group coordinator port is different than the broker port. When the group coordinator isn't found in the cached metadata, it refreshes the cache. But it only attempts to retrieve the broker metadata, not the group coordinator metadata. Then the group coordinator node isn't found in the metadata, causing a few iterations of failures before giving up entirely.
Here's a debug log showing what's happening:
>> Connect to the Kafka broker URL
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.115Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"server1:10163","clientId":"PricingSimulatorConnection","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.250Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":0,"expectResponse":true,"size":40}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.281Z","logger":"kafkajs","message":"[Connection] Response ApiVersions(key: 18, version: 2)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":0,"size":272,"data":{"errorCode":0,"apiVersions":[{"apiKey":0,"minVersion":0,"maxVersion":6},{"apiKey":1,"minVersion":0,"maxVersion":8},{"apiKey":2,"minVersion":0,"maxVersion":3},{"apiKey":3,"minVersion":0,"maxVersion":6},{"apiKey":4,"minVersion":0,"maxVersion":1},{"apiKey":5,"minVersion":0,"maxVersion":0},{"apiKey":6,"minVersion":0,"maxVersion":4},{"apiKey":7,"minVersion":0,"maxVersion":1},{"apiKey":8,"minVersion":0,"maxVersion":4},{"apiKey":9,"minVersion":0,"maxVersion":4},{"apiKey":10,"minVersion":0,"maxVersion":2},{"apiKey":11,"minVersion":0,"maxVersion":3},{"apiKey":12,"minVersion":0,"maxVersion":2},{"apiKey":13,"minVersion":0,"maxVersion":2},{"apiKey":14,"minVersion":0,"maxVersion":2},{"apiKey":15,"minVersion":0,"maxVersion":2},{"apiKey":16,"minVersion":0,"maxVersion":2},{"apiKey":17,"minVersion":0,"maxVersion":1},{"apiKey":18,"minVersion":0,"maxVersion":2},{"apiKey":19,"minVersion":0,"maxVersion":3},{"apiKey":20,"minVersion":0,"maxVersion":2},{"apiKey":21,"minVersion":0,"maxVersion":1},{"apiKey":22,"minVersion":0,"maxVersion":1},{"apiKey":23,"minVersion":0,"maxVersion":1},{"apiKey":24,"minVersion":0,"maxVersion":1},{"apiKey":25,"minVersion":0,"maxVersion":1},{"apiKey":26,"minVersion":0,"maxVersion":1},{"apiKey":27,"minVersion":0,"maxVersion":0},{"apiKey":28,"minVersion":0,"maxVersion":1},{"apiKey":29,"minVersion":0,"maxVersion":1},{"apiKey":30,"minVersion":0,"maxVersion":1},{"apiKey":31,"minVersion":0,"maxVersion":1},{"apiKey":32,"minVersion":0,"maxVersion":2},{"apiKey":33,"minVersion":0,"maxVersion":1},{"apiKey":34,"minVersion":0,"maxVersion":1},{"apiKey":35,"minVersion":0,"maxVersion":1},{"apiKey":36,"minVersion":0,"maxVersion":0},{"apiKey":37,"minVersion":0,"maxVersion":1},{"apiKey":38,"minVersion":0,"maxVersion":1},{"apiKey":39,"minVersion":0,"maxVersion":1},{"apiKey":40,"minVersion":0,"maxVersion":1},{"apiKey":41,"minVersion":0,"maxVersion":1},{"apiKey":42,"minVersion":0,"maxVersion":1}],"throttleTime":0,"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.283Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"server1:10163","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.288Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":1,"expectResponse":true,"size":47}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.328Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":1,"size":17,"data":{"errorCode":0,"enabledMechanisms":["PLAIN"]}}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.330Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"server1:10163"}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.337Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":2,"expectResponse":true,"size":613}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.366Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":2,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.368Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"server1:10163"}
>> Authenticated successfully
>> Request the metadata for the specified URL
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.380Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 6)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":3,"expectResponse":true,"size":71}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.413Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":3,"size":156,"data":{"throttleTime":0,"brokers":[{"nodeId":931905689,"host":"server2","port":11061,"rack":null}],"clusterId":"tcm-pulsar1","controllerId":-1,"topicMetadata":[{"topicErrorCode":0,"topic":"pricing.simulator.status","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":931905689,"replicas":[931905689],"isr":[931905689],"offlineReplicas":[]}]}],"clientSideThrottleTime":0}}
>> Response Metadata returns a different broker URL, nodeId = 931905689
>> Connect to different broker URL
{"level":"INFO","timestamp":"2021-10-27T14:53:52.416Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"PricingSimulatorControlStatus"}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.421Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"server2:11061","clientId":"PricingSimulatorConnection","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.535Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"server2:11061","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.537Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":0,"expectResponse":true,"size":47}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.566Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":0,"size":17,"data":{"errorCode":0,"enabledMechanisms":["PLAIN"]}}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.568Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"server2:11061"}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.569Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":1,"expectResponse":true,"size":613}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.598Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":1,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.600Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"server2:11061"}
>> Authenticated successfully
>> Request the Group Coordinator
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.603Z","logger":"kafkajs","message":"[Connection] Request GroupCoordinator(key: 10, version: 2)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":2,"expectResponse":true,"size":72}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.643Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 2)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":2,"size":70,"data":{"throttleTime":0,"errorCode":0,"errorMessage":null,"coordinator":{"nodeId":1190085561,"host":"server3","port":11060},"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.645Z","logger":"kafkajs","message":"[Cluster] Found group coordinator","nodeId":1190085561}
>> Returns the Group Coordinator, says the nodeId is 1190085561.
>> The response indicates the Group Coordinator is on a different host and port than the broker (server3:11060 vs. server2:11061)
>> Then Request Metadata again, but for the broker host and port (server2 11061)
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.646Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 6)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":3,"expectResponse":true,"size":71}
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.676Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":3,"size":156,"data":{"throttleTime":0,"brokers":[{"nodeId":931905689,"host":"server2","port":11061,"rack":null}],"clusterId":"tcm-pulsar1","controllerId":931905689,"topicMetadata":[{"topicErrorCode":0,"topic":"pricing.simulator.status","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":931905689,"replicas":[931905689],"isr":[931905689],"offlineReplicas":[]}]}],"clientSideThrottleTime":0}}
>> Returns Metadata, says nodeId is 931905689
{"level":"DEBUG","timestamp":"2021-10-27T14:53:52.678Z","logger":"kafkajs","message":"[Cluster] Broker 1190085561 not found in the cached metadata, refreshing metadata and trying again...","groupId":"PricingSimulatorControlStatus","retryCount":0,"retryTime":299}
>> Broker 1190085561 (the group coordinator) is not in the refreshed metadata, so the client retries a few more times before failing entirely.
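To check a captured log for this condition without reading it line by line, something like the following could help. It is only a sketch: the function name `checkCoordinatorInMetadata` is made up for this example, it assumes one JSON object per log line (as in the output above), and the sample lines are abbreviated copies of the GroupCoordinator and Metadata responses from the failing run.

```javascript
// Sketch: scan KafkaJS DEBUG log lines (one JSON object per line) and check
// whether the group coordinator's nodeId appears in the broker list of the
// most recent Metadata response. The function name is hypothetical.
function checkCoordinatorInMetadata(logLines) {
  let brokerIds = new Set();
  let coordinator = null;

  for (const line of logLines) {
    let entry;
    try {
      entry = JSON.parse(line);
    } catch (err) {
      continue; // skip annotations (">> ...") and truncated lines
    }
    if (!entry || typeof entry !== "object") continue;

    const message = entry.message || "";
    if (message.includes("Response Metadata") && entry.data) {
      // Keep only the broker list from the latest Metadata response.
      brokerIds = new Set((entry.data.brokers || []).map((b) => b.nodeId));
    } else if (message.includes("Response GroupCoordinator") && entry.data) {
      coordinator = entry.data.coordinator || null;
    }
  }

  return {
    coordinator,
    knownBrokerIds: [...brokerIds],
    coordinatorInMetadata:
      coordinator !== null && brokerIds.has(coordinator.nodeId),
  };
}

// Abbreviated versions of the log lines from the failing run.
const failingRun = [
  ">> Request the Group Coordinator",
  '{"message":"[Connection] Response GroupCoordinator(key: 10, version: 2)","data":{"errorCode":0,"coordinator":{"nodeId":1190085561,"host":"server3","port":11060}}}',
  '{"message":"[Connection] Response Metadata(key: 3, version: 6)","data":{"brokers":[{"nodeId":931905689,"host":"server2","port":11061,"rack":null}]}}',
];

console.log(checkCoordinatorInMetadata(failingRun).coordinatorInMetadata); // false
```

A result of `false` means the coordinator returned by the GroupCoordinator response is not among the brokers listed in the Metadata response, which is the situation that triggers the "not found in the cached metadata" retries.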
This only occurs when the broker and group coordinator have different hosts/ports. I occasionally get the broker and group coordinator on the same host/port when I connect. When this happens the connection works fine. Here's a log showing that:
>> Connect to the Kafka broker URL
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.180Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"server1:10163","clientId":"PricingSimulatorConnection","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.354Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":0,"expectResponse":true,"size":40}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.381Z","logger":"kafkajs","message":"[Connection] Response ApiVersions(key: 18, version: 2)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":0,"size":272,"data":{"errorCode":0,"apiVersions":[{"apiKey":0,"minVersion":0,"maxVersion":6},{"apiKey":1,"minVersion":0,"maxVersion":8},{"apiKey":2,"minVersion":0,"maxVersion":3},{"apiKey":3,"minVersion":0,"maxVersion":6},{"apiKey":4,"minVersion":0,"maxVersion":1},{"apiKey":5,"minVersion":0,"maxVersion":0},{"apiKey":6,"minVersion":0,"maxVersion":4},{"apiKey":7,"minVersion":0,"maxVersion":1},{"apiKey":8,"minVersion":0,"maxVersion":4},{"apiKey":9,"minVersion":0,"maxVersion":4},{"apiKey":10,"minVersion":0,"maxVersion":2},{"apiKey":11,"minVersion":0,"maxVersion":3},{"apiKey":12,"minVersion":0,"maxVersion":2},{"apiKey":13,"minVersion":0,"maxVersion":2},{"apiKey":14,"minVersion":0,"maxVersion":2},{"apiKey":15,"minVersion":0,"maxVersion":2},{"apiKey":16,"minVersion":0,"maxVersion":2},{"apiKey":17,"minVersion":0,"maxVersion":1},{"apiKey":18,"minVersion":0,"maxVersion":2},{"apiKey":19,"minVersion":0,"maxVersion":3},{"apiKey":20,"minVersion":0,"maxVersion":2},{"apiKey":21,"minVersion":0,"maxVersion":1},{"apiKey":22,"minVersion":0,"maxVersion":1},{"apiKey":23,"minVersion":0,"maxVersion":1},{"apiKey":24,"minVersion":0,"maxVersion":1},{"apiKey":25,"minVersion":0,"maxVersion":1},{"apiKey":26,"minVersion":0,"maxVersion":1},{"apiKey":27,"minVersion":0,"maxVersion":0},{"apiKey":28,"minVersion":0,"maxVersion":1},{"apiKey":29,"minVersion":0,"maxVersion":1},{"apiKey":30,"minVersion":0,"maxVersion":1},{"apiKey":31,"minVersion":0,"maxVersion":1},{"apiKey":32,"minVersion":0,"maxVersion":2},{"apiKey":33,"minVersion":0,"maxVersion":1},{"apiKey":34,"minVersion":0,"maxVersion":1},{"apiKey":35,"minVersion":0,"maxVersion":1},{"apiKey":36,"minVersion":0,"maxVersion":0},{"apiKey":37,"minVersion":0,"maxVersion":1},{"apiKey":38,"minVersion":0,"maxVersion":1},{"apiKey":39,"minVersion":0,"maxVersion":1},{"apiKey":40,"minVersion":0,"maxVersion":1},{"apiKey":41,"minVersion":0,"maxVersion":1},{"apiKey":42,"minVersion":0,"maxVersion":1}],"throttleTime":0,"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.384Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"server1:10163","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.388Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":1,"expectResponse":true,"size":47}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.416Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":1,"size":17,"data":{"errorCode":0,"enabledMechanisms":["PLAIN"]}}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.418Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"server1:10163"}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.421Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":2,"expectResponse":true,"size":613}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.452Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":2,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.453Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"server1:10163"}
>> Authenticated successfully
>> Request the metadata for the specified URL
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.463Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 6)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":3,"expectResponse":true,"size":71}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.506Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"server1:10163","clientId":"PricingSimulatorConnection","correlationId":3,"size":156,"data":{"throttleTime":0,"brokers":[{"nodeId":931905689,"host":"server2","port":11061,"rack":null}],"clusterId":"tcm-pulsar1","controllerId":-1,"topicMetadata":[{"topicErrorCode":0,"topic":"pricing.simulator.status","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":931905689,"replicas":[931905689],"isr":[931905689],"offlineReplicas":[]}]}],"clientSideThrottleTime":0}}
>> Response Metadata returns a different broker URL, nodeId = 931905689
>> Connect to different broker URL
{"level":"INFO","timestamp":"2021-10-28T12:13:02.508Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"PricingSimulatorControlStatus"}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.510Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"server2:11061","clientId":"PricingSimulatorConnection","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.621Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"server2:11061","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.623Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":0,"expectResponse":true,"size":47}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.647Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":0,"size":17,"data":{"errorCode":0,"enabledMechanisms":["PLAIN"]}}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.648Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"server2:11061"}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.650Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":1,"expectResponse":true,"size":613}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.683Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":1,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.684Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"server2:11061"}
>> Authenticated successfully
>> Request the Group Coordinator
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.689Z","logger":"kafkajs","message":"[Connection] Request GroupCoordinator(key: 10, version: 2)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":2,"expectResponse":true,"size":72}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.728Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 2)","broker":"server2:11061","clientId":"PricingSimulatorConnection","correlationId":2,"size":70,"data":{"throttleTime":0,"errorCode":0,"errorMessage":null,"coordinator":{"nodeId":931905689,"host":"server2","port":11061},"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2021-10-28T12:13:02.729Z","logger":"kafkajs","message":"[Cluster] Found group coordinator","nodeId":931905689}
>> Returns the Group Coordinator, says the nodeId is 931905689.
>> The response indicates the Group Coordinator is the SAME host and port as the broker (11061)
>> Carry on normally from here
Same issue for me. I think kafkajs should connect to the Group Coordinator using its host and port when the node cannot be found in the cached metadata. Please help.
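To make the suggestion concrete, here is a rough sketch of that fallback. It is not KafkaJS's internal code; `resolveCoordinatorAddress` and the object shapes are hypothetical and only mirror the fields visible in the Metadata and GroupCoordinator responses in the logs above.

```javascript
// Hypothetical sketch of the proposed fallback; this is NOT KafkaJS's
// internal code, and all names here are made up for illustration.
function resolveCoordinatorAddress(cachedBrokers, coordinator) {
  const cached = cachedBrokers.find((b) => b.nodeId === coordinator.nodeId);
  if (cached) {
    // Normal case: the coordinator is one of the brokers in the metadata.
    return { host: cached.host, port: cached.port, source: "metadata" };
  }
  // Fallback: the GroupCoordinator response already carries host and port.
  return {
    host: coordinator.host,
    port: coordinator.port,
    source: "coordinator-response",
  };
}

// Values taken from the failing run in the debug log.
const cachedBrokers = [{ nodeId: 931905689, host: "server2", port: 11061 }];
const coordinator = { nodeId: 1190085561, host: "server3", port: 11060 };

const address = resolveCoordinatorAddress(cachedBrokers, coordinator);
console.log(`${address.host}:${address.port} (${address.source})`);
// prints "server3:11060 (coordinator-response)"
```

Whether connecting directly to an address that is absent from the metadata is safe for every cluster setup is an open question; the sketch only shows the lookup order being proposed.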
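Same problem here. I wrote my code following the official example; messages can be produced, but the consumer cannot connect. Could someone please help?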
Describe the bug
Getting KafkaJSNonRetriableError when the client tries to consume messages.
To Reproduce
The consumer code below was used:
Expected behavior
The consumer should be able to read messages.
Observed behavior
Getting KafkaJSNonRetriableError with the following log:
It looks like the group coordinator broker somehow cannot be found in the cached metadata?
Environment:
Additional context
Tested using the same config with Kafka-python, and it works fine.