vykimo opened 5 years ago
Is there any chance you could write a small example that reproduces the issue? If it's creating new connections and never closing them, it would make sense that you would eventually exhaust all your ports, but this shouldn't happen in the first place.
Just create a consumer that produces a message to topic-test each time it consumes one from topic-test.
You'll see the number of open ports grow, which you can watch with this command (on Windows): netstat -n | wc -l
The error appears at approximately 25k open ports.
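const { Kafka } = require('kafkajs')

// Assumed connection settings for this repro; adjust clientId, brokers and groupId to your cluster.
const kafka = new Kafka({ clientId: 'repro', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'repro-group' })

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'topic-test', fromBeginning: true })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
      // A new idempotent producer is created, used once and disconnected for
      // every consumed message; this is what makes the open-port count grow.
      const producer = kafka.producer({
        maxInFlightRequests: 1,
        idempotent: true,
        allowAutoTopicCreation: true,
      })
      await producer.connect()
      await producer.send({
        topic: 'topic-test',
        messages: [{ key: 'H', value: 'Hello user ! ' }],
      }).catch(console.error)
      await producer.disconnect()
    },
  })
}

run().catch(console.error)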
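Counting every line of netstat -n mixes connections that are still open with ones that were already closed but are waiting out TCP's TIME_WAIT period, during which the local port stays reserved. A small helper that tallies TCP entries by state, optionally only those going to one broker port such as 9094, can tell the two cases apart. This is a sketch: countTcpStates is a hypothetical helper, and it assumes the Windows/Linux netstat -n layout, where TCP lines start with TCP or tcp, the foreign address is written host:port, and the state is the last column (macOS writes host.port, so the port filter would need adjusting there).

```javascript
// Hypothetical helper: tally TCP entries from `netstat -n` output by state.
// Assumes each TCP line starts with "TCP" (Windows) or "tcp"/"tcp6" (Linux),
// the foreign address is the second-to-last column in host:port form,
// and the connection state (ESTABLISHED, TIME_WAIT, ...) is the last column.
function countTcpStates(netstatOutput, remotePort) {
  const counts = {}
  for (const line of netstatOutput.split(/\r?\n/)) {
    const fields = line.trim().split(/\s+/)
    // Skip headers, blank lines and non-TCP entries (UDP, unix sockets, ...).
    if (!/^tcp/i.test(fields[0] || '')) continue
    const state = fields[fields.length - 1]
    const remote = fields[fields.length - 2]
    // Optionally keep only connections to one remote port (e.g. a broker).
    if (remotePort !== undefined && !remote.endsWith(`:${remotePort}`)) continue
    counts[state] = (counts[state] || 0) + 1
  }
  return counts
}
```

To try it on a live machine, pass it require('child_process').execSync('netstat -n').toString() and the broker port. Mostly TIME_WAIT entries would point at sockets that are closed but not yet released by the OS, while a growing ESTABLISHED count would point at sockets that are really left open.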
I was indeed able to reproduce it on my machine as well. Here's what a snippet of the debug log looks like (basically one iteration of the fetch loop):
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.404Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 out of 1 topics","topics":["topic-test-repro-360"],"activeTopics":["topic-test-repro-360"],"pausedTopics":[]}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.405Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":157,"expectResponse":true,"size":106}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.413Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":157,"size":326,"data":"[filtered]"}
- topic-test-repro-360[0 | 300] / 1557746668345 a333a6bd-8162-48fe-9778-69dfabe943a2#Message from 0:298
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.413Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"10.3.220.236:9094","clientId":"repro-360","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.438Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":0,"expectResponse":true,"size":23}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.439Z","logger":"kafkajs","message":"[Connection] Response ApiVersions(key: 18, version: 2)","broker":"10.3.220.236:9094","clientId":"repro-360","error":"The version of API is not supported","correlationId":0,"payload":{"type":"Buffer","data":"[filtered]"}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.439Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":1,"expectResponse":true,"size":23}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.442Z","logger":"kafkajs","message":"[Connection] Response ApiVersions(key: 18, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":1,"size":272,"data":{"errorCode":0,"apiVersions":[{"apiKey":0,"minVersion":0,"maxVersion":5},{"apiKey":1,"minVersion":0,"maxVersion":7},{"apiKey":2,"minVersion":0,"maxVersion":2},{"apiKey":3,"minVersion":0,"maxVersion":5},{"apiKey":4,"minVersion":0,"maxVersion":1},{"apiKey":5,"minVersion":0,"maxVersion":0},{"apiKey":6,"minVersion":0,"maxVersion":4},{"apiKey":7,"minVersion":0,"maxVersion":1},{"apiKey":8,"minVersion":0,"maxVersion":3},{"apiKey":9,"minVersion":0,"maxVersion":3},{"apiKey":10,"minVersion":0,"maxVersion":1},{"apiKey":11,"minVersion":0,"maxVersion":2},{"apiKey":12,"minVersion":0,"maxVersion":1},{"apiKey":13,"minVersion":0,"maxVersion":1},{"apiKey":14,"minVersion":0,"maxVersion":1},{"apiKey":15,"minVersion":0,"maxVersion":1},{"apiKey":16,"minVersion":0,"maxVersion":1},{"apiKey":17,"minVersion":0,"maxVersion":1},{"apiKey":18,"minVersion":0,"maxVersion":1},{"apiKey":19,"minVersion":0,"maxVersion":2},{"apiKey":20,"minVersion":0,"maxVersion":1},{"apiKey":21,"minVersion":0,"maxVersion":0},{"apiKey":22,"minVersion":0,"maxVersion":0},{"apiKey":23,"minVersion":0,"maxVersion":0},{"apiKey":24,"minVersion":0,"maxVersion":0},{"apiKey":25,"minVersion":0,"maxVersion":0},{"apiKey":26,"minVersion":0,"maxVersion":0},{"apiKey":27,"minVersion":0,"maxVersion":0},{"apiKey":28,"minVersion":0,"maxVersion":0},{"apiKey":29,"minVersion":0,"maxVersion":0},{"apiKey":30,"minVersion":0,"maxVersion":0},{"apiKey":31,"minVersion":0,"maxVersion":0},{"apiKey":32,"minVersion":0,"maxVersion":1},{"apiKey":33,"minVersion":0,"maxVersion":0},{"apiKey":34,"minVersion":0,"maxVersion":0},{"apiKey":35,"minVersion":0,"maxVersion":0},{"apiKey":36,"minVersion":0,"maxVersion":0},{"apiKey":37,"minVersion":0,"maxVersion":0},{"apiKey":38,"minVersion":0,"maxVersion":0},{"apiKey":39,"minVersion":0,"maxVersion":0},{"apiKey":40,"minVersion":0,"maxVersion":0},{"apiKey":41,"minVersion":0,"maxVersion":0},{"apiKey":42,"minVersion":0,"maxVersion":0}],"throttleTime":0}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.443Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"10.3.220.236:9094","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.443Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":2,"expectResponse":true,"size":30}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.445Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":2,"size":47,"data":{"errorCode":0,"enabledMechanisms":["PLAIN","SCRAM-SHA-512","SCRAM-SHA-256"]}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.445Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"10.3.220.236:9094"}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.445Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":3,"expectResponse":true,"size":41}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.453Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":3,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.453Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"10.3.220.236:9094"}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.454Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":4,"expectResponse":true,"size":50}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.460Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":4,"size":175,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"10.3.220.236","port":9100,"rack":null},{"nodeId":1,"host":"10.3.220.236","port":9097,"rack":null},{"nodeId":0,"host":"10.3.220.236","port":9094,"rack":null}],"clusterId":"RfHiX6R6SM2vUnYCulmGbw","controllerId":0,"topicMetadata":[{"topicErrorCode":0,"topic":"topic-test-repro-360","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":0,"replicas":[0],"isr":[0],"offlineReplicas":[]}]}]}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.460Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":5,"expectResponse":true,"size":191}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.464Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":5,"size":68,"data":{"topics":[{"topicName":"topic-test-repro-360","partitions":[{"partition":0,"errorCode":0,"baseOffset":"302","logAppendTime":"-1","logStartOffset":"0"}]}],"throttleTime":0}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.464Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"10.3.220.236:9094","clientId":"repro-360"}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.464Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"10.3.220.236:9094","clientId":"repro-360"}
- topic-test-repro-360[0 | 301] / 1557746668392 ec79997a-53d4-4548-9c37-a9dbd3e6c242#Message from 0:299
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.464Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"10.3.220.236:9094","clientId":"repro-360","ssl":true,"sasl":true}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.482Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 2)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":0,"expectResponse":true,"size":23}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.483Z","logger":"kafkajs","message":"[Connection] Response ApiVersions(key: 18, version: 2)","broker":"10.3.220.236:9094","clientId":"repro-360","error":"The version of API is not supported","correlationId":0,"payload":{"type":"Buffer","data":"[filtered]"}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.484Z","logger":"kafkajs","message":"[Connection] Request ApiVersions(key: 18, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":1,"expectResponse":true,"size":23}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.485Z","logger":"kafkajs","message":"[Connection] Response ApiVersions(key: 18, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":1,"size":272,"data":{"errorCode":0,"apiVersions":[{"apiKey":0,"minVersion":0,"maxVersion":5},{"apiKey":1,"minVersion":0,"maxVersion":7},{"apiKey":2,"minVersion":0,"maxVersion":2},{"apiKey":3,"minVersion":0,"maxVersion":5},{"apiKey":4,"minVersion":0,"maxVersion":1},{"apiKey":5,"minVersion":0,"maxVersion":0},{"apiKey":6,"minVersion":0,"maxVersion":4},{"apiKey":7,"minVersion":0,"maxVersion":1},{"apiKey":8,"minVersion":0,"maxVersion":3},{"apiKey":9,"minVersion":0,"maxVersion":3},{"apiKey":10,"minVersion":0,"maxVersion":1},{"apiKey":11,"minVersion":0,"maxVersion":2},{"apiKey":12,"minVersion":0,"maxVersion":1},{"apiKey":13,"minVersion":0,"maxVersion":1},{"apiKey":14,"minVersion":0,"maxVersion":1},{"apiKey":15,"minVersion":0,"maxVersion":1},{"apiKey":16,"minVersion":0,"maxVersion":1},{"apiKey":17,"minVersion":0,"maxVersion":1},{"apiKey":18,"minVersion":0,"maxVersion":1},{"apiKey":19,"minVersion":0,"maxVersion":2},{"apiKey":20,"minVersion":0,"maxVersion":1},{"apiKey":21,"minVersion":0,"maxVersion":0},{"apiKey":22,"minVersion":0,"maxVersion":0},{"apiKey":23,"minVersion":0,"maxVersion":0},{"apiKey":24,"minVersion":0,"maxVersion":0},{"apiKey":25,"minVersion":0,"maxVersion":0},{"apiKey":26,"minVersion":0,"maxVersion":0},{"apiKey":27,"minVersion":0,"maxVersion":0},{"apiKey":28,"minVersion":0,"maxVersion":0},{"apiKey":29,"minVersion":0,"maxVersion":0},{"apiKey":30,"minVersion":0,"maxVersion":0},{"apiKey":31,"minVersion":0,"maxVersion":0},{"apiKey":32,"minVersion":0,"maxVersion":1},{"apiKey":33,"minVersion":0,"maxVersion":0},{"apiKey":34,"minVersion":0,"maxVersion":0},{"apiKey":35,"minVersion":0,"maxVersion":0},{"apiKey":36,"minVersion":0,"maxVersion":0},{"apiKey":37,"minVersion":0,"maxVersion":0},{"apiKey":38,"minVersion":0,"maxVersion":0},{"apiKey":39,"minVersion":0,"maxVersion":0},{"apiKey":40,"minVersion":0,"maxVersion":0},{"apiKey":41,"minVersion":0,"maxVersion":0},{"apiKey":42,"minVersion":0,"maxVersion":0}],"throttleTime":0}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.485Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"10.3.220.236:9094","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.485Z","logger":"kafkajs","message":"[Connection] Request SaslHandshake(key: 17, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":2,"expectResponse":true,"size":30}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.487Z","logger":"kafkajs","message":"[Connection] Response SaslHandshake(key: 17, version: 1)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":2,"size":47,"data":{"errorCode":0,"enabledMechanisms":["PLAIN","SCRAM-SHA-512","SCRAM-SHA-256"]}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.487Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] Authenticate with SASL PLAIN","broker":"10.3.220.236:9094"}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.487Z","logger":"kafkajs","message":"[Connection] Request SaslAuthenticate(key: 36, version: 0)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":3,"expectResponse":true,"size":41}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.489Z","logger":"kafkajs","message":"[Connection] Response SaslAuthenticate(key: 36, version: 0)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":3,"size":12,"data":{"errorCode":0,"errorMessage":null,"authBytes":{"type":"Buffer","data":[0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.489Z","logger":"kafkajs","message":"[SASLPlainAuthenticator] SASL PLAIN authentication successful","broker":"10.3.220.236:9094"}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.490Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":4,"expectResponse":true,"size":50}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.497Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":4,"size":175,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"10.3.220.236","port":9100,"rack":null},{"nodeId":1,"host":"10.3.220.236","port":9097,"rack":null},{"nodeId":0,"host":"10.3.220.236","port":9094,"rack":null}],"clusterId":"RfHiX6R6SM2vUnYCulmGbw","controllerId":0,"topicMetadata":[{"topicErrorCode":0,"topic":"topic-test-repro-360","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":0,"replicas":[0],"isr":[0],"offlineReplicas":[]}]}]}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.501Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":5,"expectResponse":true,"size":191}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.505Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 5)","broker":"10.3.220.236:9094","clientId":"repro-360","correlationId":5,"size":68,"data":{"topics":[{"topicName":"topic-test-repro-360","partitions":[{"partition":0,"errorCode":0,"baseOffset":"303","logAppendTime":"-1","logStartOffset":"0"}]}],"throttleTime":0}}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.505Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"10.3.220.236:9094","clientId":"repro-360"}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.505Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"10.3.220.236:9094","clientId":"repro-360"}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.505Z","logger":"kafkajs","message":"[Connection] Request OffsetCommit(key: 8, version: 3)","broker":"10.3.220.236:9100","clientId":"repro-360","correlationId":162,"expectResponse":true,"size":151}
{"level":"DEBUG","timestamp":"2019-05-13T11:24:28.512Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 3)","broker":"10.3.220.236:9100","clientId":"repro-360","correlationId":162,"size":44,"data":{"throttleTime":0,"responses":[{"topic":"topic-test-repro-360","partitions":[{"partition":0,"errorCode":0}]}]}}
If I don't create a producer for each message and instead reuse the same one, I don't get the problem. However, since you are disconnecting the producer after using it, it should be closing the socket, so it shouldn't leak (we can see in the debug logs that it is disconnecting). There must be some kind of bug in how we work with the sockets.
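The log above shows each consumed message going through a full connect, ApiVersions, SASL handshake, Metadata, Produce and disconnect cycle. To check over a long run that every Connecting is matched by a disconnected, without reading raw debug output, one option is a custom logCreator that tallies the two events per broker. This is a sketch under assumptions: it relies on the KafkaJS logCreator shape (logLevel => ({ namespace, level, label, log }) => ...) and on the namespace and message strings being exactly Connection, Connecting and disconnected, as they appear in the log; createConnectionCounter itself is hypothetical.

```javascript
// Hypothetical diagnostic: count "Connecting" vs "disconnected" events per
// broker. Assumes the KafkaJS logCreator shape
//   logLevel => ({ namespace, level, label, log }) => void
// and the namespace/message strings seen in the debug log above.
function createConnectionCounter() {
  const perBroker = {}

  const logCreator = () => ({ namespace, log }) => {
    if (namespace !== 'Connection' || !log || !log.broker) return
    let field = null
    if (log.message === 'Connecting') field = 'opened'
    else if (log.message === 'disconnected') field = 'closed'
    if (!field) return // ignore requests, responses, "disconnecting...", etc.
    if (!perBroker[log.broker]) perBroker[log.broker] = { opened: 0, closed: 0 }
    perBroker[log.broker][field] += 1
  }

  // Brokers for which more connections were opened than closed so far.
  const unbalanced = () =>
    Object.entries(perBroker)
      .filter(([, c]) => c.opened > c.closed)
      .map(([broker]) => broker)

  return { logCreator, perBroker, unbalanced }
}
```

It would be wired in as new Kafka({ clientId, brokers, logLevel: logLevel.DEBUG, logCreator: counter.logCreator }), with logLevel imported from kafkajs. The DEBUG level is needed because these messages are only logged at that level, and the custom creator replaces the default console output. Balanced opened/closed counts alongside a growing port count would suggest the client is closing its sockets but the OS has not released the ports yet.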
Yes, that's my point. I decided to create a new producer every time in order to guarantee idempotency. But on disconnect, the socket must be closed and must not leak.
@vykimo I was on vacation, so I missed some of the discussion on the issue you opened before. If I remember correctly, you were calling producer.send without awaiting it, and that's why the idempotent producer failed for you. However, in your example, you await the send operation. Is your real code very different from the example? And if you await, can't you use a single producer?
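A single long-lived producer, as asked about here, keeps one connection per broker open for the life of the process instead of opening and closing one per message. A minimal sketch of that idea: createSharedProducer is a hypothetical wrapper that connects lazily on first use, lets concurrent handlers share the same connect() call, and disconnects once at shutdown. It only assumes the factory returns an object whose connect() and disconnect() return promises, as a KafkaJS producer's do; in the repro the factory would be () => kafka.producer({ idempotent: true, maxInFlightRequests: 1 }).

```javascript
// Hypothetical wrapper: one long-lived producer shared by every handler.
// `factory` is assumed to return an object with promise-returning
// connect() and disconnect() methods (e.g. () => kafka.producer({ ... })).
function createSharedProducer(factory) {
  let producer = null
  let connecting = null

  return {
    // Resolves to a connected producer; concurrent callers share one connect().
    async get() {
      if (!connecting) {
        producer = factory()
        connecting = producer.connect().then(
          () => producer,
          (err) => {
            connecting = null // let a later get() retry after a failed connect
            throw err
          },
        )
      }
      return connecting
    },

    // Disconnect once at shutdown, not after every message.
    async close() {
      if (!connecting) return
      const connected = await connecting.catch(() => null)
      connecting = null
      if (connected) await connected.disconnect()
    },
  }
}
```

Inside eachMessage, the handler would then call const producer = await shared.get() before producer.send(...), and await shared.close() would run once when the application shuts down.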
Yes, my code is very different from this one. You advised me to create a kind of queue and send all my messages sequentially with await. But won't that slow my process down a lot when the work runs in parallel?
And it does not solve the issue: sockets are not closed when the producer calls disconnect(), or at least not fast enough. So is there a way to reduce the closing delay, or a way to reuse already-open sockets (as an HTTP keep-alive agent does)?
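The queue suggested earlier in the thread can be as small as a promise chain: each task starts only after the previous one has settled, so a shared idempotent producer never has two sends in flight. A sketch, with createSerialQueue as a hypothetical helper; it does serialize the sends, which is exactly the throughput cost raised above. Packing several messages into one send() call (its messages field is an array) is one way to recover some of that cost, since fewer requests reach the broker.

```javascript
// Hypothetical helper: run async tasks one at a time, in submission order.
// Each enqueue() call returns a promise for that task's own result.
function createSerialQueue() {
  let tail = Promise.resolve()
  return function enqueue(task) {
    // Start this task only after everything queued before it has settled.
    const result = tail.then(() => task())
    // Keep the chain going even if this task fails; the caller still
    // sees the failure through `result`.
    tail = result.catch(() => {})
    return result
  }
}
```

For example, const enqueue = createSerialQueue() once, then enqueue(() => producer.send({ topic: 'topic-test', messages })) from any handler; each call returns a promise for that particular send.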
Hi everyone, I noticed a problem when I consume and produce many messages. After more than 4,000 messages, I systematically get this error:
It loops on this error until I stop the program. When I restart it, the problem disappears... until it again reaches ~4-5k messages.
At first I thought my single broker was failing under the load, so I added 2 additional brokers. But even with 3 brokers I get the error for all 3 URLs!
I think it's related to temporary (ephemeral) TCP ports. When I type
netstat -n
I get an enormous number of connection lines: I counted more than 39k.
In case it helps: I got a similar error when I was calling an API too often, because the allowed ports were saturated. I solved it by setting up a keepAlive HTTP Agent (with Infinity free sockets). Is this a similar issue with KafkaJS? Maybe you already know how to solve it?
Thanks in advance
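For comparison with the HTTP case described above, a sketch using only Node's built-in http module: the same number of sequential requests goes once through an agent without keep-alive and once through one with keepAlive: true, and a throwaway local server counts the distinct TCP connections it receives. Nothing here is KafkaJS-specific, and countConnections is a hypothetical helper. It shows the socket reuse a keep-alive agent provides, which is roughly what reusing one producer gives on the Kafka side.

```javascript
const http = require('http')

// Hypothetical helper: send `requests` sequential GETs through `agent` to a
// throwaway local server and return how many distinct TCP connections it saw.
async function countConnections(requests, agent) {
  const sockets = new Set()
  const server = http.createServer((req, res) => {
    sockets.add(req.socket) // one entry per TCP connection
    res.end('ok')
  })
  await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve))
  const { port } = server.address()

  for (let i = 0; i < requests; i++) {
    await new Promise((resolve, reject) => {
      http
        .get({ host: '127.0.0.1', port, agent }, (res) => {
          res.resume() // drain the body so the socket can be released
          res.on('end', resolve)
        })
        .on('error', reject)
    })
    // Give the agent a turn to return a kept-alive socket to its free pool.
    await new Promise((resolve) => setImmediate(resolve))
  }

  agent.destroy() // close any sockets the agent is still holding
  await new Promise((resolve) => server.close(resolve))
  return sockets.size
}
```

Called as countConnections(5, new http.Agent({ keepAlive: false })) it should report 5 connections, and with new http.Agent({ keepAlive: true }) it should report 1.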