oleksiyk / kafka

Apache Kafka 0.9 client for Node

Connect retry limit #131

Closed. koresar closed this issue 7 years ago.

koresar commented 8 years ago

With this change, Producers work as follows: 1) The driver continuously tries to (re)connect to Kafka when producer.send() is called. 2) After every retry attempt it rejects with an error, typically:

{ [NoKafkaConnectionError: Error: connect ECONNREFUSED]
  name: 'NoKafkaConnectionError',
  server: '127.0.0.1:9092',
  message: 'Error: connect ECONNREFUSED' }

3) As soon as connectivity is back everything goes back to normal.

This new retry option should be used carefully with Consumers, though, because after the configured number of attempts it will reject and never attempt to reconnect.

Producer Example 1. Reject with an error every ~3 seconds while you are trying to producer.send():

reconnectionDelay: {retries: 3}

Producer Example 2. These configurations make no changes from the current no-kafka behavior:

reconnectionDelay: undefined
reconnectionDelay: {retries: 0}

Consumer Example 1. Reject with an error after ~3 seconds and never attempt to reconnect:

reconnectionDelay: {retries: 3}

Consumer Example 2. Keep trying to reconnect forever; no change from the current no-kafka behavior:

reconnectionDelay: undefined
reconnectionDelay: {retries: 0}
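
For concreteness, this is roughly how the option from the examples above would be passed in code. Note that the retries field inside reconnectionDelay exists only in this PR, not in released no-kafka, and the connection string is illustrative:

var kafka = require('no-kafka');

// Proposed in this PR: reject after 3 failed (re)connection attempts instead of retrying forever.
var producer = new kafka.Producer({
    connectionString: '127.0.0.1:9092',
    reconnectionDelay: { retries: 3 }
});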

I tested it back and forth with my two Node.js processes. It works as expected.

If you think we should give this a go, I'll be happy to continue working on this PR (tests, README, etc.).

oleksiyk commented 8 years ago

While this solves the problem for producers, it is really useless for consumers. It even makes the behaviour of consumers much harder to document and understand (and so to use correctly), because it will depend on the consumer state/phase where the error happened.

I'm not yet sure about the proper solution, though...

koresar commented 8 years ago

As far as my testing goes, the changes are rather simple. Easy to explain. Easy to understand.

Maybe there should be two clients? One for producers, another for consumers?

oleksiyk commented 8 years ago

Your changes will only affect the behaviour of the consumer in the init call and sometimes in the subscribe call. They won't affect the fetch call and won't change the (already rather complex) reconnection behaviour of GroupConsumer. So while users of the library will see a global retries option, it just won't work in some cases, which will lead to confusion and more issues/pull requests.

Proposal: The problem initially reported in #73 can be solved by using Promise.race([]) with a timeout, either embedded in the library's init call or simply in user code.
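
To illustrate the user-code variant: a minimal sketch that races producer.init() against a timer so the caller is rejected after a bounded wait. The withTimeout helper and the 5000 ms value are made up for this example; the driver keeps reconnecting in the background regardless.

'use strict';

var kafka = require('no-kafka');
var producer = new kafka.Producer();

// Reject if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms) {
    var timer = new Promise(function (resolve, reject) {
        setTimeout(function () {
            reject(new Error('Timed out after ' + ms + ' ms'));
        }, ms);
    });
    return Promise.race([promise, timer]);
}

withTimeout(producer.init(), 5000)
.then(function () {
    console.log('connected');
})
.catch(function (err) {
    console.error(err); // either a connection error or the timeout above
    process.exit(255);
});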

The problem with producer send can be solved the same way: by providing a timeout option with each task/batch. There are already options.retries.attempts and options.retries.delay, so there could be an options.retries.timeout.

No other changes to reconnection behaviour.

koresar commented 8 years ago

The problem with producer send can be solved the same way: by providing a timeout option with each task/batch. There are already options.retries.attempts and options.retries.delay, so there could be an options.retries.timeout.

Unfortunately, when there is no connectivity with Kafka, the attempts and the delay are never used. The code never gets to the "retrying" part, because the Client waits for a Kafka connection indefinitely. And this is quite frustrating and unexpected. So, I'm afraid the new timeout option would be as useless as the retry-attempts option.

Also, if we introduce timeout in addition to delay and attempts, the users of the driver will get very confused.

Can you please elaborate on how you see the timeout feature work exactly?

Thanks!

oleksiyk commented 8 years ago

And this is quite frustrating and unexpected.

You see, this is exactly what I need. So this is not unexpected, at least for one person :) I needed my library to always re-establish the network connection. I can't easily restart the application code, but I can easily restore the Kafka connection or forward it to a replica. This was the intended behaviour.

Can you please elaborate on how you see the timeout feature work exactly?

If the message or batch of messages was not sent within timeout milliseconds, then the promise associated with this message/batch will be rejected.
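
Pending such an option, the same Promise.race pattern from the earlier sketch can approximate this behaviour in user code. The sendWithTimeout helper below is hypothetical; note that a timed-out race only rejects the caller's promise, it does not cancel the driver's internal retries.

// Sketch: reject the promise for a message/batch if it does not resolve within `ms` milliseconds.
function sendWithTimeout(producer, data, ms) {
    var timer = new Promise(function (resolve, reject) {
        setTimeout(function () {
            reject(new Error('send timed out after ' + ms + ' ms'));
        }, ms);
    });
    return Promise.race([producer.send(data), timer]);
}

sendWithTimeout(producer, { topic: 'kafka-test-topic', partition: 0, message: { value: 'hello' } }, 3000)
.catch(function (err) {
    console.error(err); // rejected after ~3 s if the send has not completed
});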

koresar commented 7 years ago

I needed my library to always re-establish the network connection.

Let's leave connection re-establishment aside and talk about message producing failures.

When a message is over the limit (1 MB by default), the Kafka server force-closes the connection. no-kafka tries to reconnect, and then sends the same >1 MB message again. And again. And again... As a result, our producer service eventually dies with OutOfMemory.

no-kafka should handle that case (the Kafka server force-closing the connection) gracefully.

@oleksiyk any suggestions?

oleksiyk commented 7 years ago

When a message is over the limit (1 MB by default), the Kafka server force-closes the connection.

When a message is over the limit, the Kafka server will reply with MessageSizeTooLarge, and producer.send will immediately return the following result structure:

[ { topic: 'kafka-test-topic',
    partition: 1,
    error:
     { [KafkaError: The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.]
       name: 'KafkaError',
       code: 'MessageSizeTooLarge',
       message: 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.' },
    offset: -1 },
  [length]: 1 ]
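
Given that result shape, an oversized message shows up in the resolved value rather than as a rejection. A small sketch of checking for it (assumes an initialised producer as in the other examples in this thread):

// 2 MB payload, over the broker's default ~1 MB message size limit.
var tooBig = new Buffer(2 * 1024 * 1024);

producer.send({ topic: 'kafka-test-topic', partition: 1, message: { value: tooBig } })
.then(function (result) {
    result.forEach(function (r) {
        if (r.error && r.error.code === 'MessageSizeTooLarge') {
            // Drop or shrink the message instead of resending it unchanged.
            console.error('Too large for', r.topic, 'partition', r.partition);
        }
    });
});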

oleksiyk commented 7 years ago

Other than that, there is the retries.attempts option that handles NoKafkaConnectionError.
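
For reference, limiting those attempts would look roughly like the sketch below. The exact shape of the retries option (in particular whether delay is a plain number or a {min, max} object) differs between no-kafka versions, so treat the values as assumptions to verify against the README:

var kafka = require('no-kafka');

// Assumed option shape; check attempts/delay against the no-kafka version in use.
var producer = new kafka.Producer({
    retries: {
        attempts: 3,
        delay: 1000
    }
});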

koresar commented 7 years ago

Actually, the Kafka broker force-closes the client connection on large messages. Here are the Kafka broker logs we see from time to time:

[2017-04-01 00:31:21,931] WARN Unexpected error from /10.200.21.127; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 11957258 larger than 1048576)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
        at kafka.network.Processor.poll(SocketServer.scala:472)
        at kafka.network.Processor.run(SocketServer.scala:412)
        at java.lang.Thread.run(Thread.java:745)

Connections are being established fine. No issue here.

Looks like I need to check the code to see how we handle resending.

Thanks @oleksiyk

oleksiyk commented 7 years ago

This might be something else; I did a quick test for message size and it was handled as expected:

'use strict';

var kafka = require('no-kafka');
var producer = new kafka.Producer();

return producer.init()
.then(function () {
    return producer.send({
        topic: 'kafka-test-topic',
        partition: 1,
        message: {
            key: 'my-key',
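            // 2 MB payload, larger than the broker's default ~1 MB message size limit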
            value: new Buffer(2 * 1024 * 1024)
        }
    });
})
.then(function (r) {
    console.log(require('util').inspect(r, true, 10, true));
    return producer.end();
})
.catch(function (err) {
    console.error(err);
    process.exit(255);
});