scmorse opened this issue 7 years ago
What is your cluster setup? Any complex routing?
I talked to our dev-ops person, it seems that our development cluster has 3 brokers and a replication factor of 1.
I copied this repo, put some debug lines in, deployed it to our development environment, and the results are interesting:
[13/Feb/2017:19:43:41.589 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:43:41.592 +00:00] [] [info] - AY763536 connectionId:5 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:43:41.593 +00:00] [] [info] - AY763536 connectionId:6 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.178 +00:00] [] [info] - AY763536 connectionId:5 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.180 +00:00] [] [info] - AY763536 connectionId:6 attempted to resolve message with correlationId:8 but no resolve found!
In connection.js, I made each Connection instance have a unique connectionId. There are only 3 connections that log messages for receiving duplicate responses, so I'm guessing that each connection corresponds to a connection to one of the 3 brokers.
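The instrumentation was roughly of this shape (a minimal sketch rather than the actual patch; the counter, the method name and the log wording are illustrative):

// Sketch: give every Connection a unique id and log when a reply arrives
// for a correlationId that is not (or is no longer) in the pending queue.
var nextConnectionId = 0;

function Connection(options) {
    this.options = options;
    this.connectionId = nextConnectionId++; // unique per Connection instance
    this.queue = {};                        // correlationId -> { resolve, reject }
}

Connection.prototype._resolveReply = function (correlationId, data) {
    var pending = this.queue[correlationId];
    if (!pending) {
        console.info('connectionId:' + this.connectionId +
            ' attempted to resolve message with correlationId:' + correlationId +
            ' but no resolve found!');
        return;
    }
    delete this.queue[correlationId];
    pending.resolve(data);
};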
Looking at the relevant sent/received messages for one of the connections:
[13/Feb/2017:19:43:41.585 +00:00] [] [info] - AY763536 connectionId:4 sending message with correlationId:8, messageHex:00100000000000080010746e732d6b61666b612d636c69656e74, message:\x00\x00\x00\x00\x00\x00\x00tns-kafka-client
[13/Feb/2017:19:43:41.588 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:43:41.588 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:43:41.589 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.171 +00:00] [] [info] - AY763536 connectionId:4 sending message with correlationId:8, messageHex:00100000000000080010746e732d6b61666b612d636c69656e74, message:\x00\x00\x00\x00\x00\x00\x00tns-kafka-client
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!
So the same connection object is sending two messages with the same correlation ID (strange, right? The messages are sent 3 minutes apart), and each time, we receive 2 replies. The second reply seems to have the exact same contents as the first reply, so the fact that the second reply has essentially been ignored seems okay.
Is it producer? Simple or Group consumer? Can you share the code?
Also, is it SSL connection or not? Can you show initial (connecting/connected) log messages?
Is it producer? Simple or Group consumer? Can you share the code?
I can't really share the code, but we do use a GroupConsumer and a Producer, although I think the GroupConsumer is where these issues are happening. The Consumer uses a DefaultAssignmentStrategy. Our connectionString is of the form ip1:port,ip2:port,ip3:port. Our topics have 100 partitions, and 2 group consumers.
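Roughly, the setup looks like this (a sketch only, with placeholder broker addresses, topic and group names; I'm going off the no-kafka v3 README for the option names, so treat them as approximate):

// Sketch of the consumer configuration described above (placeholders only).
var Kafka = require('no-kafka');

var consumer = new Kafka.GroupConsumer({
    connectionString: 'ip1:9092,ip2:9092,ip3:9092', // three brokers
    groupId: 'dev-example-consumers',               // environment name encoded in the group name
    clientId: 'tns-kafka-client'
    // DefaultAssignmentStrategy is the one in use, as mentioned above
});

// Process a batch, then commit the latest offset, as described later in this thread.
function dataHandler(messageSet, topic, partition) {
    // ... application-specific processing of messageSet ...
    var last = messageSet[messageSet.length - 1];
    return consumer.commitOffset({ topic: topic, partition: partition, offset: last.offset });
}

consumer.init([{
    subscriptions: ['dev-example-topic'],           // topics have 100 partitions in our case
    handler: dataHandler
}]);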
Also, is it SSL connection or not? Can you show initial (connecting/connected) log messages?
No SSL. And I set the log level to 4 (debug), but I don't see any (connecting/connected) log messages.
I confirmed that each of the connections having this issue with duplicate responses is stored in the brokerConnections object in client.js.
It is unexpected, though, that an instance of Connection would send two messages with the same correlationId, isn't it? It seems to be the exact same message, but sent 3 minutes apart, as shown in the logs in my previous comment. And each time the message is sent, it gets 2 or more replies.
I also see about 40 different instances of the Connection object being opened. Does this seem normal, or higher than you would expect?
Possibly related to this error, the Kafka logs show some rebalancing, though I'm not entirely sure why it is happening, since we have fairly stable consumers that don't go on/offline all that often, and I don't think the brokers are changing very frequently either.
I would like to try to bring up a Kafka cluster with 3 brokers on my local machine using docker, and make a small code sample to see if the error is reproducible, but that would take some time that I can't really commit to this issue right now. For the time being, the error seems benign, so we are ignoring it.
Side note, you don't happen to have any plans to implement an equivalent of the ConsumerRebalanceListener, do you? I'm guessing not. The dev-ops person who manages our Kafka clusters thinks that committing the most recent offset when partitions get revoked might help this issue, although we already commit the latest offset after each batch of messages that we process.
When a rebalance happens and a client loses a partition, do no-kafka GroupConsumers finish processing all messages they have already started before giving up control of those partitions?
The duplicated messages seem to be metadata messages rather than actual kafka messages that were put onto a topic, anyway, so maybe the rebalancing isn't related to this issue.
Are there messages such as Rejoining group on RebalanceInProgress or others?
When a rebalance happens and a client loses a partition, do no-kafka GroupConsumers finish processing all messages they have already started before giving up control of those partitions?
This is not controlled by the GroupConsumer: if a fetch request was successful, then all of its messages are passed to the processing handler; if the Kafka server has started a rebalance, then the fetch request will fail with something like NotLeaderForPartition.
If I correctly understand those message HEX dumps, they are ListGroup requests. Are you using GroupAdmin and its listGroups method?
If I correctly understand those message HEX dumps, they are ListGroup requests. Are you using GroupAdmin and its listGroups method?
Yes, we use that as part of our health check mechanism, to verify that our communication with Kafka is working. Thank you for pointing out that those are ListGroups messages. I saw the responses, but wasn't sure what they were from; I guess I just thought they were part of the metadata.
I'm still not sure of the root cause of the duplicate responses to the listGroups call, but it seems like an error we can safely catch and ignore, since communication with Kafka is still working in this instance, which is all we really care about.
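The health check is essentially of this shape (a sketch rather than our actual code; the connection string and the returned object are illustrative):

// Sketch of the listGroups-based health check: if the request resolves,
// communication with Kafka is considered healthy.
var Kafka = require('no-kafka');

var admin = new Kafka.GroupAdmin({
    connectionString: 'ip1:9092,ip2:9092,ip3:9092'
});

function healthCheck() {
    return admin.listGroups()
        .then(function (groups) { return { healthy: true, groupCount: groups.length }; })
        .catch(function (err) { return { healthy: false, error: err.message }; });
}

admin.init().then(healthCheck).then(console.log);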
Are there messages such as Rejoining group on RebalanceInProgress or others?
No, but there may be a logging configuration problem.
Can you stop using the listGroups request and/or GroupAdmin and see if it resolves the duplicate messages problem?
I would like to confirm that the problem happens only with GroupAdmin and not with GroupConsumer or Producer.
@scmorse This might be trivial, but do you have different environments (like dev and test) that use different topic names with the same group-id on the same broker?
Can you stop using the listGroups request and/or GroupAdmin and see if it resolves the duplicate messages problem?
I removed the call to listGroups but kept the initialization of the GroupAdmin, and the problem went away. No more log messages about "cannot read property 'resolve' of undefined".
I would like to confirm that the problem happens only with GroupAdmin and not with GroupConsumer or Producer
Yes, that seems to be the case. It's not even in the initialization of the group consumer; it's something specifically about the listGroups call.
This might be trivial, but do you have different environments (like dev and test) that use different topic names with the same group-id on the same broker?
We do have different environments, but the topic names and the group names encode the environment name into their titles, so they should be unique per environment. We do use just one constant clientId though, and I thought Kafka might be confusing our clients and that might be causing the duped message. But I tried using a unique clientId as well and the error persisted.
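(For clarity, by "unique clientId" I mean something along these lines; the suffix scheme is just an example, not what we actually used:)

// Illustrative only: one way to derive a per-process clientId.
var os = require('os');
var clientId = 'tns-kafka-client-' + os.hostname() + '-' + process.pid;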
@oleksiyk is it best practice to use a random clientId each time the app starts, or is it okay to have multiple consumers with the same clientId?
I have replaced the listGroups call with a call to describeGroup, and the error has gone away, so that is the workaround we are going to go with right now.
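The workaround amounts to swapping the call inside the health check (a sketch; the group name is a placeholder and the shape of the returned description is my assumption):

// Workaround sketch: probe one known consumer group instead of listing all groups.
function healthCheck(admin) {
    return admin.describeGroup('dev-example-consumers')
        .then(function (description) { return { healthy: true, state: description.state }; })
        .catch(function (err) { return { healthy: false, error: err.message }; });
}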
is it best practice to use a random clientId each time the app starts, or is it okay to have multiple consumers with the same clientId?
@scmorse I use the same clientId each time.
Yes, that seems to be the case. It's not even in the initialization of the group consumer; it's something specifically about the listGroups call.
The problem is that GroupAdmin doesn't refresh broker metadata, so once the Kafka cluster rebalances it sends the listGroups request to an outdated list of brokers, which triggers the error. I will fix that.
@oleksiyk The problem went away when I started using describeGroup, though. Wouldn't that call have the same metadata issue if GroupAdmins weren't refreshing their metadata?
Switching from listGroups to describeGroup has gotten rid of the cannot read property 'resolve' of undefined errors, but has surfaced a few protocol errors:
RangeError: Trying to access beyond buffer length
at _Reader.Reader.demand (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:79:15)
at _Reader.define.read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/index.js:118:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:497:27)
at _Reader.wrapper [as raw] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:47:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:496:27)
at _Reader.wrapper [as string] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:497:27)
at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:56:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:496:27)
at _Reader.wrapper [as DescribeGroupResponse_ConsumerGroupMemberItem] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:497:27)
at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:83:18)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:496:27)
at _Reader.wrapper [as DescribeGroupResponse_GroupItem] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:497:27)
at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:95:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:495:27)
at _Reader.wrapper [as DescribeGroupResponse] (/app/node_modules/lodash/lodash.js:5356:16)
at /app/node_modules/tns-kafka/node_modules/no-kafka/lib/client.js:743:55
at bound (domain.js:287:14)
at runBound (domain.js:300:12)
at tryCatcher (/app/node_modules/bluebird/js/release/util.js:16:23)
at Promise._settlePromiseFromHandler (/app/node_modules/bluebird/js/release/promise.js:512:31)
at Promise._settlePromise (/app/node_modules/bluebird/js/release/promise.js:569:18)
at Promise._settlePromise0 (/app/node_modules/bluebird/js/release/promise.js:614:10)
at Promise._settlePromises (/app/node_modules/bluebird/js/release/promise.js:693:18)
at Async._drainQueue (/app/node_modules/bluebird/js/release/async.js:133:16)
at Async._drainQueues (/app/node_modules/bluebird/js/release/async.js:143:10)
at Immediate.Async.drainQueues [as _onImmediate] (/app/node_modules/bluebird/js/release/async.js:17:14)
at processImmediate [as _immediateCallback] (timers.js:383:17)
And
RangeError: index out of range
at checkOffset (buffer.js:688:11)
at Buffer.readInt32BE (buffer.js:853:5)
at _Reader.define.read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/index.js:101:47)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:496:27)
at _Reader.wrapper (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:65:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:497:27)
at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:56:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:496:27)
at _Reader.wrapper [as DescribeGroupResponse_ConsumerGroupMemberItem] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:497:27)
at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:83:18)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:496:27)
at _Reader.wrapper [as DescribeGroupResponse_GroupItem] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:497:27)
at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:95:14)
at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/app/node_modules/lodash/lodash.js:495:27)
at _Reader.wrapper [as DescribeGroupResponse] (/app/node_modules/lodash/lodash.js:5356:16)
at /app/node_modules/tns-kafka/node_modules/no-kafka/lib/client.js:743:55
at bound (domain.js:287:14)
at runBound (domain.js:300:12)
at tryCatcher (/app/node_modules/bluebird/js/release/util.js:16:23)
at Promise._settlePromiseFromHandler (/app/node_modules/bluebird/js/release/promise.js:512:31)
at Promise._settlePromise (/app/node_modules/bluebird/js/release/promise.js:569:18)
at Promise._settlePromise0 (/app/node_modules/bluebird/js/release/promise.js:614:10)
at Promise._settlePromises (/app/node_modules/bluebird/js/release/promise.js:693:18)
at Async._drainQueue (/app/node_modules/bluebird/js/release/async.js:133:16)
at Async._drainQueues (/app/node_modules/bluebird/js/release/async.js:143:10)
at Immediate.Async.drainQueues [as _onImmediate] (/app/node_modules/bluebird/js/release/async.js:17:14)
at processImmediate [as _immediateCallback] (timers.js:383:17)
These errors seem to occur much less frequently than the listGroups error, although I suspect the issues may be related.
Both of these errors relate to admin.js:95:14 (DescribeGroupResponse) > admin.js:83:18 (DescribeGroupResponse_GroupItem) > admin.js:56:14 (DescribeGroupResponse_ConsumerGroupMemberItem). Looks like a protocol level error, although I'm sure you would have more insight, @oleksiyk.
(Using Kafka 0.9.0.1, btw)
Can you try https://github.com/oleksiyk/kafka/commit/238fe7a50ae1cba147b17eb72824551b1bab1afc and see if it fixes the protocol error?
I'm having a difficult time deploying the package referencing a git url (it works locally, I just can't deploy it). Would you mind publishing the updated code as a prerelease?
npm version patch
npm version prerelease
npm publish --tag debug-protocol-fix
(I put the --tag on there so that people who installed the 'latest' won't get this version until we verify it works)
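For reference, a version published under an npm dist-tag can then be installed explicitly by that tag:
npm install no-kafka@debug-protocol-fix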
Done, try it: no-kafka@3.1.3-0
No luck. Pretty soon after it deployed, it logged another RangeError: Trying to access beyond buffer length error.
I need to replicate it. Until I see what I get back from Kafka I can't really guess what's wrong with that packet.
How do you initiate each describeGroup call? Do you create a GroupAdmin instance each time or use the same one? What is your connectionString? Does it have any repeating/duplicate hosts? Maybe one time mentioned by IP and another time by domain name?
I'm also getting
TypeError: Cannot read property 'resolve' of undefined
at Connection._receive (/app/node_modules/wix-bootstrap-greynode/node_modules/greynode-core/node_modules/no-kafka/lib/connection.js:205:30)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at TCP.onread (net.js:547:20)
I don't use GroupAdmin or DescribeGroup, only GroupConsumer and Producer.
I have a suspicion that it happens on rebalancing: when I had a single node process consuming messages it didn't happen, but once more instances joined consuming the same topic & groupId, those errors started to appear.
More instances in the same node process or different node processes?
same one
Once all instances had joined (deployed), it looks like the errors stopped appearing. Still examining ...
@oleksiyk So I commented out your printout, plus added the data argument to it. Obviously I get a huge blob of binary data, which doesn't look nice in the log. However, I do see some readable stuff in between, and those are definitely the complete listing of the topics we have in our cluster.
I'm not that familiar with the Kafka protocol; I will try to find out which message is resolved twice in my case.
Any hints are more than welcome :)
Regardless of my findings to come, if any, WDYT about wrapping this.queue[correlationId].resolve(new Buffer(data.slice(4, length + 4))); with try...catch...report meanwhile, to keep the node process from dying disgracefully?
Can you save the data of the duplicate message into a separate file (fs.writeFile, etc) and share it here in any way?
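For example, the dump could be taken right where the duplicate is detected (a sketch; the path and file naming are arbitrary):

// Sketch: write the raw packet that had no pending request to disk so it
// can be inspected offline.
var fs = require('fs');

function dumpDuplicate(correlationId, data) {
    var file = '/tmp/kafka-dup-' + correlationId + '-' + Date.now() + '.bin';
    fs.writeFile(file, data, function (err) {
        if (err) { console.error('failed to write dump:', err); }
    });
}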
@oleksiyk here you go. Recorded four messages. kafka_messages.tar.gz
@oleksiyk did you have a chance to take a look at my dumps? Anything interesting?
I didn't make much progress.
My plan is to add an additional copy of the queue with TTL eviction and to try to correlate duplicate messages using that new queue - WDYT?
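A sketch of that idea (the names and the TTL are made up): keep a short-lived record of already resolved correlationIds so a late or duplicate reply can be matched against the request that originally consumed it.

// Sketch of the proposed debugging aid (names and TTL are illustrative).
var RESOLVED_TTL_MS = 60 * 1000;
var recentlyResolved = new Map(); // correlationId -> { resolvedAt, summary }

function rememberResolved(correlationId, summary) {
    recentlyResolved.set(correlationId, { resolvedAt: Date.now(), summary: summary });
    setTimeout(function () { recentlyResolved.delete(correlationId); }, RESOLVED_TTL_MS).unref();
}

function explainLateReply(correlationId) {
    var seen = recentlyResolved.get(correlationId);
    if (seen) {
        console.warn('duplicate reply for correlationId', correlationId,
            'first resolved', Date.now() - seen.resolvedAt, 'ms ago:', seen.summary);
    } else {
        console.warn('reply for a correlationId that was never resolved:', correlationId);
    }
}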
@hugebdu what were the log messages right before the error occurred? How easily can you replicate it? Can you describe that process?
How do you restart them? Which instances trigger the error - those that keep running or those that are being restarted?
Those are docker; start and stop will do.
I think I've seen it both in those that did the restart and those that didn't move.
@hugebdu How many Kafka servers are running? Is it single topic subscription or multiple topics?
Three brokers in the Kafka cluster. A single topic, that is.
"Socket timeout" - do you stop Kafka servers (brokers)?
Nope
I guess a consumer instance sends a metadataRequest that times out for some reason (there is definitely something going on with the network connection). That correlationId request (and its promise) is then removed from this.queue, but a moment later the response is received from the Kafka brokers, and that triggers the "wrong correlationId" error.
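That race can be sketched with a small, self-contained example (the timings, names and messages are made up): the request's promise is rejected and evicted on timeout, so the reply that arrives afterwards finds no pending entry.

// Illustrative race: a request times out and is evicted from the pending map;
// the real reply arrives later and hits the "no resolve found" path.
var pending = {};

function request(id, timeoutMs) {
    return new Promise(function (resolve, reject) {
        pending[id] = { resolve: resolve, reject: reject };
        setTimeout(function () {
            if (pending[id]) {
                delete pending[id];                 // evicted on timeout...
                reject(new Error('Socket timeout'));
            }
        }, timeoutMs);
    });
}

function onReply(id, data) {
    var entry = pending[id];
    if (!entry) {
        console.warn('late reply for correlationId', id, '- dropping'); // ...so the late reply is orphaned
        return;
    }
    delete pending[id];
    entry.resolve(data);
}

// Simulate: request 8 times out after 100 ms, its reply arrives at 150 ms.
request(8, 100).catch(function (err) { console.log('request failed:', err.message); });
setTimeout(function () { onReply(8, 'reply payload'); }, 150);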
As I mentioned before, we run our node processes in docker and we do suspect some networking problems there (planning to upgrade to latest docker - there were many network stack related fixes).
Meanwhile, WDYT about wrapping this.queue[correlationId] with try...catch...log?
Meanwhile we're preparing some sanity & stress tests, to see that no messages are lost, including on those errors, which currently cause the node process to restart (via node cluster).
I think we can safely ignore the "wrong correlationId" error in this case. No need for try/catch I think; just drop that packet and log a warning message. I will update the code.
Great, will wait for the fix. Thanks a lot Oleksiy! Great package (so far the best for node out there I could find), and the attitude!
I've published it as 3.2.2. It silently drops that packet without a log message (this has to be changed later when it comes to changing logging globally in no-kafka).
Awesome, will upgrade tomorrow. Thanks a lot
We use no-kafka at my workplace (currently using version 3.0.1), and we are seeing a strange issue. The issue happens regularly, 10-50 times per hour, in all deployed instances of our service (I have not been able to reproduce this locally, but my local setup only uses 1 broker and a replication factor of 1). I am curious to know if the package creators have any idea why this might be happening, and what the impact might be.
From looking at the code, it seems that either a Kafka message is being sent which the library does not expect a response for but is receiving one, OR Kafka is sending multiple responses for the same message.